Source code for sim2l.executor.notebook

# @package    sim2l library
# @copyright  Copyright (c) 2005-2026 Purdue University.
# @license    http://opensource.org/licenses/MIT MIT

"""Notebook executor using Papermill"""

import os
import uuid
import tempfile
import shutil
from pathlib import Path
from typing import Dict, Any, Optional
from datetime import datetime
import time
import sqlite3
import json

import papermill as pm

from .base import Executor
from ..definition import SimulationDefinition
from ..result import ExecutionResult
from ..utils import compute_squid_id
from ..utils.serialization import serialize_output_value as _to_value
from ..config import get_config, get_logger

logger = get_logger()


class NotebookExecutor(Executor):
    """Execute simulations using Papermill

    This executor runs Jupyter notebooks using Papermill, similar to
    simtool's LocalRun class.
    """

    def __init__(
        self,
        cache: bool = True,
        output_dir: Optional[Path] = None,
        copy_files: bool = True,
        register_results: Optional[bool] = None,
    ):
        """Initialize NotebookExecutor

        Args:
            cache: Enable caching
            output_dir: Output directory (uses temp if None)
            copy_files: Copy supporting files to output directory
            register_results: Register results with results service
                (default: from config)
        """
        super().__init__(cache=cache, output_dir=output_dir)
        self.copy_files = copy_files

        # Use the explicit parameter if provided; otherwise enable
        # registration when a results service URL is configured or a local
        # results database already exists.
        if register_results is None:
            config = get_config()
            self.register_results = (
                hasattr(config, 'results_service_url')
                and config.results_service_url is not None
            ) or (Path.home() / ".sim2l" / "results.db").exists()
        else:
            self.register_results = register_results

    def check_cache(
        self,
        simulation: SimulationDefinition,
        inputs: Dict[str, Any],
    ) -> Optional[ExecutionResult]:
        """Check if cached result exists

        Args:
            simulation: Simulation definition
            inputs: Input parameters

        Returns:
            Cached ExecutionResult or None
        """
        if not self.cache:
            logger.debug("Cache is disabled, skipping cache check")
            return None

        # Prepare/validate inputs to match what will be used in execution
        validated_inputs = self.prepare_inputs(simulation, inputs)

        # Compute SQUID ID (deterministic cache key)
        squid_id = compute_squid_id(
            simtool_name=simulation.name,
            simtool_revision=simulation.version,
            inputs=validated_inputs
        )
        logger.debug(f"Computed SQUID ID for cache lookup: {squid_id}")

        # Prefer the cache service when one is configured; otherwise fall
        # back to the local SQLite cache. Both use the SQUID ID as key.
        config = get_config()
        logger.debug(f"Cache service URL: {getattr(config, 'cache_service_url', 'NOT SET')}")
        if hasattr(config, 'cache_service_url') and config.cache_service_url:
            logger.debug(f"Using cache service at {config.cache_service_url}")
            return self._check_cache_service(squid_id)

        logger.debug("Using local SQLite cache")
        return self._check_cache_local(squid_id)

    def _check_cache_service(self, cache_key: str) -> Optional[ExecutionResult]:
        """Check cache service for cached result

        Any failure (import, network, loading the result) is logged as a
        warning and treated as a cache miss.

        Args:
            cache_key: Cache key (SQUID ID) to look up

        Returns:
            Cached ExecutionResult or None
        """
        try:
            from ..database import CacheClient

            config = get_config()
            logger.debug(f"Checking cache service for key: {cache_key}")
            cache_client = CacheClient(service_url=config.cache_service_url)
            cached_data = cache_client.get(cache_key)

            if cached_data is None:
                logger.debug(f"Cache miss for key: {cache_key}")
                return None

            logger.debug(f"Cache hit for key: {cache_key}, data: {cached_data}")

            # Load execution result from cached data
            execution_id = cached_data.get('execution_id')
            if execution_id:
                from ..result import load_result

                logger.debug(f"Loading cached result from execution: {execution_id}")
                result = load_result(execution_id)
                logger.info(f"CACHED. Fetching results from execution {execution_id[:8]}...")
                return result

            logger.warning(f"Cache data missing execution_id: {cached_data}")
            return None
        except Exception as e:
            logger.warning(f"Failed to check cache service: {e}", exc_info=True)
            return None

    def _check_cache_local(self, cache_key: str) -> Optional[ExecutionResult]:
        """Check local SQLite cache for cached result

        A SQLite error during the lookup (e.g. the cache table does not
        exist yet) is logged and treated as a cache miss, matching the
        best-effort behaviour of the cache-service lookup.

        Args:
            cache_key: Cache key (SQUID ID) to look up

        Returns:
            Cached ExecutionResult or None
        """
        db_path = get_config().db_path

        try:
            conn = sqlite3.connect(db_path)
        except sqlite3.Error as e:
            logger.warning(f"Failed to check local cache: {e}")
            return None

        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT execution_id FROM cache
                WHERE cache_key = ?
            """, (cache_key,))
            row = cursor.fetchone()
            if row is None:
                return None
            execution_id = row[0]

            # Update cache access bookkeeping
            cursor.execute("""
                UPDATE cache
                SET last_accessed = ?, access_count = access_count + 1
                WHERE cache_key = ?
            """, (datetime.now().isoformat(), cache_key))
            conn.commit()
        except sqlite3.Error as e:
            logger.warning(f"Failed to check local cache: {e}")
            return None
        finally:
            conn.close()

        # Load execution result (after the cache connection is released)
        from ..result import load_result

        result = load_result(execution_id)
        logger.info(f"CACHED. Fetching results from execution {execution_id[:8]}...")
        return result

    def execute(
        self,
        simulation: SimulationDefinition,
        inputs: Dict[str, Any],
        run_name: Optional[str] = None,
    ) -> ExecutionResult:
        """Execute simulation notebook using Papermill

        Errors raised while running the notebook or extracting its outputs
        are not propagated: they are recorded on the returned result via
        ``set_error()`` and logged. Errors during setup (input preparation,
        output directory creation, repository lookup) do propagate.

        Args:
            simulation: Simulation definition
            inputs: Input parameters
            run_name: Optional run name

        Returns:
            ExecutionResult
        """
        # Check cache first
        cached_result = self.check_cache(simulation, inputs)
        if cached_result is not None:
            return cached_result

        # Prepare inputs
        validated_inputs = self.prepare_inputs(simulation, inputs)

        # Generate run name
        if run_name is None:
            run_name = str(uuid.uuid4()).replace('-', '')

        # Create output directory
        if self.output_dir:
            outdir = Path(self.output_dir) / run_name
        else:
            outdir = Path(tempfile.gettempdir()) / "sim2l_runs" / run_name
        outdir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Executing {simulation.name} v{simulation.version} in {outdir}")

        # Write notebook to the run directory
        notebook_path = outdir / f"{simulation.name}.ipynb"
        with open(notebook_path, 'wb') as f:
            f.write(simulation.workflow)

        # Output notebook path
        output_notebook = outdir / f"{simulation.name}_output.ipynb"

        # Get simulation DB ID
        from ..repository import SimulationRepository

        repo = SimulationRepository()
        sim_id = repo.get_simulation_id(simulation.name, simulation.version)

        # Compute SQUID ID (also used as cache key for deterministic caching)
        squid_id = compute_squid_id(
            simtool_name=simulation.name,
            simtool_revision=simulation.version,
            inputs=validated_inputs
        )
        # Use SQUID ID as cache key (deterministic based on simulation + inputs)
        cache_key = squid_id

        # Create execution result
        result = ExecutionResult.create(
            simulation_id=sim_id or 0,
            simulation_name=simulation.name,
            simulation_version=simulation.version,
            inputs=validated_inputs,
            output_schema=simulation.outputs,
            executor_type="notebook",
            cache_key=cache_key,
            squid_id=squid_id,
        )

        # Execute notebook
        start_time = time.time()
        try:
            # Convert Pint Quantity objects to plain values for Papermill
            papermill_params = {
                key: value.magnitude if hasattr(value, 'magnitude') else value
                for key, value in validated_inputs.items()
            }

            # Environment variables consumed by save_outputs() in the
            # notebook; the DB path is made absolute because the notebook
            # runs with cwd=outdir.
            run_env = {
                'SIM2L_EXECUTION_ID': result.execution_id,
                'SIM2L_SQUID_ID': squid_id,
                'SIM2L_DB_PATH': str(Path(get_config().db_path).resolve()),
            }

            # NOTE: os.environ is process-wide and NOT thread-safe.
            # If two NotebookExecutor.execute() calls run concurrently in the
            # same process (e.g. via threading), these env vars may race;
            # the set/clear window here is not protected by a lock.
            os.environ.update(run_env)
            try:
                # Execute with Papermill
                pm.execute_notebook(
                    input_path=str(notebook_path),
                    output_path=str(output_notebook),
                    parameters=papermill_params,
                    cwd=str(outdir),
                )
            finally:
                # Always clean up, even when the notebook fails, so stale
                # IDs never leak into later runs in this process.
                for env_name in run_env:
                    os.environ.pop(env_name, None)

            # Calculate duration
            duration = time.time() - start_time
            result.duration_seconds = duration

            # Extract outputs from executed notebook
            outputs = self._extract_outputs(
                output_notebook, simulation.outputs, result.execution_id
            )
            result.set_outputs(outputs)
            result.status = "completed"

            logger.info(f"Execution completed in {duration:.2f}s")

        except Exception as e:
            duration = time.time() - start_time
            result.duration_seconds = duration
            result.set_error(str(e))
            logger.error(f"Execution failed: {e}")

        # Save result to database
        result.save()

        # Store in cache service if configured and execution successful
        if self.cache and result.status == "completed" and result.cache_key:
            config = get_config()
            if hasattr(config, 'cache_service_url') and config.cache_service_url:
                self._store_cache_service(result)

        # Register with results service if enabled
        if self.register_results and result.status == "completed":
            self._register_result(result, simulation, validated_inputs)

        return result

    def _extract_outputs(
        self,
        notebook_path: Path,
        output_schema,
        execution_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Extract outputs from executed notebook

        Reads outputs from database (saved by save_outputs() in notebook).
        Falls back to scrapbook for backward compatibility.

        Args:
            notebook_path: Path to executed notebook
            output_schema: Output schema
            execution_id: Execution ID to read outputs for

        Returns:
            Dictionary of output values
        """
        outputs = {}

        # Try to read from database first (saved by save_outputs())
        if execution_id:
            try:
                from ..utils.serialization import deserialize_value

                conn = sqlite3.connect(get_config().db_path)
                try:
                    cursor = conn.cursor()
                    cursor.execute("""
                        SELECT name, value FROM outputs
                        WHERE execution_id = ?
                    """, (execution_id,))
                    rows = cursor.fetchall()
                finally:
                    # Close even when the query fails
                    conn.close()

                # Only keep outputs that are declared in the schema
                for name, value_json in rows:
                    if name in output_schema.keys():
                        try:
                            outputs[name] = deserialize_value(value_json)
                        except Exception as e:
                            logger.warning(f"Failed to deserialize output '{name}': {e}")

                # If we got outputs from database, return them
                if outputs:
                    logger.info(f"Extracted {len(outputs)} outputs from database")
                    return outputs

            except Exception as e:
                logger.warning(f"Failed to read outputs from database: {e}")

        # Fall back to scrapbook
        try:
            import scrapbook as sb

            # Read notebook with scrapbook
            nb = sb.read_notebook(str(notebook_path))

            # Extract each output from schema
            for name in output_schema.keys():
                try:
                    outputs[name] = nb.scraps[name].data
                except (KeyError, AttributeError):
                    # Output not found
                    logger.warning(f"Output '{name}' not found in notebook")

        except Exception as e:
            logger.error(f"Failed to extract outputs from scrapbook: {e}")

        return outputs

    def _store_cache_service(self, result: ExecutionResult):
        """Store result in cache service

        Best-effort: any failure is logged as a warning and swallowed.

        Args:
            result: Execution result to cache
        """
        try:
            from ..database import CacheClient
            import hashlib
            import numpy as np

            config = get_config()
            cache_client = CacheClient(service_url=config.cache_service_url)

            # Convert inputs to JSON-serializable format
            serializable_inputs = {}
            for key, value in result.inputs.items():
                if hasattr(value, 'magnitude'):
                    # Extract magnitude from Pint Quantity
                    serializable_inputs[key] = value.magnitude
                elif isinstance(value, np.ndarray):
                    # Convert numpy arrays to lists
                    serializable_inputs[key] = value.tolist()
                elif isinstance(value, (np.integer, np.floating)):
                    # Convert numpy scalars to Python types
                    serializable_inputs[key] = value.item()
                else:
                    serializable_inputs[key] = value

            # Compute input hash for cache entry
            input_hash = hashlib.sha256(
                json.dumps(serializable_inputs, sort_keys=True).encode()
            ).hexdigest()

            # Store cache entry with all required parameters
            success = cache_client.set(
                cache_key=result.cache_key,
                simulation_id=result.simulation_id,
                simulation_name=result.simulation_name,
                simulation_version=result.simulation_version,
                execution_id=result.execution_id,
                squid_id=result.squid_id,
                input_hash=input_hash,
                run_db_path='',  # NotebookExecutor doesn't use per-run databases
                ttl_seconds=None,  # No expiration
                metadata={
                    'executed_at': result.executed_at.isoformat(),
                    'duration_seconds': result.duration_seconds
                }
            )

            if success:
                logger.debug(f"Stored result in cache service: {result.cache_key[:40]}...")
            else:
                logger.warning("Failed to store result in cache service")

        except Exception as e:
            logger.warning(f"Failed to store result in cache service: {e}")

    def _register_result(
        self,
        result: ExecutionResult,
        simulation: SimulationDefinition,
        inputs: Dict[str, Any]
    ):
        """Register execution result with results service

        Uses the results service API when a URL is configured, otherwise
        the local SQLite results database. Best-effort: failures are logged
        as warnings and swallowed.

        Args:
            result: Execution result to register
            simulation: Simulation definition
            inputs: Input parameters
        """
        try:
            config = get_config()

            # Check if results service is configured
            if hasattr(config, 'results_service_url') and config.results_service_url:
                # Use results service API
                self._register_result_via_api(result, simulation, inputs)
            else:
                # Fall back to local SQLite registration
                self._register_result_local(result, simulation, inputs)

        except Exception as e:
            logger.warning(f"Failed to register result with results service: {e}")

    @staticmethod
    def _serialize_params(result: ExecutionResult, inputs: Dict[str, Any]):
        """Serialize inputs and outputs using the shared helper

        Args:
            result: Execution result whose outputs are serialized
            inputs: Input parameters

        Returns:
            Tuple ``(inputs_dict, outputs_dict)`` of serialized values
        """
        outputs_dict = {}
        if result.outputs:
            outputs_dict = {
                key: _to_value(value)
                for key, value in result.outputs.to_dict().items()
            }
        inputs_dict = {key: _to_value(value) for key, value in inputs.items()}
        return inputs_dict, outputs_dict

    def _register_result_via_api(
        self,
        result: ExecutionResult,
        simulation: SimulationDefinition,
        inputs: Dict[str, Any]
    ):
        """Register result via results service API

        Args:
            result: Execution result to register
            simulation: Simulation definition
            inputs: Input parameters
        """
        import requests

        config = get_config()
        inputs_dict, outputs_dict = self._serialize_params(result, inputs)

        response = requests.post(
            f"{config.results_service_url.rstrip('/')}/register_direct",
            json={
                'execution_id': result.execution_id,
                'simulation_name': simulation.name,
                'simulation_version': simulation.version,
                'squid_id': result.squid_id,
                'input_params': inputs_dict,
                'output_params': outputs_dict,
                'status': result.status,
                'duration_seconds': result.duration_seconds,
                'run_db_path': ''
            },
            headers={'Content-Type': 'application/json'},
            timeout=10
        )

        if response.status_code == 200:
            logger.debug(f"Registered result {result.squid_id[:40]}... via API")
        else:
            logger.warning(f"Failed to register result via API: {response.status_code} {response.text}")

    def _register_result_local(
        self,
        result: ExecutionResult,
        simulation: SimulationDefinition,
        inputs: Dict[str, Any]
    ):
        """Register result to local SQLite database

        Skips registration when ``~/.sim2l/results.db`` does not exist.

        Args:
            result: Execution result to register
            simulation: Simulation definition
            inputs: Input parameters
        """
        # Get results database path
        results_db_path = Path.home() / ".sim2l" / "results.db"

        if not results_db_path.exists():
            logger.debug("Results database not found, skipping registration")
            return

        inputs_dict, outputs_dict = self._serialize_params(result, inputs)

        # Insert into results database
        conn = sqlite3.connect(str(results_db_path))
        try:
            cursor = conn.cursor()
            # NOTE(review): the execution_id column is filled with the SQUID
            # ID here, whereas the API path sends result.execution_id.
            # Kept as-is; confirm whether this is intentional.
            cursor.execute("""
                INSERT OR REPLACE INTO execution_results (
                    execution_id, simulation_name, simulation_version, squid_id,
                    input_params, output_params, status, duration_seconds,
                    run_db_path, completed_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
            """, (
                result.squid_id,
                simulation.name,
                simulation.version,
                result.squid_id,
                json.dumps(inputs_dict),
                json.dumps(outputs_dict),
                result.status,
                result.duration_seconds,
                ''  # No run database in notebook executor
            ))
            conn.commit()
        finally:
            # Close even when the insert or commit fails
            conn.close()

        logger.debug(f"Registered result {result.squid_id[:40]}... to local database")

    def __repr__(self):
        return f"NotebookExecutor(cache={self.cache}, copy_files={self.copy_files}, register_results={self.register_results})"