# Source code for sim2l.services.catalog_service

# @package    sim2l library
# @copyright  Copyright (c) 2005-2026 Purdue University.
# @license    http://opensource.org/licenses/MIT MIT

"""
Standalone catalog service for simulation registry.

Provides REST API for tool discovery, registration, and statistics
with session-based authentication. Supports both SQLite and PostgreSQL.
"""

import os
import sys
import argparse
import logging
import threading
import json
import base64
import binascii
import hashlib
from pathlib import Path
from typing import Optional
from flask import Flask, request, jsonify
from datetime import datetime
import sim2l
from sim2l.executor import LocalExecutor, NotebookExecutor

# Configure logging at import time: INFO level, with timestamp, logger name
# and level prefixed to every message.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-level logger used by the helpers and backends below.
logger = logging.getLogger(__name__)

# Flask application object for this service.
app = Flask(__name__)

# Database backend (initialized in main)
catalog_db = None
# Authentication configuration (set by --no-auth flag)
require_auth = True


def _adapt_catalog_schema_for_sqlite(schema_sql: str) -> str:
    """Convert a PostgreSQL catalog schema to SQLite-compatible SQL.

    Two passes are applied:

    1. Literal substitutions for key types, ``JSONB``, ``BIGINT``, ``BOOLEAN``
       and boolean defaults.
    2. A line filter that drops blank lines and every statement introduced by
       ``CREATE OR REPLACE FUNCTION``, ``CREATE OR REPLACE VIEW`` or
       ``CREATE TRIGGER`` (constructs SQLite cannot run as written).

    Args:
        schema_sql: Schema text written for PostgreSQL.

    Returns:
        The adapted schema text, with every ``CREATE TABLE`` made idempotent
        via ``IF NOT EXISTS``.
    """
    # Order matters. "BIGSERIAL PRIMARY KEY" contains "SERIAL PRIMARY KEY", so
    # the longer pattern must be replaced first; otherwise the result is
    # "BIGINTEGER ...", which the later BIGINT -> INTEGER rule mangles further.
    substitutions = (
        ("BIGSERIAL PRIMARY KEY", "INTEGER PRIMARY KEY AUTOINCREMENT"),
        ("SERIAL PRIMARY KEY", "INTEGER PRIMARY KEY AUTOINCREMENT"),
        ("JSONB", "TEXT"),
        ("BIGINT", "INTEGER"),
        ("BOOLEAN", "INTEGER"),
        ("DEFAULT true", "DEFAULT 1"),
        ("DEFAULT false", "DEFAULT 0"),
        # Normalised here so the final pass cannot emit a doubled
        # "IF NOT EXISTS IF NOT EXISTS".
        ("CREATE TABLE IF NOT EXISTS", "CREATE TABLE"),
    )
    for old, new in substitutions:
        schema_sql = schema_sql.replace(old, new)

    unsupported_starts = (
        "CREATE OR REPLACE FUNCTION",
        "CREATE OR REPLACE VIEW",
        "CREATE TRIGGER",
    )

    kept_lines = []
    skipping = False
    # True while inside a $$ ... $$ quoted body, where a trailing ";" does not
    # terminate the statement being skipped.
    in_dollar_body = False

    for line in schema_sql.split("\n"):
        if any(marker in line for marker in unsupported_starts):
            skipping = True
            in_dollar_body = False

        if skipping:
            if "$$" in line:
                if in_dollar_body:
                    # Closing delimiter: the skipped statement ends here.
                    in_dollar_body = False
                    skipping = False
                else:
                    in_dollar_body = True
            elif not in_dollar_body and line.rstrip().endswith(";"):
                skipping = False
            continue

        if line.strip():
            kept_lines.append(line)

    schema_sql = "\n".join(kept_lines)
    # Restore IF NOT EXISTS for table creation
    return schema_sql.replace("CREATE TABLE ", "CREATE TABLE IF NOT EXISTS ")


def _default_workflow_entrypoint(workflow_type: Optional[str]) -> str:
    """Return the conventional entrypoint filename for a workflow type.

    The lookup is case-insensitive. ``None``, an empty string, or any type not
    listed below falls back to ``workflow.bin``.
    """
    entrypoint_by_type = {
        "notebook": "workflow.ipynb",
        "function": "workflow.py",
        "python": "workflow.py",
        "docker": "Dockerfile",
    }
    kind = workflow_type.lower() if workflow_type else ""
    return entrypoint_by_type.get(kind, "workflow.bin")


def _normalize_workflow_file(file_data: dict, index: int) -> dict:
    """Validate one workflow file entry and return its canonical form.

    The entry must be a dict with a non-blank string ``path`` and either
    ``content_base64`` (validated base64 text) or ``content`` (plain text,
    encoded as UTF-8). ``content_base64`` wins when both are present.

    Args:
        file_data: Raw file entry from the request payload.
        index: Position of the entry in its list; used only in error messages.

    Returns:
        A dict with ``path``, ``encoding`` (always ``"base64"``),
        ``content_base64``, ``size_bytes`` and ``sha256`` of the decoded
        bytes, plus any non-``None`` ``content_type``/``mode``/``description``.

    Raises:
        ValueError: If the entry is malformed.
    """
    label = f"workflow_files[{index}]"

    if not isinstance(file_data, dict):
        raise ValueError(f"{label} must be an object")

    path = file_data.get("path")
    if not (isinstance(path, str) and path.strip()):
        raise ValueError(f"{label}.path must be a non-empty string")

    encoded = file_data.get("content_base64")
    if encoded is not None:
        if not isinstance(encoded, str):
            raise ValueError(f"{label}.content_base64 must be a string")
        try:
            raw_bytes = base64.b64decode(encoded, validate=True)
        except (ValueError, binascii.Error):
            raise ValueError(f"{label}.content_base64 is not valid base64")
    else:
        text = file_data.get("content")
        if text is None:
            raise ValueError(
                f"{label} must contain either 'content_base64' or 'content'"
            )
        if not isinstance(text, str):
            raise ValueError(f"{label}.content must be a string")
        raw_bytes = text.encode("utf-8")
        encoded = base64.b64encode(raw_bytes).decode("ascii")

    result = {
        "path": path,
        "encoding": "base64",
        "content_base64": encoded,
        "size_bytes": len(raw_bytes),
        "sha256": hashlib.sha256(raw_bytes).hexdigest(),
    }
    # Optional descriptive fields are copied through only when they carry a value.
    result.update(
        (key, file_data[key])
        for key in ("content_type", "mode", "description")
        if file_data.get(key) is not None
    )
    return result


def _normalize_file_list(raw_files: list, field_name: str) -> list:
    """Normalize every entry of *raw_files* and reject duplicate paths.

    Args:
        raw_files: List of raw file entries.
        field_name: Payload field the list came from; used in the
            duplicate-path error message.

    Raises:
        ValueError: If an entry is malformed or two entries share a path.
    """
    normalized_files = []
    seen_paths = set()
    for index, file_data in enumerate(raw_files):
        normalized = _normalize_workflow_file(file_data, index)
        path = normalized["path"]
        if path in seen_paths:
            raise ValueError(f"{field_name} has duplicate path '{path}'")
        seen_paths.add(path)
        normalized_files.append(normalized)
    return normalized_files


