# TinyDB Persistence Layer

The Supervaizer TinyDB persistence layer provides thread-safe, local storage for `WorkflowEntity` instances (Jobs, Cases, Missions) across sessions and processes.
## Features
- Thread-safe operations using TinyDB's CachingMiddleware and threading locks
- Entity-specific tables with foreign key relationships via ID references
- Auto-persistence on EntityLifecycle state transitions
- Type-safe repositories for entity-specific operations
- Configurable storage path with automatic directory creation
- Cross-session data loading for service restarts
## Core Components

### StorageManager

The main persistence interface providing CRUD operations for entity dictionaries.
```python
from supervaizer.storage import StorageManager

# Initialize with custom path
storage = StorageManager(db_path="./data/entities.json")

# Save an entity
job_dict = job.to_dict
storage.save_object("Job", job_dict)

# Retrieve entities
job_data = storage.get_object_by_id("Job", "job-123")
all_jobs = storage.get_objects("Job")

# Delete entities
storage.delete_object("Job", "job-123")

# Reset all data
storage.reset_storage()
```
### EntityRepository

Type-safe repository pattern for specific entity types.
```python
from supervaizer.storage import create_job_repository, create_case_repository

# Create repositories
job_repo = create_job_repository()
case_repo = create_case_repository()

# Save entities (auto-converts using to_dict)
job_repo.save(job)
case_repo.save(case)

# Retrieve entities (auto-reconstructs using model_validate)
job = job_repo.get_by_id("job-123")
all_jobs = job_repo.get_all()

# Delete entities
job_repo.delete("job-123")
```
### PersistentEntityLifecycle

Enhanced lifecycle management with automatic persistence.
```python
from supervaizer.storage import PersistentEntityLifecycle, StorageManager
from supervaizer.lifecycle import EntityStatus, EntityEvents

storage = StorageManager()

# Auto-persist on transitions
success, error = PersistentEntityLifecycle.transition(
    job, EntityStatus.IN_PROGRESS, storage
)

# Auto-persist on events
success, error = PersistentEntityLifecycle.handle_event(
    job, EntityEvents.START_WORK, storage
)
```
## Data Model

### Entity Tables

Each entity type is stored in a dedicated TinyDB table (a hypothetical on-disk layout is sketched below):

- **Job table**: Stores Job entities with a `case_ids` field for relationships
- **Case table**: Stores Case entities with a `job_id` field for parent references
- **Mission table**: Future extension for Mission entities
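For orientation, the persisted JSON file might look roughly like this (a hypothetical layout: TinyDB keys each document by an internal doc ID, and the actual field sets depend on the entities):

```json
{
  "Job": {
    "1": {
      "id": "job-123",
      "name": "Example Job",
      "status": "in_progress",
      "case_ids": ["case-1"]
    }
  },
  "Case": {
    "1": {
      "id": "case-1",
      "job_id": "job-123",
      "name": "Example Case",
      "status": "in_progress"
    }
  }
}
```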
### Foreign Key Relationships

Relationships are represented via ID references:
```python
# Job stores list of case IDs
job.case_ids = ["case-1", "case-2", "case-3"]

# Case stores its parent job ID
case.job_id = "job-123"
```
### Entity Structure

Entities are persisted using their `to_dict` property:

```python
job_dict = {
    "id": "job-123",
    "name": "Example Job",
    "status": "in_progress",
    "case_ids": ["case-1", "case-2"],
    "job_context": {...},
    "finished_at": "2025-01-01T12:00:00",
    # ... other fields
}
```
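Going the other way, a persisted dictionary is rebuilt into an entity with Pydantic's `model_validate`, as the repositories above and the startup loaders below do:

```python
from supervaizer.job import Job

# Rebuild the entity from its persisted dictionary
job = Job.model_validate(job_dict)
```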
## Usage Patterns

### Basic Setup
```python
from supervaizer.storage import StorageManager

# Production setup
storage = StorageManager(db_path="./data/entities.json")

# Test setup
storage = StorageManager(db_path="./test_data/test_entities.json")

# In-memory for unit tests
storage = StorageManager(db_path=":memory:")
```
### Entity Creation and Persistence
```python
from supervaizer.job import Job, JobContext
from supervaizer.case import Case
from supervaizer.lifecycle import EntityStatus
from supervaizer.storage import StorageManager

storage = StorageManager()

# Create job (assumes `job_context`, a JobContext, has been built beforehand)
job = Job(
    id="job-123",
    name="My Job",
    agent_name="my-agent",
    status=EntityStatus.STOPPED,
    job_context=job_context,
)

# Create case (automatically adds to job.case_ids; assumes `account` is defined)
case = Case.start(
    job_id="job-123",
    name="My Case",
    account=account,
    description="Case description",
)

# Persist entities
storage.save_object("Job", job.to_dict)
storage.save_object("Case", case.to_dict)
```
### Loading Data on Startup

The system automatically loads running entities from storage during server startup. This ensures that after a server restart, all running workflows continue to be accessible through the in-memory registries.

#### Automatic Loading

The server startup process includes automatic loading of running entities:
```python
# This happens automatically during server initialization
load_running_entities_on_startup()
```
Only entities in running states are loaded:

- `IN_PROGRESS`
- `CANCELLING`
- `AWAITING`

This selective loading ensures that only active workflows are restored to memory, keeping the system efficient.
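A minimal sketch of this selective loading, assuming statuses serialize to the lowercase strings shown under Entity Structure and that the `Jobs`/`Cases` registries live next to their entity classes (the import paths are assumptions, and the real `load_running_entities_on_startup` may differ):

```python
from supervaizer.case import Case, Cases
from supervaizer.job import Job, Jobs
from supervaizer.storage import StorageManager

# Lowercase status strings assumed to match the persisted "status" field
RUNNING_STATES = {"in_progress", "cancelling", "awaiting"}

def load_running_entities(storage: StorageManager) -> None:
    """Restore only in-flight workflows to the in-memory registries."""
    for job_data in storage.get_objects("Job"):
        if job_data.get("status") in RUNNING_STATES:
            Jobs().add_job(Job.model_validate(job_data))
    for case_data in storage.get_objects("Case"):
        if case_data.get("status") in RUNNING_STATES:
            Cases().add_case(Case.model_validate(case_data))
```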
#### Manual Loading (All Entities)

For testing or special cases, you can manually load all entities:
```python
def load_all_entities_on_startup():
    """Load all entities from storage and populate registries."""
    storage = StorageManager()

    # Load jobs
    job_data_list = storage.get_objects("Job")
    for job_data in job_data_list:
        # Reconstruct Job object and add to registry
        job = Job.model_validate(job_data)
        Jobs().add_job(job)

    # Load cases
    case_data_list = storage.get_objects("Case")
    for case_data in case_data_list:
        # Reconstruct Case object and add to registry
        case = Case.model_validate(case_data)
        Cases().add_case(case)

    print(f"Loaded {len(job_data_list)} jobs and {len(case_data_list)} cases")
```
### Auto-Persistence Integration

Replace standard lifecycle calls with persistent versions:
```python
# Standard (no persistence)
from supervaizer.lifecycle import EntityLifecycle
EntityLifecycle.transition(job, EntityStatus.IN_PROGRESS)

# Auto-persisting version
from supervaizer.storage import PersistentEntityLifecycle
PersistentEntityLifecycle.transition(job, EntityStatus.IN_PROGRESS, storage)
```
### Relationship Queries
```python
# Get all cases for a job
job_cases = storage.get_cases_for_job("job-123")

# Get job for a case
case_data = storage.get_object_by_id("Case", "case-456")
job_data = storage.get_object_by_id("Job", case_data["job_id"])

# Count cases per job
all_jobs = storage.get_objects("Job")
for job_data in all_jobs:
    case_count = len(storage.get_cases_for_job(job_data["id"]))
    print(f"Job {job_data['id']} has {case_count} cases")
```
## Configuration

### Environment-Based Setup
```python
import os

from supervaizer.storage import StorageManager

# Environment-based configuration
def create_storage():
    env = os.getenv("SUPERVAIZER_ENV", "production")
    if env == "test":
        return StorageManager(db_path="./test_data/entities.json")
    elif env == "development":
        return StorageManager(db_path="./dev_data/entities.json")
    else:
        return StorageManager(db_path="./data/entities.json")
```
### Storage Path Configuration

The default storage path is `./data/entities.json`, but it can be customized:
```python
# Custom path
storage = StorageManager(db_path="/var/lib/supervaizer/entities.json")

# Relative to project root
storage = StorageManager(db_path="./storage/production_entities.json")

# Temporary for testing
import tempfile

with tempfile.TemporaryDirectory() as temp_dir:
    storage = StorageManager(db_path=f"{temp_dir}/test_entities.json")
```
## Thread Safety

The `StorageManager` is thread-safe through:

- **TinyDB CachingMiddleware**: Batches writes in memory before flushing them to disk
- **Threading locks**: Protect concurrent access to database operations
- **Singleton pattern**: Ensures a single database instance per path
```python
import threading

from supervaizer.storage import StorageManager

storage = StorageManager()

def worker_thread(worker_id):
    for i in range(100):
        # Thread-safe operations
        data = {"id": f"worker-{worker_id}-{i}", "data": f"data-{i}"}
        storage.save_object("Test", data)
        retrieved = storage.get_object_by_id("Test", data["id"])
        assert retrieved == data

# Start multiple threads - all operations are thread-safe
threads = [threading.Thread(target=worker_thread, args=(i,)) for i in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```
## Error Handling
```python
from supervaizer.storage import StorageManager

storage = StorageManager()

try:
    # Missing required 'id' field
    storage.save_object("Job", {"name": "No ID"})
except ValueError as e:
    print(f"Validation error: {e}")

# Non-existent object returns None
job = storage.get_object_by_id("Job", "non-existent")
assert job is None

# Failed deletion returns False
deleted = storage.delete_object("Job", "non-existent")
assert deleted is False
```
## Testing

### Unit Test Setup
```python
import tempfile

import pytest

from supervaizer.storage import StorageManager

@pytest.fixture
def temp_storage():
    """Create a temporary storage manager for testing."""
    with tempfile.TemporaryDirectory() as temp_dir:
        yield StorageManager(db_path=f"{temp_dir}/test_entities.json")

def test_job_persistence(temp_storage):
    job_data = {"id": "test-job", "name": "Test Job"}
    temp_storage.save_object("Job", job_data)
    retrieved = temp_storage.get_object_by_id("Job", "test-job")
    assert retrieved == job_data
```
### Integration Test Patterns
```python
def test_full_workflow():
    """Test complete job/case workflow with persistence."""
    with tempfile.TemporaryDirectory() as temp_dir:
        storage = StorageManager(db_path=f"{temp_dir}/test.json")

        # Clear registries
        Jobs().__init__()
        Cases().__init__()

        # Create and persist entities
        job = create_test_job()
        case = create_test_case(job.id)
        storage.save_object("Job", job.to_dict)
        storage.save_object("Case", case.to_dict)

        # Verify persistence
        assert storage.get_object_by_id("Job", job.id) is not None
        assert len(storage.get_cases_for_job(job.id)) == 1
```
## Best Practices

- **Use the singleton pattern**: `StorageManager` is a singleton to prevent multiple database connections
- **Leverage EntityRepository**: Use type-safe repositories for better code organization
- **Auto-persist lifecycle changes**: Use `PersistentEntityLifecycle` for automatic persistence
- **Load data on startup**: Restore entity registries from storage when the service starts
- **Handle errors gracefully**: Check for `None` returns and catch validation errors
- **Test with temporary storage**: Use temporary directories for unit tests
- **Environment-specific paths**: Use different storage paths for dev/test/production
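A short sketch combining several of these practices, assuming `job` is an existing Job instance:

```python
from supervaizer.lifecycle import EntityStatus
from supervaizer.storage import (
    PersistentEntityLifecycle,
    StorageManager,
    create_job_repository,
)

storage = StorageManager(db_path="./data/entities.json")
job_repo = create_job_repository()

# Type-safe persistence via the repository (`job` assumed to exist)
job_repo.save(job)

# State change persisted automatically; check the result rather than assuming success
success, error = PersistentEntityLifecycle.transition(
    job, EntityStatus.IN_PROGRESS, storage
)
if not success:
    print(f"Transition failed: {error}")
```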
## Limitations

- **No complex queries**: TinyDB is suitable for simple key-value and basic filtering operations; anything richer is done in application code, as sketched after this list
- **JSON serialization**: All data must be JSON-serializable (handled by entity `to_dict` methods)
- **Single-process locks**: Thread safety within a single process, not across processes
- **No transactions**: No atomic multi-table operations (use application-level coordination)
- **Memory usage**: TinyDB loads data into memory (fine for moderate datasets)
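A hypothetical example of such application-side filtering (field names follow the entity structure shown earlier):

```python
from supervaizer.storage import StorageManager

storage = StorageManager()

# Find every in-progress job for one agent by scanning the Job table
in_progress_jobs = [
    job for job in storage.get_objects("Job")
    if job.get("status") == "in_progress" and job.get("agent_name") == "my-agent"
]
```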
## Migration Path

If you need to migrate to a more robust database later:

- **Keep the interface**: The `StorageManager` interface can be implemented with SQLite, PostgreSQL, etc. (sketched below)
- **Export data**: Use `get_objects()` to export all data for migration
- **Preserve relationships**: The foreign key structure translates well to relational databases
- **Maintain threading**: Keep thread-safety patterns for any database backend
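One way to keep that interface stable is to capture it as a typing Protocol that both the TinyDB backend and a future SQL backend implement. The method names below mirror the `StorageManager` calls shown in this document; the Protocol itself is a sketch, not part of the library:

```python
from typing import Any, Optional, Protocol

class ObjectStore(Protocol):
    """Hypothetical storage interface mirroring StorageManager's surface."""

    def save_object(self, entity_type: str, data: dict[str, Any]) -> None: ...
    def get_object_by_id(self, entity_type: str, object_id: str) -> Optional[dict[str, Any]]: ...
    def get_objects(self, entity_type: str) -> list[dict[str, Any]]: ...
    def delete_object(self, entity_type: str, object_id: str) -> bool: ...
    def reset_storage(self) -> None: ...
```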