import hashlib
import json
import sqlite3
import time
import uuid
from contextlib import closing
from typing import List, Optional

from .workflow_definition import Workflow
|
class WorkflowStorage:
    """SQLite-backed persistence for workflow definitions and execution history.

    Workflow IDs are derived deterministically from the workflow name
    (first 16 hex chars of its SHA-256), so saving a workflow with the
    same name updates the existing row rather than creating a new one.
    Every public method opens a short-lived connection and always closes
    it, even on error.
    """

    def __init__(self, db_path: str):
        """Remember *db_path* and create tables/indexes if missing."""
        self.db_path = db_path
        self._initialize_storage()

    def _connect(self) -> sqlite3.Connection:
        """Return a new connection to the backing database.

        check_same_thread=False permits use from a thread other than the
        creating one; connections here are short-lived per call, so no
        cross-thread sharing actually occurs.
        """
        return sqlite3.connect(self.db_path, check_same_thread=False)

    def _initialize_storage(self) -> None:
        """Create the schema (tables and indexes) if it does not exist."""
        with closing(self._connect()) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS workflows (
                    workflow_id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    description TEXT,
                    workflow_data TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL,
                    execution_count INTEGER DEFAULT 0,
                    last_execution_at INTEGER,
                    tags TEXT
                )
                """
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS workflow_executions (
                    execution_id TEXT PRIMARY KEY,
                    workflow_id TEXT NOT NULL,
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    status TEXT NOT NULL,
                    execution_log TEXT,
                    variables TEXT,
                    step_results TEXT,
                    FOREIGN KEY (workflow_id) REFERENCES workflows(workflow_id)
                )
                """
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_workflow_name ON workflows(name)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_execution_workflow"
                " ON workflow_executions(workflow_id)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_execution_started"
                " ON workflow_executions(started_at)"
            )
            conn.commit()

    def save_workflow(self, workflow: "Workflow") -> str:
        """Insert or update *workflow*; return its deterministic id.

        Uses an upsert (not INSERT OR REPLACE) so that re-saving an
        existing workflow preserves created_at, execution_count and
        last_execution_at instead of silently resetting them.
        """
        workflow_id = hashlib.sha256(workflow.name.encode()).hexdigest()[:16]
        workflow_data = json.dumps(workflow.to_dict())
        tags_json = json.dumps(workflow.tags)
        current_time = int(time.time())
        with closing(self._connect()) as conn:
            conn.execute(
                """
                INSERT INTO workflows
                    (workflow_id, name, description, workflow_data,
                     created_at, updated_at, tags)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(workflow_id) DO UPDATE SET
                    name = excluded.name,
                    description = excluded.description,
                    workflow_data = excluded.workflow_data,
                    updated_at = excluded.updated_at,
                    tags = excluded.tags
                """,
                (
                    workflow_id,
                    workflow.name,
                    workflow.description,
                    workflow_data,
                    current_time,
                    current_time,
                    tags_json,
                ),
            )
            conn.commit()
        return workflow_id

    def _load_by_column(self, column: str, value: str) -> "Optional[Workflow]":
        """Fetch and deserialize one workflow where *column* = *value*.

        *column* is always a trusted literal supplied by this class, never
        user input, so interpolating it into the SQL is safe; the value
        itself is bound as a parameter.
        """
        with closing(self._connect()) as conn:
            row = conn.execute(
                f"SELECT workflow_data FROM workflows WHERE {column} = ?",
                (value,),
            ).fetchone()
        if row is None:
            return None
        return Workflow.from_dict(json.loads(row[0]))

    def load_workflow(self, workflow_id: str) -> "Optional[Workflow]":
        """Return the workflow stored under *workflow_id*, or None."""
        return self._load_by_column("workflow_id", workflow_id)

    def load_workflow_by_name(self, name: str) -> "Optional[Workflow]":
        """Return the workflow stored under *name*, or None."""
        return self._load_by_column("name", name)

    def list_workflows(self, tag: Optional[str] = None) -> List[dict]:
        """Return summary dicts for all workflows, ordered by name.

        When *tag* is given, filters via a LIKE match against the
        JSON-encoded tag list (matches the exact quoted tag string).
        """
        query = (
            "SELECT workflow_id, name, description, execution_count,"
            " last_execution_at, tags FROM workflows"
        )
        params: tuple = ()
        if tag:
            query += " WHERE tags LIKE ?"
            params = (f'%"{tag}"%',)
        query += " ORDER BY name"
        with closing(self._connect()) as conn:
            rows = conn.execute(query, params).fetchall()
        return [
            {
                "workflow_id": row[0],
                "name": row[1],
                "description": row[2],
                "execution_count": row[3],
                "last_execution_at": row[4],
                "tags": json.loads(row[5]) if row[5] else [],
            }
            for row in rows
        ]

    def delete_workflow(self, workflow_id: str) -> bool:
        """Delete a workflow and its execution history.

        Returns True if a workflow row was actually removed. Execution
        rows are removed first so no orphaned history can survive.
        """
        with closing(self._connect()) as conn:
            conn.execute(
                "DELETE FROM workflow_executions WHERE workflow_id = ?",
                (workflow_id,),
            )
            cursor = conn.execute(
                "DELETE FROM workflows WHERE workflow_id = ?", (workflow_id,)
            )
            deleted = cursor.rowcount > 0
            conn.commit()
        return deleted

    def save_execution(
        self, workflow_id: str, execution_context: "WorkflowExecutionContext"
    ) -> str:
        """Persist a completed execution and bump the workflow's counters.

        *execution_context* must expose ``execution_log`` (list of dicts,
        each with a "timestamp" key), ``variables`` and ``step_results``.
        Returns the generated 16-char execution id.
        """
        # 16 hex chars of a random UUID; no hyphens, unlike str(uuid4())[:16].
        execution_id = uuid.uuid4().hex[:16]
        now = int(time.time())
        # Start time comes from the first log entry when available,
        # otherwise we fall back to the completion time.
        if execution_context.execution_log:
            started_at = int(execution_context.execution_log[0]["timestamp"])
        else:
            started_at = now
        with closing(self._connect()) as conn:
            conn.execute(
                """
                INSERT INTO workflow_executions
                    (execution_id, workflow_id, started_at, completed_at,
                     status, execution_log, variables, step_results)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    execution_id,
                    workflow_id,
                    started_at,
                    now,
                    "completed",
                    json.dumps(execution_context.execution_log),
                    json.dumps(execution_context.variables),
                    json.dumps(execution_context.step_results),
                ),
            )
            conn.execute(
                """
                UPDATE workflows
                SET execution_count = execution_count + 1,
                    last_execution_at = ?
                WHERE workflow_id = ?
                """,
                (now, workflow_id),
            )
            conn.commit()
        return execution_id

    def get_execution_history(self, workflow_id: str, limit: int = 10) -> List[dict]:
        """Return up to *limit* most recent executions for *workflow_id*."""
        with closing(self._connect()) as conn:
            rows = conn.execute(
                """
                SELECT execution_id, started_at, completed_at, status
                FROM workflow_executions
                WHERE workflow_id = ?
                ORDER BY started_at DESC
                LIMIT ?
                """,
                (workflow_id, limit),
            ).fetchall()
        return [
            {
                "execution_id": row[0],
                "started_at": row[1],
                "completed_at": row[2],
                "status": row[3],
            }
            for row in rows
        ]