def _normalize_workflow_bundle(payload: dict) -> Optional[dict]:
    """Build a canonical workflow bundle from a registration/update payload.

    Sources are consulted in this order:

    1. ``workflow_bundle`` (a dict or a JSON string). Its extra keys are
       preserved; ``format_version``, ``workflow_type``, ``entrypoint`` and
       ``files`` are filled in or normalized.
    2. ``workflow_files`` (a list of file entries).
    3. Legacy fields: ``workflow_data_base64`` or ``workflow_data`` (stored
       under the entrypoint path), ``dockerfile`` (stored as ``Dockerfile``)
       and ``docker_context_files``.

    Args:
        payload: Request payload. ``workflow_type`` defaults to
            ``"notebook"`` when the key is absent.

    Returns:
        The normalized bundle, or ``None`` when the payload carries no
        workflow content at all.

    Raises:
        ValueError: If any workflow field is malformed or paths collide.
    """
    workflow_type = payload.get("workflow_type", "notebook")
    workflow_entrypoint = payload.get("workflow_entrypoint")

    bundle = payload.get("workflow_bundle")
    if bundle is not None:
        if isinstance(bundle, str):
            try:
                bundle = json.loads(bundle)
            except (ValueError, RecursionError) as exc:
                raise ValueError("workflow_bundle must be valid JSON") from exc
        if not isinstance(bundle, dict):
            raise ValueError("workflow_bundle must be an object")

        raw_files = bundle.get("files", [])
        if not isinstance(raw_files, list):
            raise ValueError("workflow_bundle.files must be a list")
        normalized_files = _normalize_file_list(raw_files, "workflow_bundle.files")

        # Copy so caller-supplied extra keys survive and the input is not mutated.
        normalized_bundle = dict(bundle)
        normalized_bundle["format_version"] = bundle.get("format_version", 1)
        normalized_bundle["workflow_type"] = bundle.get("workflow_type", workflow_type)
        normalized_bundle["entrypoint"] = (
            bundle.get("entrypoint")
            or workflow_entrypoint
            or _default_workflow_entrypoint(normalized_bundle["workflow_type"])
        )
        normalized_bundle["files"] = normalized_files
        return normalized_bundle

    workflow_files = payload.get("workflow_files")
    if workflow_files is None:
        # Legacy payload shape: assemble the file list from individual fields.
        workflow_files = []
        workflow_data_base64 = payload.get("workflow_data_base64")
        workflow_data = payload.get("workflow_data")
        dockerfile = payload.get("dockerfile")
        docker_context_files = payload.get("docker_context_files")

        # Base64 data takes precedence over plain-text data.
        if workflow_data_base64 is not None:
            workflow_files.append(
                {
                    "path": workflow_entrypoint or _default_workflow_entrypoint(workflow_type),
                    "content_base64": workflow_data_base64,
                }
            )
        elif workflow_data is not None:
            workflow_files.append(
                {
                    "path": workflow_entrypoint or _default_workflow_entrypoint(workflow_type),
                    "content": workflow_data,
                }
            )

        if dockerfile is not None:
            workflow_files.append(
                {
                    "path": "Dockerfile",
                    "content": dockerfile,
                    "content_type": "text/plain",
                }
            )

        if docker_context_files is not None:
            if not isinstance(docker_context_files, list):
                raise ValueError("docker_context_files must be a list")
            workflow_files.extend(docker_context_files)

        if not workflow_files:
            return None

    if not isinstance(workflow_files, list):
        raise ValueError("workflow_files must be a list")

    return {
        "format_version": 1,
        "workflow_type": workflow_type,
        "entrypoint": workflow_entrypoint or _default_workflow_entrypoint(workflow_type),
        "files": _normalize_file_list(workflow_files, "workflow_files"),
    }


def _normalize_schema(schema_value, field_name: str) -> Optional[dict]:
    """Coerce a schema given as a dict or JSON string into a dict.

    Args:
        schema_value: ``None``, a dict, or a JSON string that decodes to a dict.
        field_name: Name used in error messages.

    Returns:
        ``None`` when *schema_value* is ``None``, otherwise the schema dict.

    Raises:
        ValueError: If a string is not valid JSON, or the value is not an object.
    """
    if schema_value is None:
        return None

    parsed = schema_value
    if isinstance(parsed, str):
        try:
            parsed = json.loads(parsed)
        except Exception:
            raise ValueError(f"{field_name} must be valid JSON")

    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"{field_name} must be an object")


def _resolve_input_output_schemas(payload: dict, workflow_bundle: Optional[dict]) -> tuple[dict, dict]:
    """Locate and normalize the input and output schemas for a simulation.

    Each schema is taken from the first place that provides a non-``None``
    value: the payload itself, then the same key on the workflow bundle, then
    ``workflow_bundle["schemas"]["input"]`` / ``["output"]``.

    Returns:
        ``(input_schema, output_schema)`` as dicts.

    Raises:
        ValueError: If either schema is missing or malformed.
    """
    found = {
        "input_schema": payload.get("input_schema"),
        "output_schema": payload.get("output_schema"),
    }

    if workflow_bundle:
        for field in ("input_schema", "output_schema"):
            if found[field] is None:
                found[field] = workflow_bundle.get(field)

        nested = workflow_bundle.get("schemas")
        if isinstance(nested, dict):
            for field, short_key in (("input_schema", "input"), ("output_schema", "output")):
                if found[field] is None:
                    found[field] = nested.get(short_key)

    input_schema = _normalize_schema(found["input_schema"], "input_schema")
    output_schema = _normalize_schema(found["output_schema"], "output_schema")

    if input_schema is not None and output_schema is not None:
        return input_schema, output_schema

    raise ValueError(
        "input_schema and output_schema are required "
        "(provide directly or include them in workflow_bundle)"
    )


class CatalogServiceBackend:
    """Abstract backend for catalog service.

    Every method raises ``NotImplementedError`` here; concrete backends
    override them. The SQLite backend in this module returns a
    ``(payload, http_status)`` tuple from each method.
    """

    def search(self, query, tags, status, limit):
        """Search registered simulations by name query, tags and status."""
        raise NotImplementedError

    def get_simulation(self, name, version):
        """Fetch one simulation by name and version."""
        raise NotImplementedError

    def register_simulation(self, data, session_id):
        """Register a new simulation on behalf of the given session."""
        raise NotImplementedError

    def update_simulation(self, simulation_id, updates, session_id):
        """Apply field updates to an existing simulation."""
        raise NotImplementedError

    def delete_simulation(self, simulation_id, session_id):
        """Delete a simulation."""
        raise NotImplementedError

    def record_execution(self, data):
        """Record one execution of a simulation."""
        raise NotImplementedError

    def get_stats(self, simulation_id):
        """Return execution statistics for one simulation."""
        raise NotImplementedError

    def sync_pending_requests(self, installation_id):
        """List pending sync requests, optionally for one installation."""
        raise NotImplementedError

    def approve_sync(self, request_id, session_id):
        """Approve a queued sync request."""
        raise NotImplementedError

    def get_overview_stats(self):
        """Return catalog-wide overview statistics."""
        raise NotImplementedError

    def health_check(self):
        """Report whether the backend is reachable."""
        raise NotImplementedError
