Storage Layer
Storage is abstracted behind a StoragePort protocol. The agent and all tools talk to the protocol — never to a specific adapter directly.
Architecture
```mermaid
graph TD
    AG[Agent tools] --> SP[StoragePort protocol\napp/memory/ports.py]
    BG[Background loops] --> SP
    DASH[Dashboard router] --> SP
    SP --> SQ[SqliteAdapter\napp/memory/adapters/sqlite.py]
    SP --> PG[PostgresAdapter\napp/memory/adapters/postgres.py]
    SQ --> DB1[(kwasi.db\nlocal dev)]
    PG --> DB2[(PostgreSQL\nRailway prod)]
```
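Select the adapter with a single environment variable, STORAGE_BACKEND; each adapter reads its connection string from DATABASE_URL:
```bash
STORAGE_BACKEND=sqlite    # default; uses DATABASE_URL=sqlite:///kwasi.db
STORAGE_BACKEND=postgres  # uses DATABASE_URL from Railway
```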
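A minimal sketch of how a factory might turn STORAGE_BACKEND into an adapter. The two classes below are hypothetical stand-ins that only model a connection string, and make_storage, the sqlite fallback URL, and the error handling are assumptions rather than the project's actual wiring.

```python
from __future__ import annotations

import os
from dataclasses import dataclass


@dataclass
class SqliteAdapter:
    """Hypothetical stand-in: only the connection string is modelled."""
    database_url: str


@dataclass
class PostgresAdapter:
    """Hypothetical stand-in: only the connection string is modelled."""
    database_url: str


def make_storage(env: dict[str, str] | None = None) -> SqliteAdapter | PostgresAdapter:
    """Pick a StoragePort implementation from STORAGE_BACKEND (hypothetical helper)."""
    env = dict(os.environ) if env is None else env
    backend = env.get("STORAGE_BACKEND", "sqlite").strip().lower()
    if backend == "sqlite":
        # Local default: assume kwasi.db when DATABASE_URL is unset.
        return SqliteAdapter(env.get("DATABASE_URL", "sqlite:///kwasi.db"))
    if backend == "postgres":
        # Production: DATABASE_URL must be provided (Railway injects it).
        url = env.get("DATABASE_URL")
        if not url:
            raise RuntimeError("STORAGE_BACKEND=postgres requires DATABASE_URL")
        return PostgresAdapter(url)
    raise ValueError(f"unknown STORAGE_BACKEND: {backend!r}")
```

Because callers only receive an object that satisfies StoragePort, switching backends does not touch agent, loop, or dashboard code.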
Schema
The table/column listing below is a hand-maintained reference. At runtime the agent never relies on it:
describe_schema() (on StoragePort and both adapters) introspects the live schema from information_schema (Postgres) or sqlite_master + PRAGMA table_info (SQLite) and injects it into the system prompt as a "## Live database schema" section, so the raw-SQL tools (db_query / db_execute) always see the real database rather than this listing. The result is cached for the process lifetime, and if introspection fails it falls back to an empty section.
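A minimal sketch of the SQLite side of this introspection, using the standard-library sqlite3 module synchronously (the real adapter is async). The function name describe_sqlite_schema and the one-line-per-table layout are assumptions; process-lifetime caching is omitted.

```python
import sqlite3


def describe_sqlite_schema(conn: sqlite3.Connection) -> str:
    """Render user tables and their columns as a markdown prompt section (sketch)."""
    try:
        tables = [
            row[0]
            for row in conn.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
            )
        ]
        lines = ["## Live database schema"]
        for table in tables:
            # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
            cols = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
            col_desc = ", ".join(f"{name} {ctype}".strip() for _, name, ctype, *_ in cols)
            lines.append(f"- {table}({col_desc})")
        return "\n".join(lines)
    except sqlite3.Error:
        # Fail safe: any introspection error yields an empty section instead of a crash.
        return ""
```

The Postgres half would follow the same shape, reading table_name, column_name, and data_type from information_schema.columns.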
interactions
Every conversation exchange, from any channel.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| user_message | TEXT | What you sent |
| agent_response | TEXT | What Kwasi replied |
| tools_used | TEXT | JSON array of tool call records |
| channel | TEXT | "telegram", "cli", or "whatsapp" |
| user_id | TEXT | Telegram user ID (nullable) |
| created_at | TIMESTAMP | UTC |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |
notes
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| title | TEXT | |
| content | TEXT | |
| created_at | TIMESTAMP | UTC |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |
tasks
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| title | TEXT | |
| status | TEXT | "todo" or "done" |
| due_date | TEXT | ISO date string (YYYY-MM-DD), nullable |
| notified | BOOLEAN | true once a due-date notification has been sent |
| created_at | TIMESTAMP | UTC |
reminders
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| user_id | TEXT | Telegram user ID or WhatsApp phone number |
| chat_id | TEXT | Delivery target: Telegram chat ID or WhatsApp phone number |
| message | TEXT | The reminder text |
| remind_at | TIMESTAMP | UTC; when to fire |
| status | TEXT | "pending", "sent", or "cancelled" |
| channel | TEXT | Delivery channel: "telegram" or "whatsapp" |
| created_at | TIMESTAMP | UTC |
context
Dual-purpose: it stores the long-term user profile produced by the Reflection Engine and doubles as a general-purpose system key-value store. Runtime config overrides, deployment markers, loop heartbeats, and auth caches all live here under system:* keys. Access is exact-key only (get_context(user_id) / save_context(...)); there is no prefix scan, so consumers must read a known, fixed set of keys.
| Column | Type | Notes |
|---|---|---|
| user_id | TEXT | Primary key. Holds "global" (the profile) or a system key (see below). |
| content | TEXT | Markdown profile ≤550 words (for "global"); ISO date string, JSON, or iso\|status\|detail for system keys; "" = cleared (read back as absent). |
| last_updated | TIMESTAMP | UTC |
Known keys (values of user_id):
| Key | Written by | Content |
|---|---|---|
| global | Reflection Engine | The user profile (markdown) |
| system:briefing | morning-briefing loop | Today's date; dedups the daily send across restarts |
| system:stale_check | reflection | This week's Monday; weekly stale-facts nudge cooldown |
| system:meeting_prep:<event_id> | meeting-prep loop | Per-event prep-sent marker |
| system:meeting_followup:gmail:<message_id> | meeting-followup loop | Per-email processed marker |
| system:config:<field> | set_runtime_config | A runtime config override, e.g. system:config:model_name → anthropic:claude-sonnet-4-6. Re-applied at boot by load_overrides. |
| system:pending_self_redeploy | railway_set_env / railway_redeploy executors | JSON marker {chat_id, kind, change, expected, triggered_at, prev_deploy_id}. Consumed on next boot by confirm_pending_self_redeploy to post a ✅/⚠️. |
| system:<loop>:heartbeat | background loops | Loop health signal iso\|status\|detail (e.g. 2026-06-09T…\|ok\|3_processed). diagnose_self reads these to surface silent failures. |
| system:outlook_msal_cache | Outlook MSAL | Serialized MSAL token cache (opaque) |
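The iso|status|detail heartbeat value can be produced and parsed with a few lines of Python. This is a sketch: encode_heartbeat, parse_heartbeat, is_stale, and the staleness threshold are hypothetical helpers, not the project's code, and diagnose_self may apply different rules.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Heartbeat:
    at: datetime
    status: str
    detail: str


def encode_heartbeat(status: str, detail: str, now: datetime | None = None) -> str:
    """Build the 'iso|status|detail' string stored under system:<loop>:heartbeat."""
    now = now or datetime.now(timezone.utc)
    # Replace any '|' in the free-text fields so the three-part format stays parseable.
    return "|".join([now.isoformat(), status.replace("|", "/"), detail.replace("|", "/")])


def parse_heartbeat(content: str) -> Heartbeat | None:
    """Parse a stored heartbeat; '' (cleared) or malformed values read as absent."""
    parts = content.split("|", 2)
    if len(parts) != 3:
        return None
    try:
        at = datetime.fromisoformat(parts[0])
    except ValueError:
        return None
    return Heartbeat(at, parts[1], parts[2])


def is_stale(hb: Heartbeat, now: datetime, max_age_s: float) -> bool:
    """True when a loop has not reported within max_age_s seconds (a possible silent failure)."""
    return (now - hb.at).total_seconds() > max_age_s
```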
user_facts
Durable key-value facts about the user, treated as ground truth. Written by the agent (remember_fact) or auto-extracted by nightly reflection. Fetched at runtime via recall_facts() rather than injected into the system prompt. Facts are mostly permanent, but once per nightly reflection decay_user_facts() removes a narrow class of auto-facts that go stale quickly (date-stamped snapshots, completed-task records, external-system shadows).
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| key | TEXT | UNIQUE. snake_case identifier, e.g. home_address, partner_name |
| value | TEXT | The actual fact value |
| category | TEXT | location, personal, preference, work, health, or general |
| source | TEXT | "agent" (proactive/explicit) or "reflection" (auto-extracted nightly) |
| created_at | TIMESTAMP | UTC; set on first insert, never changed |
| updated_at | TIMESTAMP | UTC; refreshed on every upsert |
The unique index on key means save_user_fact is always an upsert — writing the same key a second time overwrites the value.
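With a UNIQUE key, the upsert can be a single INSERT ... ON CONFLICT statement that overwrites the value while leaving id and created_at untouched. A minimal sqlite3 sketch, assuming the column set from the table above; the standalone save_user_fact function and the SCHEMA string are illustrative, not the adapter's code (ON CONFLICT upserts need SQLite 3.24 or newer, and Postgres accepts the same shape).

```python
import sqlite3
from datetime import datetime, timezone
from uuid import uuid4

SCHEMA = """
CREATE TABLE IF NOT EXISTS user_facts (
    id TEXT PRIMARY KEY,
    key TEXT NOT NULL UNIQUE,
    value TEXT NOT NULL,
    category TEXT NOT NULL DEFAULT 'general',
    source TEXT NOT NULL DEFAULT 'agent',
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
)
"""


def save_user_fact(conn: sqlite3.Connection, key: str, value: str,
                   category: str = "general", source: str = "agent") -> None:
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        """
        INSERT INTO user_facts (id, key, value, category, source, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(key) DO UPDATE SET
            value = excluded.value,
            category = excluded.category,
            source = excluded.source,
            updated_at = excluded.updated_at  -- id and created_at keep their first-insert values
        """,
        (str(uuid4()), key, value, category, source, now, now),
    )
    conn.commit()
```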
news_topics
Explicitly followed news topics.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| topic | TEXT | Unique, stored lowercase |
| created_at | TIMESTAMP | UTC |
seen_news
Rolling 7-day dedup store for fetched news stories.
| Column | Type | Notes |
|---|---|---|
| url | TEXT | Primary key |
| title | TEXT | Story headline |
| seen_at | TIMESTAMP | UTC; entries older than 7 days are purged on the next get_news call |
scheduled_tasks
User-defined recurring jobs evaluated by _user_scheduled_tasks_loop every 60 seconds.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| name | TEXT | Human-readable label |
| prompt | TEXT | What to run the agent with when the task fires |
| cron_expression | TEXT | Standard 5-field cron, e.g. 0 9 * * 1 = Monday 9am |
| timezone | TEXT | IANA timezone (defaults to USER_TIMEZONE) |
| enabled | BOOLEAN | true to run, false to pause |
| created_by | TEXT | "user" or "system" |
| last_run | TIMESTAMP | UTC; last time this task successfully fired |
| created_at | TIMESTAMP | UTC |
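To illustrate what "evaluated every 60 seconds" involves, here is a minimal matcher for numeric 5-field cron expressions (*, N, A-B, comma lists, and /step; no month or weekday names). It assumes the loop first converts the current time into the task's IANA timezone (for example with zoneinfo.ZoneInfo); cron_matches is a hypothetical helper, and the real loop may use a cron library instead.

```python
from datetime import datetime


def _field_matches(field: str, value: int, lo: int, hi: int) -> bool:
    """Match one cron field: *, N, A-B, comma lists, and /step are supported."""
    for part in field.split(","):
        base, _, step_s = part.partition("/")
        step = int(step_s) if step_s else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            a, b = base.split("-")
            start, end = int(a), int(b)
        else:
            start = int(base)
            end = hi if step_s else start  # "N/step" runs from N to the field maximum
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expr: str, local: datetime) -> bool:
    """True if a 5-field cron expression fires at `local` (already in the task's timezone)."""
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (local.weekday() + 1) % 7  # cron: 0 = Sunday; Python: Monday = 0
    dow_ok = _field_matches(dow, cron_dow, 0, 7) or (
        cron_dow == 0 and _field_matches(dow, 7, 0, 7)  # 7 is also Sunday
    )
    dom_ok = _field_matches(dom, local.day, 1, 31)
    # Classic cron: when both day fields are restricted, either one may match.
    day_ok = (dom_ok or dow_ok) if (dom != "*" and dow != "*") else (dom_ok and dow_ok)
    return (
        _field_matches(minute, local.minute, 0, 59)
        and _field_matches(hour, local.hour, 0, 23)
        and _field_matches(month, local.month, 1, 12)
        and day_ok
    )
```

Comparing last_run against the current minute before firing keeps a task from running twice if the loop wakes more than once in the same minute.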
audit_log
Every tool call the agent makes, with sanitised arguments and truncated output.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| timestamp | TIMESTAMP | UTC |
| tool_name | TEXT | Name of the tool called |
| arguments | TEXT | JSON; keys matching key, token, secret, password are redacted to "***" |
| result_summary | TEXT | First 200 characters of tool output |
| channel | TEXT | "telegram", "whatsapp", or "cli" |
| user_id | TEXT | Nullable |
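A minimal sketch of the sanitising step, assuming "matching" means a case-insensitive substring match on argument names and that nested dicts and lists are walked recursively; redact and audit_fields are hypothetical names.

```python
from __future__ import annotations

import json
from typing import Any

SENSITIVE = ("key", "token", "secret", "password")


def redact(value: Any) -> Any:
    """Recursively replace values whose key name contains a sensitive word with '***'."""
    if isinstance(value, dict):
        return {
            k: "***" if any(word in str(k).lower() for word in SENSITIVE) else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value


def audit_fields(arguments: dict[str, Any], output: str) -> tuple[str, str]:
    """Return (arguments JSON, result_summary) in the shape stored in audit_log."""
    return json.dumps(redact(arguments), default=str), output[:200]
```

Substring matching errs toward over-redaction: an argument named keyword would also be masked, which is usually the safer failure for an audit trail.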
read_later
Articles saved by the user via the Content Curator skill.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key (prefix match supported in delete_read_later) |
| url | TEXT | Original URL |
| title | TEXT | Article title |
| summary | TEXT | Auto-generated at save time via LLM summarisation |
| note | TEXT | Optional user annotation |
| tags | TEXT | Comma-separated keywords extracted from title + summary at save time (frequency-ranked, stop-word filtered, up to 10). Used by find_relevant_read_later() to surface contextually relevant articles. |
| added_at | TIMESTAMP | UTC |
| read_at | TIMESTAMP | Set when marked read via mark_read_later_read (nullable); list_read_later(include_read=False) filters on it |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |
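The tags column's recipe (frequency-ranked, stop-word filtered, up to 10) maps directly onto collections.Counter. A sketch under stated assumptions: the tiny stop-word list, the tokeniser regex, and the minimum word length of 3 are illustrative choices, not the project's.

```python
import re
from collections import Counter

# Tiny illustrative stop-word list; a real one would be much longer.
STOP_WORDS = {"the", "a", "an", "and", "or", "of", "to", "in", "on", "for",
              "with", "is", "are", "how", "why", "what"}


def extract_tags(title: str, summary: str, max_tags: int = 10) -> str:
    """Return comma-separated keywords ranked by frequency in title + summary."""
    words = re.findall(r"[a-z0-9]+", f"{title} {summary}".lower())
    counts = Counter(w for w in words if w not in STOP_WORDS and len(w) > 2)
    # most_common keeps first-seen order among equal counts, so ties favour title words.
    return ",".join(word for word, _ in counts.most_common(max_tags))
```

For example, extract_tags("Rust async runtimes", "A tour of async runtimes in Rust and how tokio schedules tasks") ranks rust, async, and runtimes first because each appears twice.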
alert_rules
Proactive alert rules evaluated every 5 minutes by _alert_loop. Two system defaults are seeded on first run.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| name | TEXT | Human-readable label |
| trigger_type | TEXT | task_due_today, task_overdue (phase 2: meeting_soon, email_arrived) |
| conditions | TEXT | JSON object of trigger-specific params (e.g. {"fire_hour": 11}) |
| cooldown_hours | INTEGER | Minimum hours between consecutive firings; prevents spam |
| enabled | BOOLEAN | true to evaluate, false to pause |
| created_by | TEXT | "user" or "system" |
| last_fired | TIMESTAMP | UTC; last time this rule triggered a message |
| created_at | TIMESTAMP | UTC |
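The cooldown gate reduces to a timestamp comparison. A sketch, assuming the trigger condition has already been evaluated elsewhere; should_fire is a hypothetical helper, not the loop's real function.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def should_fire(
    enabled: bool,
    condition_met: bool,
    last_fired: datetime | None,
    cooldown_hours: int,
    now: datetime | None = None,
) -> bool:
    """Fire only for enabled rules whose trigger matched and whose cooldown has elapsed."""
    if not (enabled and condition_met):
        return False
    if last_fired is None:  # never fired before
        return True
    now = now or datetime.now(timezone.utc)
    return now - last_fired >= timedelta(hours=cooldown_hours)
```

After a message is sent, the loop would record the time with update_alert_rule_last_fired so the next 5-minute pass sees the new last_fired.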
agent_learnings
Behavioral corrections extracted by the nightly Reflection Engine. Promoted from "candidate" to "active" when seen in ≥2 reflection cycles.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| rule | TEXT | UNIQUE (normalized lowercase). Short imperative sentence ≤200 chars, e.g. "always confirm before deleting any item" |
| category | TEXT | response_format, tool_usage, decision_making, tone, or scope |
| recurrence_count | INTEGER | Incremented each time the same rule is re-extracted; triggers promotion at ≥2 |
| status | TEXT | "candidate" (new, not yet confirmed) or "active" (shown in system prompt) |
| first_seen | TIMESTAMP | UTC |
| last_seen | TIMESTAMP | UTC |
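The candidate-to-active promotion can be sketched with an in-memory dict standing in for the table. This assumes normalisation means lowercasing plus whitespace collapsing and that record_learning is called once per reflection cycle; both the helper names and those assumptions are illustrative.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Learning:
    rule: str
    category: str
    recurrence_count: int = 1
    status: str = "candidate"


def normalize_rule(rule: str) -> str:
    """Lowercase and collapse whitespace so re-extracted rules hit the same UNIQUE key."""
    return " ".join(rule.lower().split())


def record_learning(store: dict[str, Learning], rule: str, category: str) -> Learning:
    """Upsert on the normalized rule; promote to 'active' once seen in >=2 cycles."""
    key = normalize_rule(rule)
    learning = store.get(key)
    if learning is None:
        learning = store[key] = Learning(rule=key, category=category)
    else:
        learning.recurrence_count += 1
    if learning.recurrence_count >= 2:
        learning.status = "active"
    return learning
```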
pending_actions
Holds consequential actions awaiting user confirmation via the Telegram inline approval flow. Created by approval_gate() when the agent calls a gated tool; resolved by the user tapping Confirm, Cancel, or Edit.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| tool_name | TEXT | e.g. "create_task", "send_email_wrapper", "__plan__", "__plan_resume__" |
| action_type | TEXT | "task", "reminder", "note", "journal", "email", "calendar", "todo", "slack", "code", "delegation", "database", "config", "deployment", "plan" (Spec 008: multi-step plan awaiting Confirm), or "plan_resume" (Spec 008: partial-failure resume) |
| payload | TEXT | JSON-serialized kwargs to pass to the executor on Confirm. For "plan" this is the serialised ExecutionPlan; for "plan_resume" it is the serialised PlanResumePayload (plan + remaining_steps + scratchpad + failed_step + failure_reason). |
| preview_text | TEXT | Human-readable description shown to the user |
| chat_id | TEXT | Telegram chat ID that triggered the action |
| message_id | INTEGER | Telegram message ID of the preview message (set after send, used by expiry loop to edit) |
| status | TEXT | "pending", "confirmed", "cancelled", "expired", or "failed" |
| notified | BOOLEAN | true once the user has seen an expiry notice |
| edit_count | INTEGER | Number of times the user has used the Edit button on this action |
| original_user_message | TEXT | The user's original message, preserved for the Edit re-run prompt |
| trace_id | TEXT | Nullable. OTEL trace ID captured at gate time so the eventual Confirm/Cancel/Edit decision can attach a user_approval / user_edit score to the originating trace asynchronously (Spec 009: Langfuse observability). |
| created_at | TIMESTAMP | UTC |
| expires_at | TIMESTAMP | UTC; created_at + 30 min. The gate rejects execution after this point. |
Indexed on (chat_id, status) and (expires_at, status) for efficient per-chat and expiry lookups.
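A sketch of the Confirm/Cancel resolution rules implied by expires_at and status: only a still-pending, unexpired row can be confirmed, and repeated taps are no-ops. PendingApproval is a simplified, hypothetical stand-in for the real model; the Edit path and the "failed" status (set when the executor errors) are left out.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone

APPROVAL_TTL = timedelta(minutes=30)


@dataclass
class PendingApproval:
    """Simplified stand-in for a pending_actions row (hypothetical)."""
    tool_name: str
    created_at: datetime
    status: str = "pending"
    expires_at: datetime = field(init=False)

    def __post_init__(self) -> None:
        self.expires_at = self.created_at + APPROVAL_TTL


def resolve(action: PendingApproval, decision: str, now: datetime) -> str:
    """Apply a Confirm/Cancel tap; late or repeated taps never execute the tool."""
    if action.status != "pending":
        return action.status  # already resolved: ignore duplicate taps
    if now > action.expires_at:
        action.status = "expired"
    elif decision == "confirm":
        action.status = "confirmed"  # the real flow would run the executor with payload here
    elif decision == "cancel":
        action.status = "cancelled"
    else:
        raise ValueError(f"unknown decision: {decision!r}")
    return action.status
```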
pending_intentions
Soft personal commitments extracted from conversation by the nightly Reflection Engine. Follow-ups are delivered proactively.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| text | TEXT | The intention text, e.g. "call doctor about knee" |
| mentioned_at | TIMESTAMP | UTC; when the intention was first expressed |
| follow_up_days | INTEGER | Days until first follow-up (default 3) |
| follow_up_at | TIMESTAMP | UTC; next scheduled follow-up delivery |
| status | TEXT | pending, snoozed, resolved, dismissed |
| last_follow_up_at | TIMESTAMP | UTC; when the last follow-up was sent (nullable) |
| created_at | TIMESTAMP | UTC |
journal_entries
Private journal entries saved by the agent via save_journal_entry. Included in the nightly Reflection Engine context so they inform the long-term narrative profile and intention extraction. A weekly digest fires via _journal_digest_loop.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| content | TEXT | Full text of the entry |
| title | TEXT | ≤10-word title capturing the main theme, generated by the model |
| source | TEXT | "voice" (default) or "text"; how the entry was created |
| created_at | TIMESTAMP | UTC |
Indexed on created_at for efficient time-range lookups. Entries are never embedded (too personal/short for vector search) — retrieved by date range only.
health_samples
Wearable / Health Connect samples ingested by the Android bridge (Spec 010). A single normalised, append-only table covers all 11 metric types with a flexible JSON value payload, so adding a new metric type requires no schema change.
| Column | Type | Notes |
|---|---|---|
| id | BIGSERIAL (Postgres) / INTEGER AUTOINCREMENT (SQLite) | Primary key |
| metric_type | TEXT | One of: steps, sleep_session, heart_rate, hrv_rmssd, spo2, resting_hr, respiratory_rate, exercise, body_fat, weight, blood_pressure |
| start_time | TIMESTAMP WITH TIME ZONE | UTC |
| end_time | TIMESTAMP WITH TIME ZONE | UTC, nullable: null for instantaneous samples (HR, HRV, SpO2, RHR, …); set for sessions (sleep, exercise) and intervals (steps) |
| value | JSONB (Postgres) / TEXT (SQLite) | Metric-specific payload, e.g. {"bpm": 62}, {"rmssd_ms": 38}, {"score": 78, "stages": [...]}, {"count": 4231} |
| source_device | TEXT | e.g. Galaxy Watch 5 Pro |
| source_app | TEXT | e.g. com.sec.android.app.shealth |
| created_at | TIMESTAMP WITH TIME ZONE | UTC; when the row landed on the server |
Indexes:
- UNIQUE (metric_type, start_time, source_device) — idempotency key; duplicate POSTs from the bridge are dropped silently
- (metric_type, start_time DESC) — for time-range reads by the agent's health tools
Writes never embed (high volume, low semantic value). Reads are always time-bounded — get_health_samples(metric_type, since, until=None, limit=5000).
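The idempotency key lets the database do the dedup: insert each sample with INSERT OR IGNORE and count which rows actually landed. A sqlite3 sketch of the (accepted, duplicates) contract, assuming the column set above; the SQL and helper are illustrative, not the adapter's code (Postgres would use ON CONFLICT DO NOTHING).

```python
from __future__ import annotations

import json
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS health_samples (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    metric_type TEXT NOT NULL,
    start_time TEXT NOT NULL,
    end_time TEXT,
    value TEXT NOT NULL,
    source_device TEXT,
    source_app TEXT,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (metric_type, start_time, source_device)
)
"""


def save_health_samples(conn: sqlite3.Connection, samples: list[dict]) -> tuple[int, int]:
    """Insert samples, silently skipping duplicates; return (accepted, duplicates)."""
    accepted = 0
    for s in samples:
        cur = conn.execute(
            "INSERT OR IGNORE INTO health_samples "
            "(metric_type, start_time, end_time, value, source_device, source_app) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (s["metric_type"], s["start_time"], s.get("end_time"),
             json.dumps(s["value"]), s.get("source_device"), s.get("source_app")),
        )
        accepted += cur.rowcount  # 1 if inserted, 0 if the UNIQUE key already existed
    conn.commit()
    return accepted, len(samples) - accepted
```

One caveat of this approach: both SQLite and Postgres treat NULLs in a UNIQUE constraint as distinct by default, so samples with a NULL source_device would not be deduplicated.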
capability_gaps
Logged when Kwasi declines a request. Classified by whether the underlying capability exists in the deployed source (see Architecture → Self-Improvement Loop).
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| user_message | TEXT | The original user message |
| agent_response | TEXT | Kwasi's decline response |
| classification | TEXT | "gap" (no source matches → real capability hole), "available" (source matches found → routing/prompt failure), or "unknown" |
| suspected_capability | TEXT | Short noun phrase extracted by the mini-model classifier, e.g. "linkedin scraping" |
| classifier_reason | TEXT | Why this classification; combines grep-result summary + mini-model rationale |
| grep_evidence | TEXT | Truncated grep_source output that informed the classification (nullable) |
| status | TEXT | "open", "resolved", or "dismissed" |
| created_at | TIMESTAMP | UTC |
| resolved_at | TIMESTAMP | UTC, nullable |
Indexes: (status, created_at DESC) for the digest, (classification) for filtering by class.
proposed_skills
Drafts produced by propose_skill: a complete .py file held in the DB until activation writes it to app/skills/.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| gap_id | TEXT | Nullable. Source capability_gap id, if drafted from one |
| name | TEXT | Snake-case slug, e.g. "linkedin_summary"; becomes app/skills/{name}.py on activation |
| description | TEXT | Free-form description provided by the proposer |
| code | TEXT | Full .py file content. Never on disk until activated |
| validation_status | TEXT | "pending", "passed", or "failed" |
| validation_output | TEXT | Validator report (passed/failed reason) |
| status | TEXT | "draft", "activated", or "rejected" |
| created_at | TIMESTAMP | UTC |
| activated_at | TIMESTAMP | UTC, nullable |
Indexes: (status, created_at DESC) for listing drafts, (name) for collision checks.
delegations
Each row tracks one delegate_to_coding_agent or delegate_web_task invocation (see Architecture → Task Delegation).
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| task_description | TEXT | The task sent to the sandbox |
| status | TEXT | "queued", "running", "completed", "failed", or "cancelled" |
| backend | TEXT | "opencode" (default), "claude_code", or "web_task" |
| chat_id | TEXT | Telegram chat to deliver result to (nullable) |
| session_id | TEXT | Backend session id (for future --resume; nullable) |
| started_at / completed_at | TIMESTAMP | UTC (nullable) |
| result_summary | TEXT | Final answer (default '') |
| files_summary | TEXT | Comma-separated generated-file paths (default '') |
| duration_seconds | INT | Wall-clock (nullable) |
| input_tokens / output_tokens | INT | Stream-json telemetry (Claude Code path only; nullable) |
| estimated_cost_usd | REAL/NUMERIC | Soft-cap enforced for Claude Code; null otherwise |
| error | TEXT | Failure summary (nullable) |
| credential | TEXT | web_task only: name of a vault credential used for login, never the secret (nullable) |
| created_at | TIMESTAMP | UTC |
Indexes: (status, created_at DESC) for listing + recovering orphans on container restart.
vault_credentials
Encrypted website logins for delegate_web_task(credential=...) (see Deployment → Credential Vault). The *_enc columns are Fernet ciphertext (master key from VAULT_MASTER_KEY, env only); domains is plaintext because it is not a secret and is needed for allowed_domains scoping. db_query refuses this table.
| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| service | TEXT | Unique lookup name, e.g. navette |
| domains | TEXT | Comma-separated hostnames (plaintext) |
| username_enc / password_enc | TEXT | Fernet ciphertext |
| totp_seed_enc | TEXT | Fernet ciphertext, nullable |
| created_at / updated_at | TIMESTAMP | UTC |
Data Models
```python
from __future__ import annotations  # lets `datetime | None` work on Python < 3.10

from dataclasses import dataclass
from datetime import datetime


@dataclass
class SemanticSearchResult:
    source: str  # "notes" | "interactions" | "read_later"
    id: str
    title: str  # note title / "Conversation YYYY-MM-DD" / article title
    snippet: str  # first 200 chars of relevant content
    similarity: float  # cosine similarity 0.0–1.0
    created_at: datetime | None  # used for recency weighting in context injection
```
StoragePort Protocol
Defined in app/memory/ports.py (the single source of truth; read it directly rather than relying on a copy here). Every method is async. The protocol groups into:
- Interactions: save_interaction, get_interactions, get_interactions_by_user, get_interactions_since, get_interactions_by_ids, search_interactions
- Context / facts / learnings / intentions: get_context / save_context; user-fact CRUD + decay_user_facts; save_agent_learning / get_agent_learnings / delete_agent_learning; intention CRUD + get_due_intentions / count_followups_today
- Notes / tasks / reminders / read-later / journal / news: standard CRUD (+ get_tasks_due / mark_task_notified, mark_read_later_read, get_due_reminders)
- Scheduled tasks / alert rules / audit log: CRUD + get_request_log / delete_old_audit_entries
- Pending actions (approval gate), Delegations (save / get / update / count_delegations_today / list_running_delegations), Vault (save_vault_entry / get_vault_entry / list_vault_services / list_vault_meta)
- Self-improvement: capability-gap + proposed-skill CRUD
- Health samples: save_health_samples (idempotent), get_health_samples, get_health_freshness
- Search: semantic_search, hybrid_search (keyword + dense, RRF-fused)
- Raw SQL / schema: raw_sql_query (read-only), raw_sql_execute, raw_sql_dryrun, describe_schema
- close
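hybrid_search fuses a keyword ranking and a dense (embedding) ranking with Reciprocal Rank Fusion, which needs only each result's rank, not comparable scores. A sketch of the standard RRF formula, score(d) = Σ 1 / (k + rank_d); the constant k = 60 (the value commonly used for RRF) and the rrf_fuse name are assumptions, not read from the adapters.

```python
from __future__ import annotations

from collections import defaultdict


def rrf_fuse(rankings: list[list[str]], k: int = 60, limit: int = 5) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank), ranks from 1."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:limit]
```

A document ranked moderately well by both retrievers beats one that only a single retriever found, which is the usual motivation for fusing keyword and dense results this way.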
Representative signatures (abridged; ports.py is authoritative):
```python
from __future__ import annotations  # annotations stay unevaluated, so this excerpt parses on its own

from datetime import datetime
from typing import Protocol

# Domain models (Interaction, UserContext, Note, Task, ...) are defined elsewhere
# in the codebase; their imports are omitted from this excerpt.


class StoragePort(Protocol):
    # Interactions
    async def save_interaction(self, interaction: Interaction) -> None: ...
    async def get_interactions(self, limit: int = 50) -> list[Interaction]: ...
    async def get_interactions_by_user(self, user_id: str, limit: int = 20) -> list[Interaction]: ...
    async def get_interactions_since(self, since: datetime) -> list[Interaction]: ...
    async def search_interactions(self, query: str, limit: int = 10) -> list[Interaction]: ...
    # Context
    async def get_context(self, user_id: str) -> UserContext | None: ...
    async def save_context(self, context: UserContext) -> None: ...
    # Notes
    async def save_note(self, note: Note) -> None: ...
    async def get_notes(self) -> list[Note]: ...
    async def search_notes(self, query: str) -> list[Note]: ...
    async def update_note(self, note_id: str, title: str, content: str) -> bool: ...
    async def delete_note(self, note_id: str) -> bool: ...
    # Tasks
    async def create_task(self, task: Task) -> None: ...
    async def list_tasks(self, status: str | None = None) -> list[Task]: ...
    async def search_tasks(self, query: str) -> list[Task]: ...
    async def get_tasks_due(self) -> list[Task]: ...
    async def mark_task_notified(self, task_id: str) -> None: ...
    async def update_task(self, task_id: str, title: str | None, due_date: str | None) -> bool: ...
    async def complete_task(self, task_id: str) -> bool: ...
    async def delete_task(self, task_id: str) -> bool: ...
    # Reminders
    async def create_reminder(self, reminder: Reminder) -> None: ...
    async def get_due_reminders(self) -> list[Reminder]: ...
    async def list_reminders(self, user_id: str | None = None) -> list[Reminder]: ...
    async def update_reminder_status(self, reminder_id: str, status: str) -> None: ...
    async def cancel_reminder(self, reminder_id: str) -> bool: ...
    # Scheduled tasks
    async def create_scheduled_task(self, task: ScheduledTask) -> None: ...
    async def list_scheduled_tasks(self) -> list[ScheduledTask]: ...
    async def update_scheduled_task(self, task_id: str, **kwargs) -> bool: ...
    async def delete_scheduled_task(self, task_id: str) -> bool: ...
    async def update_scheduled_task_last_run(self, task_id: str, last_run: datetime) -> None: ...
    # Audit log
    async def log_audit_entry(self, entry: AuditEntry) -> None: ...
    async def get_audit_log(self, limit: int = 50) -> list[AuditEntry]: ...
    # News
    async def add_news_topic(self, topic: NewsTopic) -> None: ...
    async def remove_news_topic(self, topic: str) -> bool: ...
    async def list_news_topics(self) -> list[NewsTopic]: ...
    async def add_seen_story(self, story: SeenStory) -> None: ...
    async def get_seen_urls(self) -> set[str]: ...
    async def cleanup_seen_stories(self, days: int = 7) -> None: ...
    # Read Later
    async def save_read_later(self, item: ReadLaterItem) -> None: ...
    async def list_read_later(self) -> list[ReadLaterItem]: ...
    async def delete_read_later(self, item_id: str) -> None: ...
    # User Facts
    async def save_user_fact(self, fact: UserFact) -> None: ...  # upsert on key
    async def get_user_fact(self, key: str) -> UserFact | None: ...
    async def get_all_user_facts(self) -> list[UserFact]: ...  # ordered by category, key
    async def search_user_facts(self, query: str) -> list[UserFact]: ...
    async def delete_user_fact(self, key: str) -> bool: ...
    # Alert Rules
    async def create_alert_rule(self, rule: AlertRule) -> None: ...
    async def list_alert_rules(self, enabled_only: bool = False) -> list[AlertRule]: ...
    async def update_alert_rule(self, rule_id: str, **kwargs) -> bool: ...
    async def delete_alert_rule(self, rule_id: str) -> bool: ...
    async def update_alert_rule_last_fired(self, rule_id: str, fired_at: datetime) -> None: ...
    # Pending Intentions
    async def create_intention(self, intention: PendingIntention) -> None: ...
    async def list_intentions(self, status: str | None = None) -> list[PendingIntention]: ...
    async def get_due_intentions(self) -> list[PendingIntention]: ...
    async def update_intention(self, intention_id: str, **kwargs) -> bool: ...
    async def count_followups_today(self) -> int: ...
    # Agent Learnings
    async def save_agent_learning(self, learning: AgentLearning) -> None: ...  # upsert on rule
    async def get_agent_learnings(self, status: str | None = None) -> list[AgentLearning]: ...
    async def delete_agent_learning(self, learning_id: str) -> bool: ...
    # Pending Actions (approval flow)
    async def save_pending_action(self, action: PendingAction) -> None: ...
    async def get_pending_action(self, action_id: str) -> PendingAction | None: ...
    async def update_pending_action(self, action_id: str, **kwargs) -> bool: ...
    async def list_expired_pending_actions(self) -> list[PendingAction]: ...  # pending rows past expires_at
    async def get_pending_actions_for_chat(self, chat_id: str, status: str = "pending") -> list[PendingAction]: ...
    async def delete_old_pending_actions(self, days: int = 7) -> int: ...  # prune confirmed/cancelled/expired rows
    async def delete_old_audit_entries(self, days: int = 90) -> int: ...  # prune old audit_log rows
    # Journal Entries
    async def save_journal_entry(self, entry: JournalEntry) -> None: ...
    async def list_journal_entries(self, days: int | None = None) -> list[JournalEntry]: ...
    # Health Samples (Spec 010: Wearable / Health Connect ingest)
    # Idempotent on (metric_type, start_time, source_device); returns (accepted, duplicates).
    async def save_health_samples(self, samples: list[dict]) -> tuple[int, int]: ...
    async def get_health_samples(
        self,
        metric_type: str,
        since: datetime,
        until: datetime | None = None,
        limit: int = 5000,
    ) -> list[dict]: ...
    # Semantic Search
    async def semantic_search(
        self,
        embedding: list[float],
        sources: list[str] | None = None,  # default: all three
        limit: int = 5,
    ) -> list[SemanticSearchResult]: ...
    # Lifecycle
    async def close(self) -> None: ...
```
Adapter Differences
| Feature | SQLite | PostgreSQL |
|---|---|---|
| Case-insensitive search | lower(column) LIKE lower(pattern); fully case-insensitive for all Unicode | ILIKE; always case-insensitive |
| Timestamps | Stored as ISO 8601 strings | Native TIMESTAMP WITH TIME ZONE |
| Connection | Single persistent connection | asyncpg connection pool (1–10) |
| Schema init | CREATE TABLE IF NOT EXISTS on first connect | Same, plus SSL fallback |
| Migrations | ALTER TABLE ... ADD COLUMN in try/except | ALTER TABLE ... ADD COLUMN IF NOT EXISTS |
| Boolean | INTEGER (0/1) | Native BOOLEAN |
| Vector search | Python cosine similarity (full scan) | pgvector <=> cosine operator with HNSW index |
| Embedding storage | JSON text array | halfvec(3072) type (16-bit; requires the pgvector extension ≥0.7) |
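One detail worth knowing about the SQLite row: SQLite's built-in lower() and LIKE fold ASCII letters only (unless the ICU extension is loaded), so Unicode-wide case-insensitivity depends on the adapter replacing lower(). The sketch below shows one way to do that with sqlite3's create_function; it is not confirmed that the adapter uses this technique, and the table and data are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE notes (title TEXT)")
conn.execute("INSERT INTO notes VALUES ('ÉCOLE primaire')")

QUERY = "SELECT COUNT(*) FROM notes WHERE lower(title) LIKE lower(?)"

# With the built-in lower() (no ICU), 'É' is left unchanged and LIKE folds ASCII only,
# so on a typical build this finds no match.
builtin_hits = conn.execute(QUERY, ("%école%",)).fetchall()[0][0]

# Redefine lower() for this connection using Python's Unicode-aware str.lower.
# fetchall() above ran the statement to completion, so no query is active during the swap.
conn.create_function(
    "lower", 1, lambda s: s.lower() if isinstance(s, str) else s, deterministic=True
)
unicode_hits = conn.execute(QUERY, ("%école%",)).fetchall()[0][0]  # now matches
```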
Semantic Search
Both adapters implement semantic_search() which finds content by meaning rather than keywords. The query text is embedded via embed_text() in app/tools/embedding.py (Gemini gemini-embedding-001, 3072 dimensions) before calling this method.
SQLite (local dev): Iterates row by row via a cursor over every row with a non-null embedding, computes cosine similarity in Python, then sorts the scored rows and truncates to limit. Because rows are streamed, the full set of embeddings is never held in memory at once. This full scan is acceptable at a personal assistant's data volumes.
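A sketch of the streaming full scan for one table, assuming embeddings are stored as JSON text arrays. It swaps the sort-then-truncate step for heapq.nlargest, a variant that keeps at most limit candidates in memory; this is an alternative technique, not necessarily how the adapter ranks, and top_k_notes is a hypothetical name.

```python
from __future__ import annotations

import heapq
import json
import math
import sqlite3
from collections.abc import Iterator


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_notes(conn: sqlite3.Connection, query: list[float], limit: int = 5) -> list[tuple[float, str]]:
    """Full-scan cosine search over notes that keeps only `limit` candidates in memory."""
    cursor = conn.execute("SELECT id, embedding FROM notes WHERE embedding IS NOT NULL")

    def scored() -> Iterator[tuple[float, str]]:
        for note_id, emb_json in cursor:  # one row at a time
            yield cosine(query, json.loads(emb_json)), note_id

    return heapq.nlargest(limit, scored())
```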
PostgreSQL (production): Uses the pgvector extension with the <=> cosine distance operator and an HNSW index (halfvec_cosine_ops) on each table. Each source independently returns its best limit candidates; results are merged and re-ranked before returning the top limit overall. Embeddings are stored as halfvec(3072) (16-bit floats) — full-precision vector(3072) exceeds pgvector's 2000-dim HNSW limit and would force a sequential scan.
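pgvector's <=> operator returns cosine distance (0 means identical), so similarity is 1 − distance. A sketch of the merge step described above, assuming each per-source query has already returned its own top limit rows; Candidate and merge_sources are hypothetical names, and the real re-ranking may also weigh other signals.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Candidate:
    source: str
    id: str
    distance: float  # cosine distance from pgvector's <=> operator (0 = identical)

    @property
    def similarity(self) -> float:
        return 1.0 - self.distance


def merge_sources(per_source: dict[str, list[Candidate]], limit: int = 5) -> list[Candidate]:
    """Each source already returned its own top `limit`; keep the best `limit` overall."""
    pooled = [c for candidates in per_source.values() for c in candidates]
    return sorted(pooled, key=lambda c: c.similarity, reverse=True)[:limit]
```

Asking every source for limit candidates guarantees the overall top limit is present in the pool even if all winners come from one table.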
Graceful degradation: If GOOGLE_API_KEY is not set, embed_text() returns None and no embedding is stored or queried — keyword search continues to work normally. If the pgvector extension is unavailable on Postgres, the CREATE EXTENSION and ADD COLUMN statements fail silently and semantic_search() returns an empty list.
Backfilling: Existing rows without embeddings can be embedded in bulk via POST /embed-backfill (protected by X-Reflection-Secret). Rate-limited to ~10 rows/second.
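A rough way to hold a backfill near 10 rows per second is to sleep 0.1 s after each row; embedding latency only slows it further, so the cap is never exceeded. A sketch with hypothetical embed and save callables standing in for embed_text() and the adapter write.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable


async def backfill(
    rows: list[tuple[str, str]],
    embed: Callable[[str], Awaitable[list[float] | None]],
    save: Callable[[str, list[float]], Awaitable[None]],
    rows_per_second: float = 10.0,
) -> int:
    """Embed (id, text) rows one at a time, pausing between calls to respect a rate cap."""
    delay = 1.0 / rows_per_second
    done = 0
    for row_id, text in rows:
        vector = await embed(text)
        if vector is not None:  # None mirrors embed_text() when no API key is set
            await save(row_id, vector)
            done += 1
        await asyncio.sleep(delay)
    return done
```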
Schema Migrations
Neither adapter uses a migration framework. New columns are added via ALTER TABLE statements run on every startup, and new tables are created with CREATE TABLE IF NOT EXISTS. On Postgres the column statements use ADD COLUMN IF NOT EXISTS (idempotent). SQLite has no IF NOT EXISTS for columns, so the loop catches only the "duplicate column name" OperationalError and re-raises anything else; a genuinely broken migration therefore fails loudly instead of silently leaving a half-built schema. There is no rollback support.
Current migrations applied at startup:
- tasks.notified: due-date notification tracking
- audit_log.duration_ms, audit_log.tokens_used: request timing and token tracking
- read_later.tags: keyword tags for relevance matching; read_later.read_at: read-state filtering
- reminders.channel: delivery channel (telegram/whatsapp)
- pending_actions.trace_id: Langfuse trace id for delayed approval scoring
- delegations.backend, delegations.credential: backend label + vault-credential name (web_task)
- notes.embedding, interactions.embedding, read_later.embedding: vector embeddings for semantic search (added as TEXT in SQLite and as halfvec(3072) in Postgres via the pgvector ≥0.7 extension; legacy vector(3072) columns auto-migrate in place on first pool init)
- health_samples, vault_credentials, capability_gaps, proposed_skills: tables created on first connect for both adapters
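The SQLite side of this pattern can be sketched with the standard-library sqlite3 module. The two ALTER statements reuse column names from the list above, but their exact types and defaults here are assumptions, and apply_sqlite_migrations is a hypothetical name.

```python
from __future__ import annotations

import sqlite3

# Illustrative statements; types and defaults are assumptions.
MIGRATIONS = [
    "ALTER TABLE tasks ADD COLUMN notified INTEGER DEFAULT 0",  # booleans are 0/1 in SQLite
    "ALTER TABLE read_later ADD COLUMN tags TEXT",
]


def apply_sqlite_migrations(conn: sqlite3.Connection, statements: list[str]) -> None:
    """Run ADD COLUMN statements idempotently: ignore only 'duplicate column name'."""
    for stmt in statements:
        try:
            conn.execute(stmt)
        except sqlite3.OperationalError as exc:
            if "duplicate column name" not in str(exc).lower():
                raise  # e.g. "no such table": a broken migration must fail loudly
    conn.commit()
```

Running the same list on every boot is safe: the second pass hits only duplicate-column errors, which are swallowed, while any other failure stops startup.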