Source code for sim2l.result.result

# @package    sim2l library
# @copyright  Copyright (c) 2005-2026 Purdue University.
# @license    http://opensource.org/licenses/MIT MIT

"""Execution result class"""

import sqlite3
import json
from typing import Dict, Any, Optional
from datetime import datetime
from pathlib import Path
import uuid

from .outputs import OutputData
from ..schema import OutputSchema
from ..config import get_config, get_logger
from ..utils.serialization import serialize_value, deserialize_value

logger = get_logger()


[docs] class ExecutionResult: """Container for simulation execution results"""
[docs] def __init__( self, execution_id: str, simulation_id: int, simulation_name: str, simulation_version: str, inputs: Dict[str, Any], outputs: Optional[OutputData] = None, output_schema: Optional[OutputSchema] = None, output_data: Optional[Dict[str, Any]] = None, status: str = "completed", executed_at: Optional[datetime] = None, duration_seconds: Optional[float] = None, executor_type: str = "local", executor_config: Optional[Dict[str, Any]] = None, error_message: Optional[str] = None, cache_key: Optional[str] = None, squid_id: Optional[str] = None, ): """Initialize execution result Args: execution_id: Unique execution ID (UUID) simulation_id: Database simulation ID simulation_name: Simulation name simulation_version: Simulation version inputs: Input parameters used outputs: OutputData instance (if already created) output_schema: Output schema (if creating OutputData) output_data: Raw output data dictionary status: Execution status executed_at: Execution timestamp duration_seconds: Execution duration executor_type: Type of executor used executor_config: Executor configuration error_message: Error message if failed cache_key: Cache key for lookup squid_id: SQUID ID for this execution """ self.execution_id = execution_id self.simulation_id = simulation_id self.simulation_name = simulation_name self.simulation_version = simulation_version self.inputs = inputs self.status = status self.executed_at = executed_at or datetime.now() self.duration_seconds = duration_seconds self.executor_type = executor_type self.executor_config = executor_config or {} self.error_message = error_message self.cache_key = cache_key self.squid_id = squid_id # Create OutputData if outputs is not None: self.outputs = outputs elif output_schema and output_data: self.outputs = OutputData(output_schema, output_data) else: self.outputs = None self._output_schema = output_schema self._output_data = output_data
[docs] def save(self, db_path: Optional[Path] = None): """Save execution result to database Args: db_path: Database path (uses default if None) """ if db_path is None: db_path = get_config().db_path conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # Insert execution record cursor.execute(""" INSERT INTO executions ( id, simulation_id, simulation_name, simulation_version, executed_at, duration_seconds, executor_type, executor_config, inputs, status, error_message, cache_key ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( self.execution_id, self.simulation_id, self.simulation_name, self.simulation_version, self.executed_at.isoformat(), self.duration_seconds, self.executor_type, serialize_value(self.executor_config), serialize_value(self.inputs), self.status, self.error_message, self.cache_key, )) # Insert outputs if available if self._output_data and self._output_schema: for name, value in self._output_data.items(): if name not in self._output_schema: continue field = self._output_schema[name] # Serialize value serialized = field.serialize() if hasattr(field, '_value') and field._value is not None else value # Check if output already exists (may have been saved by save_outputs() in notebook) cursor.execute(""" SELECT COUNT(*) FROM outputs WHERE execution_id = ? AND name = ? """, (self.execution_id, name)) if cursor.fetchone()[0] > 0: # Output already exists, skip continue # Store as JSON cursor.execute(""" INSERT INTO outputs ( execution_id, name, type, value ) VALUES (?, ?, ?, ?) """, ( self.execution_id, name, field.type_name, json.dumps(serialized), )) # Update cache if cache_key provided if self.cache_key: cursor.execute(""" INSERT OR REPLACE INTO cache ( cache_key, execution_id, created_at, last_accessed, access_count ) VALUES (?, ?, ?, ?, ?) """, ( self.cache_key, self.execution_id, self.executed_at.isoformat(), self.executed_at.isoformat(), 1, )) conn.commit() logger.info(f"Saved execution {self.execution_id}") except Exception as e: conn.rollback() logger.error(f"Failed to save execution: {e}") raise finally: conn.close()
[docs] @classmethod def load(cls, execution_id: str, db_path: Optional[Path] = None) -> 'ExecutionResult': """Load execution result from database Args: execution_id: Execution ID to load db_path: Database path (uses default if None) Returns: ExecutionResult instance """ if db_path is None: db_path = get_config().db_path conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() try: # Load execution record cursor.execute(""" SELECT * FROM executions WHERE id = ? """, (execution_id,)) row = cursor.fetchone() if row is None: raise ValueError(f"Execution {execution_id} not found") # Load outputs cursor.execute(""" SELECT * FROM outputs WHERE execution_id = ? """, (execution_id,)) output_data = {} for output_row in cursor.fetchall(): name = output_row['name'] value_json = output_row['value'] output_data[name] = json.loads(value_json) if value_json else None # Parse timestamp executed_at = datetime.fromisoformat(row['executed_at']) # Load output schema from simulation output_schema = None try: from ..repository import SimulationRepository repo = SimulationRepository() sim = repo.load(row['simulation_name'], row['simulation_version']) if sim: output_schema = sim.outputs except Exception as e: logger.debug(f"Could not load output schema for {row['simulation_name']}/{row['simulation_version']}: {e}") pass # Schema not available, outputs will be None # Create result result = cls( execution_id=row['id'], simulation_id=row['simulation_id'], simulation_name=row['simulation_name'], simulation_version=row['simulation_version'], inputs=json.loads(row['inputs']), output_data=output_data, output_schema=output_schema, status=row['status'], executed_at=executed_at, duration_seconds=row['duration_seconds'], executor_type=row['executor_type'], executor_config=json.loads(row['executor_config']) if row['executor_config'] else {}, error_message=row['error_message'], cache_key=row['cache_key'], ) return result finally: conn.close()
[docs] @classmethod def create( cls, simulation_id: int, simulation_name: str, simulation_version: str, inputs: Dict[str, Any], output_schema: OutputSchema, executor_type: str = "local", cache_key: Optional[str] = None, squid_id: Optional[str] = None, ) -> 'ExecutionResult': """Create a new execution result Args: simulation_id: Database simulation ID simulation_name: Simulation name simulation_version: Simulation version inputs: Input parameters output_schema: Output schema executor_type: Executor type cache_key: Cache key squid_id: SQUID ID Returns: New ExecutionResult instance """ execution_id = str(uuid.uuid4()) return cls( execution_id=execution_id, simulation_id=simulation_id, simulation_name=simulation_name, simulation_version=simulation_version, inputs=inputs, output_schema=output_schema, output_data={}, executor_type=executor_type, cache_key=cache_key, squid_id=squid_id, )
[docs] def set_outputs(self, output_data: Dict[str, Any]): """Set output data Args: output_data: Dictionary of output values """ self._output_data = output_data if self._output_schema: self.outputs = OutputData(self._output_schema, output_data)
[docs] def set_error(self, error_message: str): """Mark execution as failed Args: error_message: Error description """ self.status = "failed" self.error_message = error_message
def __repr__(self): return f"ExecutionResult(id={self.execution_id[:8]}..., status={self.status})"
[docs] def load_result(execution_id: str, db_path: Optional[Path] = None) -> ExecutionResult: """Load execution result from database (convenience function) Args: execution_id: Execution ID db_path: Database path Returns: ExecutionResult """ return ExecutionResult.load(execution_id, db_path)