class SQLiteCatalogBackend(CatalogServiceBackend):
    """SQLite backend for catalog service.

    Uses a per-thread connection (threading.local) so that concurrent Flask
    requests each get their own SQLite connection, avoiding
    'OperationalError: database is locked' errors under load. WAL journal mode
    is enabled for better read concurrency.

    Every public method returns a ``(payload, http_status)`` tuple.
    """

    def __init__(self, db_path: str):
        """Remember the database path and create the schema.

        Args:
            db_path: Filesystem path passed to ``sqlite3.connect``.
        """
        self.db_path = db_path
        self._local = threading.local()
        self._schema_lock = threading.Lock()
        self._create_schema()

    def _get_conn(self):
        """Return the per-thread SQLite connection, creating it if needed."""
        import sqlite3

        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode=WAL")
            self._local.conn = conn
        return conn

    def _create_schema(self):
        """Create catalog database schema (run once on the initializing thread).

        Failures are logged rather than raised. The column upgrades run in
        their own step, so a failure in the schema script does not prevent an
        existing database from receiving them.
        """
        schema_path = (
            Path(__file__).parent.parent / "database" / "master_catalog_schema.sql"
        )
        with open(schema_path, "r") as f:
            schema_sql = f.read()

        schema_sql = _adapt_catalog_schema_for_sqlite(schema_sql)

        with self._schema_lock:
            conn = self._get_conn()
            try:
                conn.executescript(schema_sql)
                conn.commit()
                logger.info("SQLite catalog schema created")
            except Exception as e:
                logger.error(f"Failed to create schema: {e}")

            # Attempted independently: register_simulation writes the
            # workflow_bundle column, so it must exist even when the script
            # above stopped early.
            try:
                self._ensure_schema_extensions(conn)
                conn.commit()
            except Exception as e:
                logger.error(f"Failed to apply schema extensions: {e}")

    def _ensure_schema_extensions(self, conn):
        """Apply non-breaking schema upgrades for existing SQLite databases."""
        cursor = conn.cursor()
        cursor.execute("PRAGMA table_info(simulations)")
        columns = {row["name"] for row in cursor.fetchall()}
        if "workflow_bundle" not in columns:
            cursor.execute("ALTER TABLE simulations ADD COLUMN workflow_bundle TEXT")
            logger.info("Added 'workflow_bundle' column to SQLite catalog schema")

    def _check_session(self, session_id: str) -> bool:
        """Check if session is valid."""
        conn = self._get_conn()
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT 1 FROM sessions
            WHERE session_id = ? AND is_valid = 1
              AND expires_at > datetime('now')
            """,
            (session_id,),
        )
        return cursor.fetchone() is not None

    def _check_privilege(self, session_id: str, privilege: str) -> bool:
        """Check if session has privilege.

        A session holding ``"admin"`` passes every check. Unparseable or null
        privilege data is treated as "no privileges".
        """
        # Allow no-auth-session to have all privileges
        if session_id == "no-auth-session":
            return True

        conn = self._get_conn()
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT privileges FROM sessions
            WHERE session_id = ? AND is_valid = 1
              AND expires_at > datetime('now')
            """,
            (session_id,),
        )
        row = cursor.fetchone()
        if not row:
            return False

        raw_privileges = row["privileges"]
        try:
            privileges = (json.loads(raw_privileges) if raw_privileges else None) or []
        except (TypeError, ValueError):
            privileges = []
        return privilege in privileges or "admin" in privileges

    def search(self, query, tags, status, limit):
        """Search simulations, newest first.

        Args:
            query: Substring matched against the simulation name (optional).
            tags: Keep only simulations sharing at least one of these tags.
            status: Exact status filter; falsy or ``"all"`` disables it.
            limit: Maximum number of results.

        Returns:
            ``(list_of_simulation_dicts, 200)``.
        """
        conn = self._get_conn()
        cursor = conn.cursor()

        conditions = []
        params = []

        if query:
            conditions.append("name LIKE ?")
            params.append(f"%{query}%")

        if status and status != "all":
            conditions.append("status = ?")
            params.append(status)

        # Only fixed fragments are interpolated; user values go through params.
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        sql = f"""
            SELECT id, name, version, description, author, tags, status,
                   created_at, updated_at
            FROM simulations
            WHERE {where_clause}
            ORDER BY created_at DESC
        """

        wanted_tags = set(tags) if tags else None
        cap = None
        if wanted_tags is None:
            sql += " LIMIT ?"
            params.append(limit)
        else:
            # Tags are filtered in Python, so the cap is applied after
            # filtering; a SQL LIMIT here would cut rows before the tag filter
            # and under-fill the result.
            try:
                cap = int(limit)
            except (TypeError, ValueError):
                cap = None
            if cap is not None and cap < 0:
                cap = None  # SQLite treats a negative LIMIT as "no limit"

        cursor.execute(sql, params)

        results = []
        for row in cursor.fetchall():
            if cap is not None and len(results) >= cap:
                break
            sim = dict(row)
            sim["tags"] = (json.loads(sim["tags"]) if sim["tags"] else None) or []
            if wanted_tags is not None and wanted_tags.isdisjoint(sim["tags"]):
                continue
            results.append(sim)

        return results, 200

    def get_simulation(self, name, version):
        """Fetch a simulation by name, and by version when one is given.

        Without a version, the most recently created row for the name is used.

        Returns:
            ``(simulation_dict, 200)`` with JSON columns decoded, or
            ``(None, 404)``.
        """
        conn = self._get_conn()
        cursor = conn.cursor()

        if version:
            cursor.execute(
                "SELECT * FROM simulations WHERE name = ? AND version = ?",
                (name, version),
            )
        else:
            # Get latest version
            cursor.execute(
                """
                SELECT * FROM simulations
                WHERE name = ?
                ORDER BY created_at DESC
                LIMIT 1
                """,
                (name,),
            )

        row = cursor.fetchone()
        if not row:
            return None, 404

        sim = dict(row)
        for field in [
            "tags",
            "input_schema",
            "output_schema",
            "dependencies",
            "metadata",
            "workflow_bundle",
        ]:
            if sim.get(field):
                sim[field] = json.loads(sim[field])

        return sim, 200

    def register_simulation(self, data, session_id):
        """Insert a new simulation row.

        Returns:
            ``({"id": ..., "status": "registered"}, 201)`` on success,
            ``403`` without the ``catalog_update`` privilege, ``400`` for a
            malformed payload, or ``409`` if name/version already exists.
        """
        if not self._check_privilege(session_id, "catalog_update"):
            return {"error": "Insufficient privileges"}, 403

        # Reject incomplete payloads up front instead of failing with KeyError.
        missing = [key for key in ("name", "version") if data.get(key) in (None, "")]
        if missing:
            return {"error": f"Missing required field(s): {', '.join(missing)}"}, 400

        try:
            workflow_bundle = _normalize_workflow_bundle(data)
            input_schema, output_schema = _resolve_input_output_schemas(
                data, workflow_bundle
            )
        except ValueError as exc:
            return {"error": str(exc)}, 400

        conn = self._get_conn()
        cursor = conn.cursor()

        if session_id == "no-auth-session":
            user_id = None
        else:
            cursor.execute(
                "SELECT user_id FROM sessions WHERE session_id = ?", (session_id,)
            )
            user_row = cursor.fetchone()
            user_id = user_row["user_id"] if user_row else None

        cursor.execute(
            "SELECT id FROM simulations WHERE name = ? AND version = ?",
            (data["name"], data["version"]),
        )
        if cursor.fetchone():
            return {"error": "Simulation already registered"}, 409

        cursor.execute(
            """
            INSERT INTO simulations (
                name, version, description, author, author_email, organization,
                license, repository_url, documentation_url, tags,
                input_schema, output_schema, workflow_type, workflow_hash,
                workflow_bundle, dependencies, python_version, status,
                visibility, created_by, updated_by, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                data["name"],
                data["version"],
                data.get("description"),
                data.get("author"),
                data.get("author_email"),
                data.get("organization"),
                data.get("license"),
                data.get("repository_url"),
                data.get("documentation_url"),
                json.dumps(data.get("tags", [])),
                json.dumps(input_schema),
                json.dumps(output_schema),
                data.get("workflow_type", "notebook"),
                data.get("workflow_hash"),
                json.dumps(workflow_bundle) if workflow_bundle is not None else None,
                json.dumps(data.get("dependencies", [])),
                data.get("python_version"),
                "active",
                data.get("visibility", "public"),
                user_id,
                user_id,
                json.dumps(data.get("metadata")),
            ),
        )

        simulation_id = cursor.lastrowid
        conn.commit()

        logger.info(f"Registered simulation {data['name']}/{data['version']} (ID: {simulation_id})")
        return {"id": simulation_id, "status": "registered"}, 201

    def update_simulation(self, simulation_id, updates, session_id):
        """Update whitelisted fields of a simulation.

        Unknown keys in *updates* are ignored. Any workflow-related key
        triggers re-normalization of the stored ``workflow_bundle``.

        Returns:
            ``({"status": "updated"}, 200)``, ``403`` without privilege,
            ``400`` for bad input or no updatable fields, or ``404`` when no
            row has the given id.
        """
        if not self._check_privilege(session_id, "catalog_update"):
            return {"error": "Insufficient privileges"}, 403

        conn = self._get_conn()
        cursor = conn.cursor()

        normalized_updates = dict(updates)
        if any(
            key in updates
            for key in (
                "workflow_bundle",
                "workflow_files",
                "workflow_entrypoint",
                "workflow_data",
                "workflow_data_base64",
                "dockerfile",
                "docker_context_files",
            )
        ):
            payload = {"workflow_type": updates.get("workflow_type", "notebook"), **updates}
            try:
                # NOTE(review): this is None when the update carries no file
                # content (e.g. only workflow_entrypoint), which stores JSON
                # null over the existing bundle - confirm that is intended.
                normalized_updates["workflow_bundle"] = _normalize_workflow_bundle(payload)
            except ValueError as exc:
                return {"error": str(exc)}, 400

        json_fields = (
            "tags",
            "dependencies",
            "metadata",
            "input_schema",
            "output_schema",
            "workflow_bundle",
        )
        plain_fields = (
            "description",
            "status",
            "visibility",
            "license",
            "workflow_type",
            "workflow_hash",
        )

        # Column names come only from the whitelists above, never from input.
        set_clauses = []
        params = []
        for key, value in normalized_updates.items():
            if key in json_fields:
                set_clauses.append(f"{key} = ?")
                params.append(json.dumps(value))
            elif key in plain_fields:
                set_clauses.append(f"{key} = ?")
                params.append(value)

        if not set_clauses:
            return {"error": "No valid fields to update"}, 400

        set_clauses.append("updated_at = datetime('now')")
        params.append(simulation_id)

        cursor.execute(
            f"UPDATE simulations SET {', '.join(set_clauses)} WHERE id = ?",
            params,
        )
        updated_count = cursor.rowcount
        conn.commit()

        # Previously a nonexistent id was reported as a successful update.
        if updated_count == 0:
            return {"error": "Simulation not found"}, 404

        return {"status": "updated"}, 200

    def delete_simulation(self, simulation_id, session_id):
        """Delete a simulation and the rows that reference it.

        Returns:
            ``(details, 200)`` on success, ``403`` without privilege, or
            ``404`` when the simulation does not exist.
        """
        if not self._check_privilege(session_id, "catalog_update"):
            return {"error": "Insufficient privileges"}, 403

        conn = self._get_conn()
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, name, version FROM simulations WHERE id = ?",
            (simulation_id,),
        )
        simulation = cursor.fetchone()
        if not simulation:
            return {"error": "Simulation not found"}, 404

        # execution_registry does not cascade on simulation delete in this schema.
        cleanup_statements = [
            ("DELETE FROM execution_registry WHERE simulation_id = ?", (simulation_id,)),
            ("DELETE FROM execution_stats WHERE simulation_id = ?", (simulation_id,)),
            ("DELETE FROM simulation_tags WHERE simulation_id = ?", (simulation_id,)),
            ("DELETE FROM access_control WHERE simulation_id = ?", (simulation_id,)),
            (
                "DELETE FROM simulation_dependencies WHERE simulation_id = ? OR depends_on_simulation_id = ?",
                (simulation_id, simulation_id),
            ),
        ]
        for statement, params in cleanup_statements:
            try:
                cursor.execute(statement, params)
            except Exception as exc:
                # Backward compatibility for older SQLite DBs that may miss optional tables.
                if "no such table" in str(exc).lower():
                    logger.debug(f"Skipping cleanup statement due to missing table: {statement}")
                else:
                    raise

        cursor.execute("DELETE FROM simulations WHERE id = ?", (simulation_id,))
        deleted_count = cursor.rowcount
        conn.commit()

        if deleted_count == 0:
            return {"error": "Simulation not found"}, 404

        logger.info(
            f"Deleted simulation {simulation['name']}/{simulation['version']} (ID: {simulation_id})"
        )
        return {
            "status": "deleted",
            "simulation_id": simulation_id,
            "name": simulation["name"],
            "version": simulation["version"],
        }, 200

    def record_execution(self, data):
        """Insert one row into ``execution_registry``.

        Returns:
            ``({"status": "recorded"}, 201)``, or ``400`` when a required
            field is absent from *data*.
        """
        required = (
            "execution_id",
            "squid_id",
            "simulation_id",
            "started_at",
            "status",
            "executor_type",
        )
        # Report missing keys as a client error instead of raising KeyError.
        missing = [key for key in required if key not in data]
        if missing:
            return {"error": f"Missing required field(s): {', '.join(missing)}"}, 400

        conn = self._get_conn()
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO execution_registry (
                execution_id, squid_id, simulation_id, user_id, started_at,
                completed_at, duration_seconds, status, executor_type,
                cache_hit, run_db_path, run_db_size_bytes, input_hash,
                output_count, artifact_count, error_count, warning_count,
                environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                data["execution_id"],
                data["squid_id"],
                data["simulation_id"],
                data.get("user_id"),
                data["started_at"],
                data.get("completed_at"),
                data.get("duration_seconds"),
                data["status"],
                data["executor_type"],
                data.get("cache_hit", False),
                data.get("run_db_path"),
                data.get("run_db_size_bytes"),
                data.get("input_hash"),
                data.get("output_count", 0),
                data.get("artifact_count", 0),
                data.get("error_count", 0),
                data.get("warning_count", 0),
                json.dumps(data.get("environment")),
            ),
        )
        conn.commit()
        return {"status": "recorded"}, 201

    def get_stats(self, simulation_id):
        """Aggregate execution counts and durations for one simulation."""
        conn = self._get_conn()
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT
                COUNT(*) as total_executions,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful,
                SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
                SUM(CASE WHEN cache_hit = 1 THEN 1 ELSE 0 END) as cached,
                AVG(duration_seconds) as avg_duration,
                MIN(duration_seconds) as min_duration,
                MAX(duration_seconds) as max_duration
            FROM execution_registry
            WHERE simulation_id = ?
            """,
            (simulation_id,),
        )
        row = cursor.fetchone()
        if row:
            return dict(row), 200
        # Defensive only: an aggregate without GROUP BY yields one row even
        # when nothing matches, so this is not expected to be reached.
        return {}, 404

    def sync_pending_requests(self, installation_id):
        """List pending ``sync_queue`` rows, oldest first.

        Args:
            installation_id: Restrict to one installation; falsy means all.
        """
        conn = self._get_conn()
        cursor = conn.cursor()

        if installation_id:
            cursor.execute(
                """
                SELECT * FROM sync_queue
                WHERE installation_id = ? AND status = 'pending'
                ORDER BY created_at
                """,
                (installation_id,),
            )
        else:
            cursor.execute(
                """
                SELECT * FROM sync_queue
                WHERE status = 'pending'
                ORDER BY created_at
                """
            )

        results = [dict(row) for row in cursor.fetchall()]
        return results, 200

    def approve_sync(self, request_id, session_id):
        """Approve a pending sync request by registering its payload.

        Returns:
            ``200`` with the new simulation id on success, ``403`` without the
            ``admin`` privilege, ``404`` for an unknown request, ``409`` when
            the request is no longer pending, or the registration error (the
            request is then marked ``failed``).
        """
        if not self._check_privilege(session_id, "admin"):
            return {"error": "Admin privilege required"}, 403

        conn = self._get_conn()
        cursor = conn.cursor()

        cursor.execute("SELECT * FROM sync_queue WHERE id = ?", (request_id,))
        # Named `sync_request` to avoid shadowing the Flask `request` global
        sync_request = cursor.fetchone()
        if not sync_request:
            return {"error": "Request not found"}, 404

        # Re-processing an approved request would fail registration with 409
        # and overwrite its status with 'failed'.
        if sync_request["status"] != "pending":
            return {
                "error": f"Request already processed (status: {sync_request['status']})"
            }, 409

        payload = json.loads(sync_request["payload"])
        result, status = self.register_simulation(payload, session_id)

        if status == 201:
            cursor.execute(
                """
                UPDATE sync_queue
                SET status = 'approved', processed_at = datetime('now')
                WHERE id = ?
                """,
                (request_id,),
            )
            conn.commit()
            return {"status": "approved", "simulation_id": result["id"]}, 200

        cursor.execute(
            """
            UPDATE sync_queue
            SET status = 'failed', processed_at = datetime('now'),
                rejection_reason = ?
            WHERE id = ?
            """,
            (result.get("error", "Unknown error"), request_id),
        )
        conn.commit()
        return result, status

    def get_overview_stats(self):
        """Get overview statistics for dashboard.

        Execution counters are reported as 0 here. On any error, all counters
        are reported as 0 and the status is still 200.
        """
        try:
            conn = self._get_conn()
            cursor = conn.cursor()

            cursor.execute("SELECT COUNT(*) FROM simulations")
            total_simulations = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM simulations WHERE status = 'active'")
            active_simulations = cursor.fetchone()[0]

            return {
                "total_simulations": total_simulations,
                "active_simulations": active_simulations,
                "total_executions": 0,
                "successful_executions": 0,
                "cached_executions": 0,
            }, 200
        except Exception as e:
            logger.error(f"Error getting overview stats: {e}", exc_info=True)
            return {
                "total_simulations": 0,
                "active_simulations": 0,
                "total_executions": 0,
                "successful_executions": 0,
                "cached_executions": 0,
            }, 200

    def health_check(self):
        """Report backend health by counting rows in ``simulations``."""
        try:
            conn = self._get_conn()
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM simulations")
            count = cursor.fetchone()[0]
            return {
                "status": "healthy",
                "backend": "sqlite",
                "simulations": count,
            }, 200
        except Exception as e:
            logger.error(f"Health check failed: {e}", exc_info=True)
            return {"status": "unhealthy", "error": "Internal error"}, 500
[docs] class PostgreSQLCatalogBackend(CatalogServiceBackend): """PostgreSQL backend for catalog service. Uses a per-thread connection pool (threading.local) for thread safety under concurrent Flask requests. """
[docs] def __init__(self, connection_string: str): self.connection_string = connection_string self._local = threading.local() self._create_schema()
def _get_conn(self):
    """Return the per-thread PostgreSQL connection, creating it if needed.

    A connection whose transaction was aborted by an earlier failed statement
    is rolled back before being handed out; otherwise every later query on
    this thread would fail until the process restarts.
    """
    import psycopg2
    import psycopg2.extensions
    import psycopg2.extras

    conn = getattr(self._local, "conn", None)
    if conn is None or conn.closed:
        conn = psycopg2.connect(
            self.connection_string,
            cursor_factory=psycopg2.extras.RealDictCursor,
        )
        self._local.conn = conn
    elif (
        conn.get_transaction_status()
        == psycopg2.extensions.TRANSACTION_STATUS_INERROR
    ):
        # Clear the aborted transaction left behind by a failed statement.
        conn.rollback()
    return conn


def _create_schema(self):
    """Create catalog schema if needed.

    The PostgreSQL schema file contains some non-idempotent statements
    (e.g., trigger creation). To avoid duplicate-object failures on
    service restart, only apply the schema on first initialization.
    """
    conn = self._get_conn()
    cursor = conn.cursor()

    # to_regclass yields NULL when the table does not exist yet.
    cursor.execute("SELECT to_regclass('public.simulations') AS simulations_table")
    row = cursor.fetchone() or {}
    if row.get("simulations_table"):
        logger.debug("PostgreSQL catalog schema already initialized")
    else:
        schema_path = Path(__file__).parent.parent / "database" / "master_catalog_schema.sql"
        # Explicit encoding so the result does not depend on the host locale.
        with open(schema_path, "r", encoding="utf-8") as f:
            schema_sql = f.read()
        cursor.execute(schema_sql)
        conn.commit()
        logger.info("PostgreSQL catalog schema created")

    self._ensure_schema_extensions()


def _ensure_schema_extensions(self):
    """Apply non-breaking schema upgrades for existing PostgreSQL databases."""
    conn = self._get_conn()
    cursor = conn.cursor()
    cursor.execute(
        """
        ALTER TABLE simulations
        ADD COLUMN IF NOT EXISTS workflow_bundle JSONB
        """
    )
    conn.commit()


def _check_session(self, session_id: str) -> bool:
    """Return True when ``session_id`` names a valid, unexpired session row."""
    conn = self._get_conn()
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT 1 FROM sessions
        WHERE session_id = %s AND is_valid = true AND expires_at > CURRENT_TIMESTAMP
        """,
        (session_id,),
    )
    return cursor.fetchone() is not None


def _check_privilege(self, session_id: str, privilege: str) -> bool:
    """Return True when the session holds ``privilege`` (or ``admin``).

    The sentinel ``"no-auth-session"`` is granted everything without a
    database lookup. Stored privileges must be a list (native or
    JSON-encoded); any other shape grants nothing.
    """
    # Allow no-auth-session to have all privileges
    if session_id == "no-auth-session":
        return True

    conn = self._get_conn()
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT privileges FROM sessions
        WHERE session_id = %s AND is_valid = true AND expires_at > CURRENT_TIMESTAMP
        """,
        (session_id,),
    )
    row = cursor.fetchone()
    if not row:
        return False

    privileges = row.get("privileges") or []
    if isinstance(privileges, str):
        try:
            privileges = json.loads(privileges)
        except ValueError:
            privileges = []
    # A decoded str or dict would turn `in` into a substring or key test
    # ("admin" in "administrator"), so only real collections are accepted.
    if not isinstance(privileges, (list, tuple, set, frozenset)):
        return False
    return privilege in privileges or "admin" in privileges


def _normalize_json(self, value):
    """Decode ``value`` when it is a JSON string; otherwise return it as is.

    ``None`` stays ``None`` and a string that is not valid JSON is returned
    unchanged.
    """
    if value is None:
        return None
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:
            return value
    return value
def search(self, query, tags, status, limit):
    """Search simulations by name, status and tags, newest first.

    Args:
        query: Case-insensitive substring of the simulation name, or falsy
            for no name filter.
        tags: Iterable of tag strings; a simulation matches when it carries
            at least one of them. Falsy for no tag filter.
        status: Exact status to match; falsy or ``"all"`` disables the filter.
        limit: Maximum number of rows. Negative values are clamped to 0 and
            ``None`` means no limit.

    Returns:
        ``(list_of_simulation_dicts, 200)``.
    """
    conn = self._get_conn()
    cursor = conn.cursor()

    conditions = []
    params = []

    if query:
        conditions.append("name ILIKE %s")
        params.append(f"%{query}%")

    if status and status != "all":
        conditions.append("status = %s")
        params.append(status)

    if tags:
        # Filter in SQL (jsonb ?| text[]: any listed string is an element of
        # the tags array) so LIMIT counts matching rows only. Filtering in
        # Python after LIMIT silently dropped matches beyond the first page.
        conditions.append("tags ?| %s")
        params.append(list(tags))

    if limit is not None:
        # PostgreSQL rejects a negative LIMIT outright.
        limit = max(int(limit), 0)

    where_clause = " AND ".join(conditions) if conditions else "1=1"

    cursor.execute(
        f"""
        SELECT id, name, version, description, author, tags, status,
               created_at, updated_at
        FROM simulations
        WHERE {where_clause}
        ORDER BY created_at DESC
        LIMIT %s
        """,
        params + [limit],
    )

    results = []
    for row in cursor.fetchall():
        sim = dict(row)
        sim["tags"] = self._normalize_json(sim.get("tags")) or []
        # Timestamps are stringified so the result is JSON-serializable.
        for stamp in ("created_at", "updated_at"):
            if sim.get(stamp):
                sim[stamp] = str(sim[stamp])
        results.append(sim)

    return results, 200
def get_simulation(self, name, version):
    """Fetch one simulation by name, optionally pinned to a version.

    Without ``version`` the most recently created row for ``name`` is used.
    JSON columns are decoded and timestamps are stringified.

    Returns:
        ``(simulation_dict, 200)`` or ``(None, 404)`` when nothing matches.
    """
    cur = self._get_conn().cursor()

    if version:
        sql = """
            SELECT * FROM simulations
            WHERE name = %s AND version = %s
            """
        args = (name, version)
    else:
        sql = """
            SELECT * FROM simulations
            WHERE name = %s
            ORDER BY created_at DESC
            LIMIT 1
            """
        args = (name,)
    cur.execute(sql, args)

    record = cur.fetchone()
    if not record:
        return None, 404

    sim = dict(record)
    json_columns = (
        "tags",
        "input_schema",
        "output_schema",
        "dependencies",
        "metadata",
        "workflow_bundle",
    )
    for column in json_columns:
        sim[column] = self._normalize_json(sim.get(column))

    for stamp in ("created_at", "updated_at"):
        if sim.get(stamp):
            sim[stamp] = str(sim[stamp])

    return sim, 200
def register_simulation(self, data, session_id):
    """Insert a new simulation into the catalog.

    Args:
        data: Registration payload; must be a dict with non-empty ``name``
            and ``version``. Other keys are optional.
        session_id: Session performing the registration; must hold
            ``catalog_update``.

    Returns:
        A ``(body, http_status)`` tuple: ``403`` without the privilege,
        ``400`` for a malformed payload, ``409`` when the name/version pair
        already exists, ``201`` with the new ``id`` on success.

    Raises:
        Any database error from the INSERT, after the transaction has been
        rolled back.
    """
    if not self._check_privilege(session_id, "catalog_update"):
        return {"error": "Insufficient privileges"}, 403

    # Validate up front: the payload is client-supplied and indexing a missing
    # key below would surface as an unhandled KeyError (HTTP 500).
    if not isinstance(data, dict):
        return {"error": "Request body must be a JSON object"}, 400
    missing = [field for field in ("name", "version") if data.get(field) in (None, "")]
    if missing:
        return {"error": f"Missing required field(s): {', '.join(missing)}"}, 400

    try:
        workflow_bundle = _normalize_workflow_bundle(data)
        input_schema, output_schema = _resolve_input_output_schemas(
            data, workflow_bundle
        )
    except ValueError as exc:
        return {"error": str(exc)}, 400

    conn = self._get_conn()
    cursor = conn.cursor()

    if session_id == "no-auth-session":
        # The no-auth sentinel has no row in `sessions`.
        user_id = None
    else:
        cursor.execute(
            "SELECT user_id FROM sessions WHERE session_id = %s", (session_id,)
        )
        user_row = cursor.fetchone()
        user_id = user_row["user_id"] if user_row else None

    cursor.execute(
        "SELECT id FROM simulations WHERE name = %s AND version = %s",
        (data["name"], data["version"]),
    )
    if cursor.fetchone():
        return {"error": "Simulation already registered"}, 409

    try:
        cursor.execute(
            """
            INSERT INTO simulations (
                name, version, description, author, author_email, organization,
                license, repository_url, documentation_url, tags,
                input_schema, output_schema, workflow_type, workflow_hash, workflow_bundle,
                dependencies, python_version, status, visibility,
                created_by, updated_by, metadata
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s::jsonb, %s::jsonb,
                      %s, %s, %s::jsonb, %s::jsonb, %s, %s, %s, %s, %s, %s::jsonb)
            RETURNING id
            """,
            (
                data["name"],
                data["version"],
                data.get("description"),
                data.get("author"),
                data.get("author_email"),
                data.get("organization"),
                data.get("license"),
                data.get("repository_url"),
                data.get("documentation_url"),
                json.dumps(data.get("tags", [])),
                json.dumps(input_schema),
                json.dumps(output_schema),
                data.get("workflow_type", "notebook"),
                data.get("workflow_hash"),
                json.dumps(workflow_bundle) if workflow_bundle is not None else None,
                json.dumps(data.get("dependencies", [])),
                data.get("python_version"),
                "active",
                data.get("visibility", "public"),
                user_id,
                user_id,
                json.dumps(data.get("metadata")),
            ),
        )
        simulation_id = cursor.fetchone()["id"]
        conn.commit()
    except Exception:
        # Leave the per-thread connection usable for the next request.
        conn.rollback()
        raise

    logger.info(f"Registered simulation {data['name']}/{data['version']} (ID: {simulation_id})")
    return {"id": simulation_id, "status": "registered"}, 201
def update_simulation(self, simulation_id, updates, session_id):
    """Update the whitelisted columns of an existing simulation.

    Args:
        simulation_id: Primary key of the simulation to update.
        updates: Dict of column names to new values. Workflow-related keys
            are normalized into a single ``workflow_bundle`` value; keys
            outside the whitelist are ignored.
        session_id: Session performing the update; must hold
            ``catalog_update``.

    Returns:
        A ``(body, http_status)`` tuple: ``403`` without the privilege,
        ``400`` for a malformed payload or when nothing updatable was given,
        ``404`` when no row has that id, ``200`` on success.
    """
    if not self._check_privilege(session_id, "catalog_update"):
        return {"error": "Insufficient privileges"}, 403

    if not isinstance(updates, dict):
        return {"error": "Request body must be a JSON object"}, 400

    normalized_updates = dict(updates)
    workflow_keys = (
        "workflow_bundle",
        "workflow_files",
        "workflow_entrypoint",
        "workflow_data",
        "workflow_data_base64",
        "dockerfile",
        "docker_context_files",
    )
    if any(key in updates for key in workflow_keys):
        payload = {"workflow_type": updates.get("workflow_type", "notebook"), **updates}
        try:
            normalized_updates["workflow_bundle"] = _normalize_workflow_bundle(payload)
        except ValueError as exc:
            return {"error": str(exc)}, 400

    json_columns = (
        "tags",
        "dependencies",
        "metadata",
        "input_schema",
        "output_schema",
        "workflow_bundle",
    )
    scalar_columns = (
        "description",
        "status",
        "visibility",
        "license",
        "workflow_type",
        "workflow_hash",
    )

    # Column names come from the fixed whitelists above, never from the
    # client, so interpolating them into the statement is safe.
    set_clauses = []
    params = []
    for key, value in normalized_updates.items():
        if key in json_columns:
            set_clauses.append(f"{key} = %s::jsonb")
            params.append(json.dumps(value))
        elif key in scalar_columns:
            set_clauses.append(f"{key} = %s")
            params.append(value)

    if not set_clauses:
        return {"error": "No valid fields to update"}, 400

    set_clauses.append("updated_at = CURRENT_TIMESTAMP")
    params.append(simulation_id)

    conn = self._get_conn()
    cursor = conn.cursor()
    cursor.execute(
        f"UPDATE simulations SET {', '.join(set_clauses)} WHERE id = %s",
        params,
    )
    updated_count = cursor.rowcount
    conn.commit()

    # An UPDATE matching no row is not an error in SQL; report it instead of
    # claiming success for a simulation that does not exist.
    if updated_count == 0:
        return {"error": "Simulation not found"}, 404

    return {"status": "updated"}, 200
def delete_simulation(self, simulation_id, session_id):
    """Delete a simulation together with the rows that reference it.

    Dependent rows are removed first, then the simulation itself, all in one
    transaction that is committed at the end.

    Returns:
        A ``(body, http_status)`` tuple: ``403`` without ``catalog_update``,
        ``404`` when the simulation does not exist, ``200`` with the deleted
        simulation's id, name and version on success.
    """
    if not self._check_privilege(session_id, "catalog_update"):
        return {"error": "Insufficient privileges"}, 403

    conn = self._get_conn()
    cur = conn.cursor()
    cur.execute(
        "SELECT id, name, version FROM simulations WHERE id = %s",
        (simulation_id,),
    )
    target = cur.fetchone()
    if not target:
        return {"error": "Simulation not found"}, 404

    # Tables whose rows point at the simulation through `simulation_id`.
    dependent_tables = (
        "execution_registry",
        "execution_stats",
        "simulation_tags",
        "access_control",
    )
    for table in dependent_tables:
        cur.execute(f"DELETE FROM {table} WHERE simulation_id = %s", (simulation_id,))

    # Dependency edges can reference the simulation from either side.
    cur.execute(
        "DELETE FROM simulation_dependencies WHERE simulation_id = %s OR depends_on_simulation_id = %s",
        (simulation_id, simulation_id),
    )
    cur.execute("DELETE FROM simulations WHERE id = %s", (simulation_id,))
    removed = cur.rowcount
    conn.commit()

    if removed == 0:
        return {"error": "Simulation not found"}, 404

    logger.info(
        f"Deleted simulation {target['name']}/{target['version']} (ID: {simulation_id})"
    )
    summary = {
        "status": "deleted",
        "simulation_id": simulation_id,
        "name": target["name"],
        "version": target["version"],
    }
    return summary, 200
def record_execution(self, data):
    """Insert one row into ``execution_registry``.

    Args:
        data: Dict describing the execution. ``execution_id``, ``squid_id``,
            ``simulation_id``, ``started_at``, ``status`` and
            ``executor_type`` are required; the remaining keys are optional.

    Returns:
        ``({"status": "recorded"}, 201)`` on success, or ``(error, 400)`` when
        the payload is not a dict or lacks a required field.

    Raises:
        Any database error from the INSERT, after the transaction has been
        rolled back.
    """
    if not isinstance(data, dict):
        return {"error": "Request body must be a JSON object"}, 400

    required = (
        "execution_id",
        "squid_id",
        "simulation_id",
        "started_at",
        "status",
        "executor_type",
    )
    # Report missing fields as a client error instead of an unhandled KeyError.
    missing = [field for field in required if data.get(field) is None]
    if missing:
        return {"error": f"Missing required field(s): {', '.join(missing)}"}, 400

    conn = self._get_conn()
    cursor = conn.cursor()

    try:
        cursor.execute(
            """
            INSERT INTO execution_registry (
                execution_id, squid_id, simulation_id, user_id,
                started_at, completed_at, duration_seconds, status,
                executor_type, cache_hit, run_db_path, run_db_size_bytes,
                input_hash, output_count, artifact_count, error_count,
                warning_count, environment
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                      %s, %s, %s, %s, %s, %s::jsonb)
            """,
            (
                data["execution_id"],
                data["squid_id"],
                data["simulation_id"],
                data.get("user_id"),
                data["started_at"],
                data.get("completed_at"),
                data.get("duration_seconds"),
                data["status"],
                data["executor_type"],
                data.get("cache_hit", False),
                data.get("run_db_path"),
                data.get("run_db_size_bytes"),
                data.get("input_hash"),
                data.get("output_count", 0),
                data.get("artifact_count", 0),
                data.get("error_count", 0),
                data.get("warning_count", 0),
                json.dumps(data.get("environment")),
            ),
        )
        conn.commit()
    except Exception:
        # Leave the per-thread connection usable for the next request.
        conn.rollback()
        raise

    return {"status": "recorded"}, 201
def get_stats(self, simulation_id):
    """Aggregate execution statistics for one simulation.

    Computes totals for all, completed, failed and cache-hit executions plus
    average, minimum and maximum duration from ``execution_registry``.

    Returns:
        ``(stats_dict, 200)``, or ``({}, 404)`` if the query yields no row.
    """
    stats_sql = """
        SELECT
            COUNT(*) as total_executions,
            SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
            SUM(CASE WHEN cache_hit = true THEN 1 ELSE 0 END) as cached,
            AVG(duration_seconds) as avg_duration,
            MIN(duration_seconds) as min_duration,
            MAX(duration_seconds) as max_duration
        FROM execution_registry
        WHERE simulation_id = %s
        """
    cur = self._get_conn().cursor()
    cur.execute(stats_sql, (simulation_id,))

    summary = cur.fetchone()
    if not summary:
        return {}, 404
    return dict(summary), 200
def sync_pending_requests(self, installation_id):
    """List pending sync requests, oldest first.

    Args:
        installation_id: When truthy, only requests from that installation
            are returned; otherwise all pending requests are.

    Returns:
        ``(list_of_row_dicts, 200)``.
    """
    if installation_id:
        statement = (
            """
            SELECT * FROM sync_queue
            WHERE installation_id = %s AND status = 'pending'
            ORDER BY created_at
            """,
            (installation_id,),
        )
    else:
        statement = (
            """
            SELECT * FROM sync_queue
            WHERE status = 'pending'
            ORDER BY created_at
            """,
        )

    cur = self._get_conn().cursor()
    cur.execute(*statement)

    pending = []
    for record in cur.fetchall():
        pending.append(dict(record))
    return pending, 200
def approve_sync(self, request_id, session_id):
    """Approve a queued sync request and register the simulation it carries.

    Args:
        request_id: Primary key of the ``sync_queue`` row to approve.
        session_id: Session performing the approval; must hold ``admin``.

    Returns:
        A ``(body, http_status)`` tuple:
        ``403`` without the admin privilege, ``404`` for an unknown request,
        ``409`` when the request was already approved, ``400`` when the stored
        payload is not a non-empty JSON object, ``200`` on success, otherwise
        whatever ``register_simulation`` reported (the row is then marked
        ``failed``).
    """
    if not self._check_privilege(session_id, "admin"):
        return {"error": "Admin privilege required"}, 403

    conn = self._get_conn()
    cursor = conn.cursor()

    cursor.execute("SELECT * FROM sync_queue WHERE id = %s", (request_id,))
    sync_request = cursor.fetchone()
    if not sync_request:
        return {"error": "Request not found"}, 404

    # Re-approving would trip the duplicate check in register_simulation and
    # overwrite the 'approved' status with 'failed', so stop here instead.
    if sync_request.get("status") == "approved":
        return {"error": "Request already approved"}, 409

    payload = self._normalize_json(sync_request.get("payload"))
    if isinstance(payload, dict) and payload:
        result, status = self.register_simulation(payload, session_id)
    else:
        # NULL, empty or undecodable payload: record a failure, do not crash.
        result, status = {"error": "Invalid sync payload"}, 400

    if status == 201:
        cursor.execute(
            """
            UPDATE sync_queue
            SET status = 'approved', processed_at = CURRENT_TIMESTAMP
            WHERE id = %s
            """,
            (request_id,),
        )
        conn.commit()
        return {"status": "approved", "simulation_id": result["id"]}, 200

    cursor.execute(
        """
        UPDATE sync_queue
        SET status = 'failed', processed_at = CURRENT_TIMESTAMP, rejection_reason = %s
        WHERE id = %s
        """,
        (result.get("error", "Unknown error"), request_id),
    )
    conn.commit()
    return result, status
def get_overview_stats(self):
    """Return the dashboard's headline counters.

    Counts all and active simulations, then total, completed and cache-hit
    executions. A database error is logged and answered with an all-zero
    payload, still with status 200.

    Returns:
        A ``(stats_dict, 200)`` tuple.
    """
    try:
        cur = self._get_conn().cursor()

        cur.execute("SELECT COUNT(*) AS count FROM simulations")
        total_row = cur.fetchone() or {}

        cur.execute("SELECT COUNT(*) AS count FROM simulations WHERE status = 'active'")
        active_row = cur.fetchone() or {}

        cur.execute(
            """
            SELECT
                COUNT(*) as total_executions,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as successful_executions,
                SUM(CASE WHEN cache_hit = true THEN 1 ELSE 0 END) as cached_executions
            FROM execution_registry
            """
        )
        exec_row = cur.fetchone() or {}

        # `or 0` maps NULL aggregates (SUM over zero rows) to 0.
        overview = {
            "total_simulations": total_row.get("count", 0) or 0,
            "active_simulations": active_row.get("count", 0) or 0,
        }
        for key in ("total_executions", "successful_executions", "cached_executions"):
            overview[key] = exec_row.get(key, 0) or 0
        return overview, 200
    except Exception as e:
        logger.error(f"Error getting overview stats: {e}", exc_info=True)
        return {
            "total_simulations": 0,
            "active_simulations": 0,
            "total_executions": 0,
            "successful_executions": 0,
            "cached_executions": 0,
        }, 200
def health_check(self):
    """Probe the database by counting simulations.

    Returns:
        ``({"status": "healthy", ...}, 200)`` when the query succeeds, or
        ``({"status": "unhealthy", ...}, 500)`` after logging the failure.
        The error detail is deliberately not exposed to the caller.
    """
    try:
        cur = self._get_conn().cursor()
        cur.execute("SELECT COUNT(*) AS count FROM simulations")
        record = cur.fetchone() or {}
        simulation_count = record.get("count", 0) or 0
    except Exception as e:
        logger.error(f"Health check failed: {e}", exc_info=True)
        return {"status": "unhealthy", "error": "Internal error"}, 500

    report = {
        "status": "healthy",
        "backend": "postgresql",
        "simulations": simulation_count,
    }
    return report, 200
# REST API Endpoints
@app.route("/health", methods=["GET"])
def health():
    """Report the active backend's health check result as JSON."""
    payload, http_status = catalog_db.health_check()
    return jsonify(payload), http_status
@app.route("/simulations/search", methods=["GET"])
def search_simulations():
    """Search the catalog.

    Query parameters:
        query: Optional name substring.
        tags: Optional comma-separated tag list; surrounding whitespace and
            empty entries are ignored.
        status: Status filter, defaults to ``active``.
        limit: Maximum number of results, defaults to 100; must not be
            negative.
    """
    query = request.args.get("query")

    raw_tags = request.args.get("tags")
    tags = None
    if raw_tags:
        # "a, b,," previously produced ["a", " b", "", ""], none of which
        # matches a stored tag except "a".
        tags = [tag.strip() for tag in raw_tags.split(",") if tag.strip()] or None

    status = request.args.get("status", "active")

    limit = request.args.get("limit", 100, type=int)
    if limit < 0:
        # SQLite treats a negative LIMIT as "no limit" and PostgreSQL rejects
        # it, so refuse it here rather than pass it to either backend.
        return jsonify({"error": "limit must be non-negative"}), 400

    results, status_code = catalog_db.search(query, tags, status, limit)
    return jsonify(results), status_code
@app.route("/simulations/<name>", methods=["GET"])
def get_simulation(name):
    """Return one simulation by name; ``?version=`` pins a specific version."""
    requested_version = request.args.get("version")
    simulation, http_status = catalog_db.get_simulation(name, requested_version)
    if not simulation:
        return jsonify({"error": "Not found"}), http_status
    return jsonify(simulation), http_status
@app.route("/simulations", methods=["POST"])
def register_simulation():
    """Register a new simulation from the JSON request body.

    With authentication enabled the caller must send ``X-Session-ID``; the
    backend then checks that session's privileges. The reserved value
    ``no-auth-session`` is refused, because the backend treats it as an
    all-privileges sentinel meant only for no-auth mode.
    """
    session_id = request.headers.get("X-Session-ID")
    if require_auth:
        if not session_id:
            return jsonify({"error": "Missing session ID"}), 401
        if session_id == "no-auth-session":
            # Never let a client pick the elevated sentinel for itself.
            return jsonify({"error": "Invalid session ID"}), 401
    else:
        # In no-auth mode, always use elevated local session regardless of header value.
        session_id = "no-auth-session"

    data = request.json
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    result, status = catalog_db.register_simulation(data, session_id)
    return jsonify(result), status
@app.route("/simulations/<int:simulation_id>", methods=["PATCH"])
def update_simulation(simulation_id):
    """Apply the JSON request body as updates to an existing simulation.

    With authentication enabled the caller must send ``X-Session-ID``; the
    backend then checks that session's privileges. The reserved value
    ``no-auth-session`` is refused, because the backend treats it as an
    all-privileges sentinel meant only for no-auth mode.
    """
    session_id = request.headers.get("X-Session-ID")
    if require_auth:
        if not session_id:
            return jsonify({"error": "Missing session ID"}), 401
        if session_id == "no-auth-session":
            # Never let a client pick the elevated sentinel for itself.
            return jsonify({"error": "Invalid session ID"}), 401
    else:
        # In no-auth mode, always use elevated local session regardless of header value.
        session_id = "no-auth-session"

    updates = request.json
    if not isinstance(updates, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    result, status = catalog_db.update_simulation(simulation_id, updates, session_id)
    return jsonify(result), status
@app.route("/simulations/<int:simulation_id>", methods=["DELETE"])
def delete_simulation(simulation_id):
    """Delete a simulation and the records that reference it.

    With authentication enabled the caller must send ``X-Session-ID``; the
    backend then checks that session's privileges. The reserved value
    ``no-auth-session`` is refused, because the backend treats it as an
    all-privileges sentinel meant only for no-auth mode.
    """
    session_id = request.headers.get("X-Session-ID")
    if require_auth:
        if not session_id:
            return jsonify({"error": "Missing session ID"}), 401
        if session_id == "no-auth-session":
            # Never let a client pick the elevated sentinel for itself.
            return jsonify({"error": "Invalid session ID"}), 401
    else:
        # In no-auth mode, always use elevated local session regardless of header value.
        session_id = "no-auth-session"

    result, status = catalog_db.delete_simulation(simulation_id, session_id)
    return jsonify(result), status
@app.route("/executions", methods=["POST"])
def record_execution():
    """Record one execution described by the JSON request body.

    NOTE(review): this endpoint performs no session check, unlike the other
    write endpoints. Confirm that is intended.
    """
    data = request.json
    if not isinstance(data, dict):
        # A JSON array, scalar or null would otherwise reach the backend and
        # fail there with an unhandled exception.
        return jsonify({"error": "Request body must be a JSON object"}), 400

    result, status = catalog_db.record_execution(data)
    return jsonify(result), status
@app.route("/run", methods=["POST"])
def run_simulation():
    """Run a simulation and return the execution results.

    Expects a JSON object with ``simulation_name`` (required), ``version``
    (optional) and ``params`` (optional object of keyword arguments for the
    simulation).

    Responses:
        200 with the execution summary and JSON-safe outputs;
        400 for a malformed request or a ``ValueError`` raised by the run;
        401 when authentication is enabled and no session header is sent;
        500 for any other failure during loading or execution.
    """
    import time

    # Read header first, then check — avoids auth bypass via default value
    session_id = request.headers.get("X-Session-ID")
    if require_auth and not session_id:
        return jsonify({"error": "Missing session ID"}), 401
    # NOTE(review): the session ID is only checked for presence here, never
    # validated against the backend. Confirm whether /run should verify it.

    data = request.json
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'No JSON payload provided'}), 400

    simulation_name = data.get('simulation_name')
    version = data.get('version')
    # An explicit null means "no parameters", same as omitting the key.
    params = data.get('params') or {}

    if not simulation_name:
        return jsonify({'error': 'simulation_name is required'}), 400
    if not isinstance(params, dict):
        return jsonify({'error': 'params must be a JSON object'}), 400
    if 'executor' in params:
        # Would collide with the executor keyword passed to sim.run below.
        return jsonify({'error': "'executor' is a reserved parameter name"}), 400

    try:
        import numpy as np

        def make_serializable(obj):
            """Recursively convert numpy values and containers to JSON-safe types."""
            if isinstance(obj, dict):
                return {k: make_serializable(v) for k, v in obj.items()}
            if isinstance(obj, (list, tuple)):
                return [make_serializable(v) for v in obj]
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, np.bool_):
                return bool(obj)
            if isinstance(obj, np.floating):
                return float(obj)
            if isinstance(obj, np.integer):
                return int(obj)
            return obj

        # Load the simulation
        sim = sim2l.load_simulation(simulation_name, version=version)

        # Use appropriate executor based on workflow type
        if getattr(sim, 'workflow_type', 'python') == 'notebook':
            executor = NotebookExecutor(cache=True)
        else:
            executor = LocalExecutor(cache=True)

        # Run simulation
        start_time = time.time()
        result = sim.run(**params, executor=executor)
        elapsed_time = time.time() - start_time

        # Format the output to return
        outputs = result.outputs.dict() if hasattr(result.outputs, 'dict') else result.outputs
        if hasattr(outputs, '__dict__'):
            outputs = vars(outputs)
        outputs = make_serializable(outputs)

        return jsonify({
            'success': True,
            'execution_id': result.execution_id,
            'squid_id': result.squid_id,
            'status': result.status,
            # Fall back to the locally measured wall time when the result
            # does not carry a duration.
            'duration_seconds': result.duration_seconds or elapsed_time,
            'outputs': outputs
        })

    except ValueError as e:
        logger.error(f"Validation error running simulation {simulation_name}: {e}")
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error(f"Error executing simulation {simulation_name}: {e}", exc_info=True)
        return jsonify({'error': f"Execution failed: {str(e)}"}), 500
@app.route("/simulations/<int:simulation_id>/stats", methods=["GET"])
def get_stats(simulation_id):
    """Return the backend's execution statistics for one simulation as JSON."""
    stats, http_status = catalog_db.get_stats(simulation_id)
    return jsonify(stats), http_status
@app.route("/sync/pending", methods=["GET"])
def get_pending_sync():
    """List pending sync requests, optionally filtered by ``?installation_id=``."""
    pending, http_status = catalog_db.sync_pending_requests(
        request.args.get("installation_id")
    )
    return jsonify(pending), http_status
@app.route("/sync/<int:request_id>/approve", methods=["POST"])
def approve_sync(request_id):
    """Approve a pending sync request.

    With authentication enabled the caller must send ``X-Session-ID``; the
    backend then checks that session's privileges. The reserved value
    ``no-auth-session`` is refused, because the backend treats it as an
    all-privileges sentinel meant only for no-auth mode.
    """
    session_id = request.headers.get("X-Session-ID")
    if require_auth:
        if not session_id:
            return jsonify({"error": "Missing session ID"}), 401
        if session_id == "no-auth-session":
            # Never let a client pick the elevated sentinel for itself.
            return jsonify({"error": "Invalid session ID"}), 401
    else:
        # In no-auth mode, always use elevated local session regardless of header value.
        session_id = "no-auth-session"

    result, status = catalog_db.approve_sync(request_id, session_id)
    return jsonify(result), status
@app.route("/statistics/overview", methods=["GET"])
def get_overview_stats():
    """Return the backend's overview counters for the dashboard as JSON."""
    overview, http_status = catalog_db.get_overview_stats()
    return jsonify(overview), http_status
def main():
    """Parse command-line options, set up the backend and start the service.

    Sets the module-level ``catalog_db`` and ``require_auth`` used by the
    route handlers, then blocks in ``app.run``. Exits with status 1 when the
    PostgreSQL backend is selected without ``--db-url``.
    """
    global catalog_db, require_auth

    default_db_path = str(Path.home() / ".sim2l" / "catalog.db")

    parser = argparse.ArgumentParser(description="Sim2l Catalog Service")
    parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
    parser.add_argument("--port", type=int, default=8002, help="Port to listen on")
    parser.add_argument(
        "--backend",
        choices=["sqlite", "postgresql"],
        default="sqlite",
        help="Database backend",
    )
    parser.add_argument("--db-path", default=default_db_path, help="SQLite database path")
    parser.add_argument(
        "--db-url", help="PostgreSQL connection string (for postgresql backend)"
    )
    parser.add_argument("--debug", action="store_true", help="Enable DEBUG logging")
    parser.add_argument(
        "--no-auth",
        action="store_true",
        help="Disable authentication (for testing/development)",
    )
    options = parser.parse_args()

    if options.debug:
        # force=True replaces the handlers installed by the module-level basicConfig.
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            force=True,
        )
        logger.setLevel(logging.DEBUG)
        logger.debug("DEBUG logging enabled")

    if options.no_auth:
        require_auth = False
        logger.warning("Authentication DISABLED - for development/testing only!")

    backend_name = options.backend
    if backend_name == "postgresql":
        if not options.db_url:
            logger.error("PostgreSQL backend requires --db-url")
            sys.exit(1)
        catalog_db = PostgreSQLCatalogBackend(options.db_url)
        logger.info("Using PostgreSQL backend")
    elif backend_name == "sqlite":
        catalog_db = SQLiteCatalogBackend(options.db_path)
        logger.info(f"Using SQLite backend: {options.db_path}")

    logger.info(f"Starting catalog service on {options.host}:{options.port}")
    app.run(host=options.host, port=options.port, debug=False)
# Start the service only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()