
Storage Layer

Storage is abstracted behind a StoragePort protocol. The agent and all tools talk to the protocol — never to a specific adapter directly.


Architecture

```mermaid
graph TD
    AG[Agent tools] --> SP[StoragePort protocol\napp/memory/ports.py]
    BG[Background loops] --> SP
    DASH[Dashboard router] --> SP
    SP --> SQ[SqliteAdapter\napp/memory/adapters/sqlite.py]
    SP --> PG[PostgresAdapter\napp/memory/adapters/postgres.py]
    SQ --> DB1[(kwasi.db\nlocal dev)]
    PG --> DB2[(PostgreSQL\nRailway prod)]
```

Switch adapters with a single env var:

```bash
STORAGE_BACKEND=sqlite   # default, uses DATABASE_URL=sqlite:///kwasi.db
STORAGE_BACKEND=postgres # uses DATABASE_URL from Railway
```

Schema

The table/column listing below is a hand-maintained reference. At runtime the agent never relies on it: describe_schema() (on StoragePort and both adapters) introspects the live schema from information_schema (Postgres) or sqlite_master + PRAGMA table_info (SQLite) and injects it into the system prompt as a "## Live database schema" section, so the raw-SQL tools (db_query / db_execute) work from the real database's schema rather than from this hand-written listing. The result is cached for the process lifetime, and any introspection failure yields an empty section instead of an error.
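A minimal sketch of the SQLite half of that introspection, assuming a plain synchronous sqlite3 connection (the real adapter method is async) and a hypothetical output layout of one "- table(column TYPE, ...)" line per table. The function name describe_sqlite_schema is illustrative, and process-lifetime caching is left out.

```python
import sqlite3


def describe_sqlite_schema(conn: sqlite3.Connection) -> str:
    """Build a '## Live database schema' section from sqlite_master + PRAGMA table_info.

    Returns "" on any database error so prompt assembly is never blocked (fail-safe).
    """
    try:
        tables = conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
        ).fetchall()
        lines = ["## Live database schema"]
        for (table,) in tables:
            # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
            cols = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
            col_desc = ", ".join(f"{c[1]} {c[2]}".strip() for c in cols)
            lines.append(f"- {table}({col_desc})")
        return "\n".join(lines)
    except sqlite3.Error:
        return ""
```

The Postgres side would run the equivalent query against information_schema.columns; the fail-safe behaviour is the same in both.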

interactions

Every conversation exchange, from any channel.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| user_message | TEXT | What you sent |
| agent_response | TEXT | What Kwasi replied |
| tools_used | TEXT | JSON array of tool call records |
| channel | TEXT | "telegram", "cli", or "whatsapp" |
| user_id | TEXT | Telegram user ID (nullable) |
| created_at | TIMESTAMP | UTC |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |

notes

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| title | TEXT | |
| content | TEXT | |
| created_at | TIMESTAMP | UTC |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |

tasks

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| title | TEXT | |
| status | TEXT | "todo" or "done" |
| due_date | TEXT | ISO date string (YYYY-MM-DD), nullable |
| notified | BOOLEAN | true once a due-date notification has been sent |
| created_at | TIMESTAMP | UTC |

reminders

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| user_id | TEXT | Telegram user ID or WhatsApp phone number |
| chat_id | TEXT | Delivery target: Telegram chat ID or WhatsApp phone number |
| message | TEXT | The reminder text |
| remind_at | TIMESTAMP | UTC; when to fire |
| status | TEXT | "pending", "sent", or "cancelled" |
| channel | TEXT | Delivery channel: "telegram" or "whatsapp" |
| created_at | TIMESTAMP | UTC |

context

Dual-purpose: it stores the long-term user profile produced by the Reflection Engine and doubles as a general-purpose system key-value store — runtime config overrides, deployment markers, loop heartbeats, and auth caches all live here under system:* keys. Access is exact-key only (get_context(user_id) / save_context(...)); there is no prefix scan, so consumers read a known fixed set of keys.

| Column | Type | Notes |
|---|---|---|
| user_id | TEXT | Primary key. Holds the profile or a system key (see below). |
| content | TEXT | Markdown profile ≤550 words (for "global"); ISO date string, JSON, or iso\|status\|detail for system keys; "" = cleared (read back as absent). |
| last_updated | TIMESTAMP | UTC |

System keys (user_id prefixes):

| Key | Written by | Content |
|---|---|---|
| `global` | Reflection Engine | The user profile (markdown) |
| `system:briefing` | morning-briefing loop | Today's date; dedups the daily send across restarts |
| `system:stale_check` | reflection | This week's Monday; cooldown for the weekly stale-facts nudge |
| `system:meeting_prep:<event_id>` | meeting-prep loop | Per-event prep-sent marker |
| `system:meeting_followup:gmail:<message_id>` | meeting-followup loop | Per-email processed marker |
| `system:config:<field>` | set_runtime_config | A runtime config override, e.g. `system:config:model_name` → `anthropic:claude-sonnet-4-6`. Re-applied at boot by load_overrides. |
| `system:pending_self_redeploy` | railway_set_env / railway_redeploy executors | JSON marker {chat_id, kind, change, expected, triggered_at, prev_deploy_id}. Consumed on next boot by confirm_pending_self_redeploy to post a ✅/⚠️. |
| `system:<loop>:heartbeat` | background loops | Loop health signal iso\|status\|detail (e.g. 2026-06-09T…\|ok\|3_processed). diagnose_self reads these to surface silent failures. |
| `system:outlook_msal_cache` | Outlook MSAL | Serialized MSAL token cache (opaque) |

user_facts

Durable key-value facts about the user, treated as ground truth. Written by the agent (remember_fact) or auto-extracted by nightly reflection. Fetched at runtime via recall_facts(); they are not injected into the system prompt. Mostly permanent, but decay_user_facts() removes a narrow class of rapidly-stale auto-facts (date-stamped snapshots, completed-task records, external-system shadows) once per nightly reflection.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| key | TEXT | UNIQUE. snake_case identifier, e.g. home_address, partner_name |
| value | TEXT | The actual fact value |
| category | TEXT | location, personal, preference, work, health, or general |
| source | TEXT | "agent" (proactive/explicit) or "reflection" (auto-extracted nightly) |
| created_at | TIMESTAMP | UTC; set on first insert, never changed |
| updated_at | TIMESTAMP | UTC; refreshed on every upsert |

The unique index on key means save_user_fact is always an upsert — writing the same key a second time overwrites the value.
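The upsert can be expressed directly in SQL. The sketch below uses SQLite's INSERT ... ON CONFLICT(key) DO UPDATE (the same clause works on Postgres with $n placeholders). The columns mirror the table above, but the exact statement the adapters run is an assumption, and this save_user_fact takes loose arguments instead of the app's UserFact model.

```python
import sqlite3
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS user_facts (
    id TEXT PRIMARY KEY,
    key TEXT UNIQUE NOT NULL,
    value TEXT,
    category TEXT,
    source TEXT,
    created_at TEXT,
    updated_at TEXT
)
"""

# On a key collision, overwrite the value but keep the original id and created_at.
UPSERT = """
INSERT INTO user_facts (id, key, value, category, source, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
    value = excluded.value,
    category = excluded.category,
    source = excluded.source,
    updated_at = excluded.updated_at
"""


def save_user_fact(conn: sqlite3.Connection, fact_id: str, key: str, value: str,
                   category: str = "general", source: str = "agent") -> None:
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(UPSERT, (fact_id, key, value, category, source, now, now))
    conn.commit()
```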

news_topics

Explicitly followed news topics.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| topic | TEXT | Unique, stored lowercase |
| created_at | TIMESTAMP | UTC |

seen_news

Rolling 7-day dedup store for fetched news stories.

| Column | Type | Notes |
|---|---|---|
| url | TEXT | Primary key |
| title | TEXT | Story headline |
| seen_at | TIMESTAMP | UTC; entries older than 7 days are purged on the next get_news call |

scheduled_tasks

User-defined recurring jobs evaluated by _user_scheduled_tasks_loop every 60 seconds.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| name | TEXT | Human-readable label |
| prompt | TEXT | What to run the agent with when the task fires |
| cron_expression | TEXT | Standard 5-field cron, e.g. `0 9 * * 1` = Mondays at 9am |
| timezone | TEXT | IANA timezone (defaults to USER_TIMEZONE) |
| enabled | BOOLEAN | true to run, false to pause |
| created_by | TEXT | "user" or "system" |
| last_run | TIMESTAMP | UTC; last time this task successfully fired |
| created_at | TIMESTAMP | UTC |

audit_log

Every tool call the agent makes, with sanitised arguments and truncated output.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| timestamp | TIMESTAMP | UTC |
| tool_name | TEXT | Name of the tool called |
| arguments | TEXT | JSON; values of keys matching key, token, secret, or password are redacted to "***" |
| result_summary | TEXT | First 200 characters of tool output |
| channel | TEXT | "telegram", "whatsapp", or "cli" |
| user_id | TEXT | Nullable |
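A sketch of the sanitisation described above. The matching rule is an assumption: this version treats "matching" as a case-insensitive substring test on the key name and also recurses into nested dicts and lists. sanitise_arguments and summarise_result are hypothetical names.

```python
import json

# Key-name fragments whose values get masked (substring match is an assumption).
SENSITIVE = ("key", "token", "secret", "password")


def _redact(value):
    # Recurse so nested dicts/lists are sanitised too (also an assumption).
    if isinstance(value, dict):
        return {
            k: "***" if any(s in str(k).lower() for s in SENSITIVE) else _redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [_redact(v) for v in value]
    return value


def sanitise_arguments(arguments: dict) -> str:
    """Serialise tool-call kwargs for audit_log.arguments with secrets masked."""
    return json.dumps(_redact(arguments))


def summarise_result(output: str, limit: int = 200) -> str:
    """Truncate tool output for audit_log.result_summary."""
    return output[:limit]
```

Substring matching errs on the side of over-redaction (a key such as "monkey" would also be masked), which is usually the safer failure for an audit trail.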

read_later

Articles saved by the user via the Content Curator skill.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key (prefix match supported in delete_read_later) |
| url | TEXT | Original URL |
| title | TEXT | Article title |
| summary | TEXT | Auto-generated at save time via LLM summarisation |
| note | TEXT | Optional user annotation |
| tags | TEXT | Comma-separated keywords extracted from title + summary at save time (frequency-ranked, stop-word filtered, up to 10). Used by find_relevant_read_later() to surface contextually relevant articles. |
| added_at | TIMESTAMP | UTC |
| read_at | TIMESTAMP | Set when marked read via mark_read_later_read (nullable); list_read_later(include_read=False) filters on it |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |
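The tags column is described as frequency-ranked, stop-word filtered and capped at 10. A minimal sketch of that extraction, assuming a tiny illustrative stop-word list, a lowercase alphanumeric tokenizer and a minimum word length of 3; the real list and tokenizer are not documented here, and extract_tags is a hypothetical name.

```python
import re
from collections import Counter

# Tiny illustrative stop-word list (an assumption).
STOP_WORDS = {"the", "a", "an", "and", "or", "of", "to", "in", "on", "for",
              "is", "are", "with", "how", "why", "what", "this", "that", "it"}


def extract_tags(title: str, summary: str, max_tags: int = 10) -> str:
    """Return up to max_tags comma-separated keywords, most frequent first."""
    words = re.findall(r"[a-z0-9]+", f"{title} {summary}".lower())
    counts = Counter(w for w in words if w not in STOP_WORDS and len(w) > 2)
    # most_common() orders equal counts by first appearance.
    return ",".join(w for w, _ in counts.most_common(max_tags))
```

For example, `extract_tags("Rust async runtimes", "Async runtimes in Rust: tokio versus async-std")` ranks "async" first because it appears three times.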

alert_rules

Proactive alert rules evaluated every 5 minutes by _alert_loop. Two system defaults are seeded on first run.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| name | TEXT | Human-readable label |
| trigger_type | TEXT | task_due_today, task_overdue (phase 2: meeting_soon, email_arrived) |
| conditions | TEXT | JSON object of trigger-specific params (e.g. {"fire_hour": 11}) |
| cooldown_hours | INTEGER | Minimum hours between consecutive firings; prevents spam |
| enabled | BOOLEAN | true to evaluate, false to pause |
| created_by | TEXT | "user" or "system" |
| last_fired | TIMESTAMP | UTC; last time this rule triggered a message |
| created_at | TIMESTAMP | UTC |

agent_learnings

Behavioral corrections extracted by the nightly Reflection Engine. Promoted from "candidate" to "active" when seen in ≥2 reflection cycles.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| rule | TEXT | UNIQUE (normalized lowercase). Short imperative sentence ≤200 chars, e.g. "always confirm before deleting any item" |
| category | TEXT | response_format, tool_usage, decision_making, tone, or scope |
| recurrence_count | INTEGER | Incremented each time the same rule is re-extracted; triggers promotion at ≥2 |
| status | TEXT | "candidate" (new, not yet confirmed) or "active" (shown in system prompt) |
| first_seen | TIMESTAMP | UTC |
| last_seen | TIMESTAMP | UTC |
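The candidate-to-active promotion can be sketched as an upsert on the normalised rule text. This version keeps learnings in an in-memory dict instead of the database, omits the timestamps, and assumes that normalisation means lowercasing plus collapsing whitespace; record_learning is a hypothetical name.

```python
from dataclasses import dataclass

PROMOTION_THRESHOLD = 2  # seen in >= 2 reflection cycles


@dataclass
class AgentLearning:
    rule: str
    category: str
    recurrence_count: int = 1
    status: str = "candidate"


def record_learning(store: dict[str, AgentLearning], rule: str, category: str) -> AgentLearning:
    """Insert a new candidate or bump an existing one, promoting it at the threshold."""
    key = " ".join(rule.lower().split())  # normalise: lowercase, collapse whitespace
    existing = store.get(key)
    if existing is None:
        existing = store[key] = AgentLearning(rule=key, category=category)
    else:
        existing.recurrence_count += 1
    if existing.recurrence_count >= PROMOTION_THRESHOLD:
        existing.status = "active"
    return existing
```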

pending_actions

Holds consequential actions awaiting user confirmation via the Telegram inline approval flow. Created by approval_gate() when the agent calls a gated tool; resolved by the user tapping Confirm, Cancel, or Edit.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| tool_name | TEXT | e.g. "create_task", "send_email_wrapper", `"__plan__"`, `"__plan_resume__"` |
| action_type | TEXT | "task", "reminder", "note", "journal", "email", "calendar", "todo", "slack", "code", "delegation", "database", "config", "deployment", "plan" (Spec 008: multi-step plan awaiting Confirm), or "plan_resume" (Spec 008: partial-failure resume) |
| payload | TEXT | JSON-serialized kwargs to pass to the executor on Confirm. For "plan" this is the serialised ExecutionPlan; for "plan_resume" it is the serialised PlanResumePayload (plan + remaining_steps + scratchpad + failed_step + failure_reason). |
| preview_text | TEXT | Human-readable description shown to the user |
| chat_id | TEXT | Telegram chat ID that triggered the action |
| message_id | INTEGER | Telegram message ID of the preview message (set after send; used by the expiry loop to edit it) |
| status | TEXT | "pending", "confirmed", "cancelled", "expired", or "failed" |
| notified | BOOLEAN | true once the user has seen an expiry notice |
| edit_count | INTEGER | Number of times the user has used the Edit button on this action |
| original_user_message | TEXT | The user's original message, preserved for the Edit re-run prompt |
| trace_id | TEXT | Nullable. OTEL trace ID captured at gate time so the eventual Confirm/Cancel/Edit decision can attach a user_approval / user_edit score to the originating trace asynchronously (Spec 009, Langfuse observability). |
| created_at | TIMESTAMP | UTC |
| expires_at | TIMESTAMP | UTC; created_at + 30 min. The gate rejects execution after this point. |

Indexed on (chat_id, status) and (expires_at, status) for efficient per-chat and expiry lookups.

pending_intentions

Soft personal commitments extracted from conversation by the nightly Reflection Engine. Follow-ups are delivered proactively.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| text | TEXT | The intention text, e.g. "call doctor about knee" |
| mentioned_at | TIMESTAMP | UTC; when the intention was first expressed |
| follow_up_days | INTEGER | Days until first follow-up (default 3) |
| follow_up_at | TIMESTAMP | UTC; next scheduled follow-up delivery |
| status | TEXT | pending, snoozed, resolved, or dismissed |
| last_follow_up_at | TIMESTAMP | UTC; when the last follow-up was sent (nullable) |
| created_at | TIMESTAMP | UTC |

journal_entries

Private journal entries saved by the agent via save_journal_entry. Included in the nightly Reflection Engine context so they inform the long-term narrative profile and intention extraction. A weekly digest fires via _journal_digest_loop.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| content | TEXT | Full text of the entry |
| title | TEXT | ≤10-word title capturing the main theme, generated by the model |
| source | TEXT | "voice" (default) or "text"; how the entry was created |
| created_at | TIMESTAMP | UTC |

Indexed on created_at for efficient time-range lookups. Entries are never embedded (too personal/short for vector search) — retrieved by date range only.

health_samples

Wearable / Health Connect samples ingested by the Android bridge (Spec 010). Single normalised, append-only table covering all 11 metric types with a flexible JSON value payload — schema is unchanged when adding a new metric type.

| Column | Type | Notes |
|---|---|---|
| id | BIGSERIAL (Postgres) / INTEGER AUTOINCREMENT (SQLite) | Primary key |
| metric_type | TEXT | One of: steps, sleep_session, heart_rate, hrv_rmssd, spo2, resting_hr, respiratory_rate, exercise, body_fat, weight, blood_pressure |
| start_time | TIMESTAMP WITH TIME ZONE | UTC |
| end_time | TIMESTAMP WITH TIME ZONE | UTC, nullable; null for instantaneous samples (HR, HRV, SpO2, RHR, …), set for sessions (sleep, exercise) and intervals (steps) |
| value | JSONB (Postgres) / TEXT (SQLite) | Metric-specific payload, e.g. {"bpm": 62}, {"rmssd_ms": 38}, {"score": 78, "stages": [...]}, {"count": 4231} |
| source_device | TEXT | e.g. Galaxy Watch 5 Pro |
| source_app | TEXT | e.g. com.sec.android.app.shealth |
| created_at | TIMESTAMP WITH TIME ZONE | UTC; when the row landed on the server |

Indexes:

  • UNIQUE (metric_type, start_time, source_device): idempotency key; duplicate POSTs from the bridge are dropped silently
  • (metric_type, start_time DESC): for time-range reads by the agent's health tools

Writes never embed (high volume, low semantic value). Reads are always time-bounded — get_health_samples(metric_type, since, until=None, limit=5000).
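A sketch of how the idempotency key yields the (accepted, duplicates) pair returned by save_health_samples, using SQLite's INSERT OR IGNORE (the Postgres counterpart would be ON CONFLICT (metric_type, start_time, source_device) DO NOTHING). The synchronous signature and per-row loop are assumptions for illustration. Note that both databases treat NULLs as distinct in UNIQUE constraints by default, so samples with no source_device are not deduplicated by this key.

```python
import json
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS health_samples (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    metric_type TEXT NOT NULL,
    start_time TEXT NOT NULL,
    end_time TEXT,
    value TEXT NOT NULL,
    source_device TEXT,
    source_app TEXT,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (metric_type, start_time, source_device)
)
"""


def save_health_samples(conn: sqlite3.Connection, samples: list[dict]) -> tuple[int, int]:
    """Insert samples, silently skipping ones whose idempotency key already exists."""
    accepted = 0
    for s in samples:
        cur = conn.execute(
            "INSERT OR IGNORE INTO health_samples "
            "(metric_type, start_time, end_time, value, source_device, source_app) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (s["metric_type"], s["start_time"], s.get("end_time"),
             json.dumps(s["value"]), s.get("source_device"), s.get("source_app")),
        )
        accepted += cur.rowcount  # 1 if inserted, 0 if ignored as a duplicate
    conn.commit()
    return accepted, len(samples) - accepted
```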

capability_gaps

Logged when Kwasi declines a request. Classified by whether the underlying capability exists in the deployed source — see Architecture → Self-Improvement Loop.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| user_message | TEXT | The original user message |
| agent_response | TEXT | Kwasi's decline response |
| classification | TEXT | "gap" (no source matches → real capability hole), "available" (source matches found → routing/prompt failure), or "unknown" |
| suspected_capability | TEXT | Short noun phrase extracted by the mini-model classifier, e.g. "linkedin scraping" |
| classifier_reason | TEXT | Why this classification; combines grep-result summary + mini-model rationale |
| grep_evidence | TEXT | Truncated grep_source output that informed the classification (nullable) |
| status | TEXT | "open", "resolved", or "dismissed" |
| created_at | TIMESTAMP | UTC |
| resolved_at | TIMESTAMP | UTC, nullable |

Indexes: (status, created_at DESC) for the digest, (classification) for filtering by class.

proposed_skills

Drafts produced by propose_skill — a complete .py file held in the DB until activation writes it to app/skills/.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| gap_id | TEXT | Nullable. Source capability_gap id, if drafted from one |
| name | TEXT | Snake-case slug, e.g. "linkedin_summary"; becomes app/skills/{name}.py on activation |
| description | TEXT | Free-form description provided by the proposer |
| code | TEXT | Full .py file content. Never on disk until activated |
| validation_status | TEXT | "pending", "passed", or "failed" |
| validation_output | TEXT | Validator report (passed/failed reason) |
| status | TEXT | "draft", "activated", or "rejected" |
| created_at | TIMESTAMP | UTC |
| activated_at | TIMESTAMP | UTC, nullable |

Indexes: (status, created_at DESC) for listing drafts, (name) for collision checks.

delegations

Each row tracks one delegate_to_coding_agent or delegate_web_task invocation — see Architecture → Task Delegation.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| task_description | TEXT | The task sent to the sandbox |
| status | TEXT | "queued", "running", "completed", "failed", or "cancelled" |
| backend | TEXT | "opencode" (default), "claude_code", or "web_task" |
| chat_id | TEXT | Telegram chat to deliver result to (nullable) |
| session_id | TEXT | Backend session id (for future --resume; nullable) |
| started_at / completed_at | TIMESTAMP | UTC (nullable) |
| result_summary | TEXT | Final answer (default '') |
| files_summary | TEXT | Comma-separated generated-file paths (default '') |
| duration_seconds | INT | Wall-clock (nullable) |
| input_tokens / output_tokens | INT | Stream-json telemetry (Claude Code path only; nullable) |
| estimated_cost_usd | REAL/NUMERIC | Soft cap enforced for Claude Code; null otherwise |
| error | TEXT | Failure summary (nullable) |
| credential | TEXT | web_task only: name of a vault credential used for login, never the secret (nullable) |
| created_at | TIMESTAMP | UTC |

Indexes: (status, created_at DESC) for listing + recovering orphans on container restart.


vault_credentials

Encrypted website logins for delegate_web_task(credential=...) — see Deployment → Credential Vault. The *_enc columns are Fernet ciphertext (master key from VAULT_MASTER_KEY, env only); domains is plaintext (not a secret — needed for allowed_domains scoping). db_query refuses this table.

| Column | Type | Notes |
|---|---|---|
| id | TEXT (UUID) | Primary key |
| service | TEXT | Unique lookup name, e.g. navette |
| domains | TEXT | Comma-separated hostnames (plaintext) |
| username_enc / password_enc | TEXT | Fernet ciphertext |
| totp_seed_enc | TEXT | Fernet ciphertext, nullable |
| created_at / updated_at | TIMESTAMP | UTC |

Data Models

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class SemanticSearchResult:
    source: str              # "notes" | "interactions" | "read_later"
    id: str
    title: str               # note title / "Conversation YYYY-MM-DD" / article title
    snippet: str             # first 200 chars of relevant content
    similarity: float        # cosine similarity 0.0–1.0
    created_at: datetime | None  # used for recency weighting in context injection
```

StoragePort Protocol

Defined in app/memory/ports.py (the single source of truth — read it directly rather than relying on a copy here). Every method is async. The protocol groups into:

  • Interactions: save_interaction, get_interactions, get_interactions_by_user, get_interactions_since, get_interactions_by_ids, search_interactions
  • Context / facts / learnings / intentions: get_context/save_context; user-fact CRUD + decay_user_facts; save_agent_learning/get_agent_learnings/delete_agent_learning; intention CRUD + get_due_intentions/count_followups_today
  • Notes / tasks / reminders / read-later / journal / news: standard CRUD (+ get_tasks_due/mark_task_notified, mark_read_later_read, get_due_reminders)
  • Scheduled tasks / alert rules / audit log: CRUD + get_request_log/delete_old_audit_entries
  • Pending actions (approval gate), Delegations (save/get/update/count_delegations_today/list_running_delegations), Vault (save_vault_entry/get_vault_entry/list_vault_services/list_vault_meta)
  • Self-improvement: capability-gap + proposed-skill CRUD
  • Health samples: save_health_samples (idempotent), get_health_samples, get_health_freshness
  • Search: semantic_search, hybrid_search (keyword + dense, RRF-fused)
  • Raw SQL / schema: raw_sql_query (read-only), raw_sql_execute, raw_sql_dryrun, describe_schema
  • close
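hybrid_search fuses a keyword ranking and a dense (embedding) ranking with Reciprocal Rank Fusion. Below is a minimal sketch of RRF over ranked id lists; the constant k = 60 is the value commonly used in the RRF literature, not a documented setting of this adapter, and rrf_fuse is a hypothetical name.

```python
from collections import defaultdict


def rrf_fuse(rankings: list[list[str]], k: int = 60, limit: int = 5) -> list[str]:
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank), ranks from 1."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.__getitem__, reverse=True)[:limit]
```

Because RRF uses only ranks, it needs no calibration between keyword relevance scores and cosine similarities. A document ranked well by both retrievers (such as "b" below: second for keywords, first for dense) beats one that tops only a single list.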
Representative signatures

```python
from __future__ import annotations  # model names below are used only as annotations

from datetime import datetime
from typing import Protocol

# Interaction, UserContext, Note, Task, Reminder, ScheduledTask, AuditEntry,
# NewsTopic, SeenStory, ReadLaterItem, UserFact, AlertRule, PendingIntention,
# AgentLearning, PendingAction, JournalEntry and SemanticSearchResult are the
# app's model classes; they are not reproduced here.


class StoragePort(Protocol):
    # Interactions
    async def save_interaction(self, interaction: Interaction) -> None: ...
    async def get_interactions(self, limit: int = 50) -> list[Interaction]: ...
    async def get_interactions_by_user(self, user_id: str, limit: int = 20) -> list[Interaction]: ...
    async def get_interactions_since(self, since: datetime) -> list[Interaction]: ...
    async def search_interactions(self, query: str, limit: int = 10) -> list[Interaction]: ...

    # Context
    async def get_context(self, user_id: str) -> UserContext | None: ...
    async def save_context(self, context: UserContext) -> None: ...

    # Notes
    async def save_note(self, note: Note) -> None: ...
    async def get_notes(self) -> list[Note]: ...
    async def search_notes(self, query: str) -> list[Note]: ...
    async def update_note(self, note_id: str, title: str, content: str) -> bool: ...
    async def delete_note(self, note_id: str) -> bool: ...

    # Tasks
    async def create_task(self, task: Task) -> None: ...
    async def list_tasks(self, status: str | None = None) -> list[Task]: ...
    async def search_tasks(self, query: str) -> list[Task]: ...
    async def get_tasks_due(self) -> list[Task]: ...
    async def mark_task_notified(self, task_id: str) -> None: ...
    async def update_task(self, task_id: str, title: str | None, due_date: str | None) -> bool: ...
    async def complete_task(self, task_id: str) -> bool: ...
    async def delete_task(self, task_id: str) -> bool: ...

    # Reminders
    async def create_reminder(self, reminder: Reminder) -> None: ...
    async def get_due_reminders(self) -> list[Reminder]: ...
    async def list_reminders(self, user_id: str | None = None) -> list[Reminder]: ...
    async def update_reminder_status(self, reminder_id: str, status: str) -> None: ...
    async def cancel_reminder(self, reminder_id: str) -> bool: ...

    # Scheduled tasks
    async def create_scheduled_task(self, task: ScheduledTask) -> None: ...
    async def list_scheduled_tasks(self) -> list[ScheduledTask]: ...
    async def update_scheduled_task(self, task_id: str, **kwargs) -> bool: ...
    async def delete_scheduled_task(self, task_id: str) -> bool: ...
    async def update_scheduled_task_last_run(self, task_id: str, last_run: datetime) -> None: ...

    # Audit log
    async def log_audit_entry(self, entry: AuditEntry) -> None: ...
    async def get_audit_log(self, limit: int = 50) -> list[AuditEntry]: ...

    # News
    async def add_news_topic(self, topic: NewsTopic) -> None: ...
    async def remove_news_topic(self, topic: str) -> bool: ...
    async def list_news_topics(self) -> list[NewsTopic]: ...
    async def add_seen_story(self, story: SeenStory) -> None: ...
    async def get_seen_urls(self) -> set[str]: ...
    async def cleanup_seen_stories(self, days: int = 7) -> None: ...

    # Read Later
    async def save_read_later(self, item: ReadLaterItem) -> None: ...
    async def list_read_later(self, include_read: bool = False) -> list[ReadLaterItem]: ...  # filters on read_at
    async def delete_read_later(self, item_id: str) -> None: ...

    # User Facts
    async def save_user_fact(self, fact: UserFact) -> None: ...      # upsert on key
    async def get_user_fact(self, key: str) -> UserFact | None: ...
    async def get_all_user_facts(self) -> list[UserFact]: ...        # ordered by category, key
    async def search_user_facts(self, query: str) -> list[UserFact]: ...
    async def delete_user_fact(self, key: str) -> bool: ...

    # Alert Rules
    async def create_alert_rule(self, rule: AlertRule) -> None: ...
    async def list_alert_rules(self, enabled_only: bool = False) -> list[AlertRule]: ...
    async def update_alert_rule(self, rule_id: str, **kwargs) -> bool: ...
    async def delete_alert_rule(self, rule_id: str) -> bool: ...
    async def update_alert_rule_last_fired(self, rule_id: str, fired_at: datetime) -> None: ...

    # Pending Intentions
    async def create_intention(self, intention: PendingIntention) -> None: ...
    async def list_intentions(self, status: str | None = None) -> list[PendingIntention]: ...
    async def get_due_intentions(self) -> list[PendingIntention]: ...
    async def update_intention(self, intention_id: str, **kwargs) -> bool: ...
    async def count_followups_today(self) -> int: ...

    # Agent Learnings
    async def save_agent_learning(self, learning: AgentLearning) -> None: ...  # upsert on rule
    async def get_agent_learnings(self, status: str | None = None) -> list[AgentLearning]: ...
    async def delete_agent_learning(self, learning_id: str) -> bool: ...

    # Pending Actions (approval flow)
    async def save_pending_action(self, action: PendingAction) -> None: ...
    async def get_pending_action(self, action_id: str) -> PendingAction | None: ...
    async def update_pending_action(self, action_id: str, **kwargs) -> bool: ...
    async def list_expired_pending_actions(self) -> list[PendingAction]: ...  # pending rows past expires_at
    async def get_pending_actions_for_chat(self, chat_id: str, status: str = "pending") -> list[PendingAction]: ...
    async def delete_old_pending_actions(self, days: int = 7) -> int: ...    # prune confirmed/cancelled/expired rows
    async def delete_old_audit_entries(self, days: int = 90) -> int: ...     # prune old audit_log rows

    # Journal Entries
    async def save_journal_entry(self, entry: JournalEntry) -> None: ...
    async def list_journal_entries(self, days: int | None = None) -> list[JournalEntry]: ...

    # Health Samples (Spec 010: Wearable / Health Connect ingest)
    # Idempotent on (metric_type, start_time, source_device); returns (accepted, duplicates).
    async def save_health_samples(self, samples: list[dict]) -> tuple[int, int]: ...
    async def get_health_samples(
        self,
        metric_type: str,
        since: datetime,
        until: datetime | None = None,
        limit: int = 5000,
    ) -> list[dict]: ...

    # Semantic Search
    async def semantic_search(
        self,
        embedding: list[float],
        sources: list[str] | None = None,  # default: all three
        limit: int = 5,
    ) -> list[SemanticSearchResult]: ...

    async def close(self) -> None: ...
```

Adapter Differences

| Feature | SQLite | PostgreSQL |
|---|---|---|
| Case-insensitive search | lower(column) LIKE lower(pattern); fully case-insensitive for all Unicode | ILIKE; always case-insensitive |
| Timestamps | Stored as ISO 8601 strings | Native TIMESTAMP WITH TIME ZONE |
| Connection | Single persistent connection | asyncpg connection pool (1–10) |
| Schema init | CREATE TABLE IF NOT EXISTS on first connect | Same, plus SSL fallback |
| Migrations | ALTER TABLE ... ADD COLUMN in try/except | ALTER TABLE ... ADD COLUMN IF NOT EXISTS |
| Boolean | INTEGER (0/1) | Native BOOLEAN |
| Vector search | Python cosine similarity (full scan) | pgvector `<=>` cosine operator with HNSW index |
| Embedding storage | JSON text array | Native halfvec(3072) type (16-bit; requires pgvector ≥0.7 extension) |

Both adapters implement semantic_search() which finds content by meaning rather than keywords. The query text is embedded via embed_text() in app/tools/embedding.py (Gemini gemini-embedding-001, 3072 dimensions) before calling this method.

SQLite (local dev): Iterates over every row with a non-null embedding through a cursor, scoring each one with a Python cosine similarity as it goes, then sorts the scores and truncates to limit. Because rows are streamed one at a time, the embeddings are never all held in memory at once. This full scan is acceptable at a personal assistant's data volumes.
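A sketch of that streaming scan. It swaps sort-then-truncate for a bounded min-heap, which returns the same top results while holding at most limit scores in memory. The function name, the (id, score) return shape and the synchronous sqlite3 connection are assumptions; the table name is interpolated into the SQL, so it must come from a fixed allow-list such as the three searchable sources.

```python
import heapq
import json
import math
import sqlite3


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def sqlite_semantic_search(conn: sqlite3.Connection, table: str,
                           query: list[float], limit: int = 5) -> list[tuple[str, float]]:
    """Stream embedded rows and keep only the best `limit` (id, score) pairs."""
    best: list[tuple[float, str]] = []  # min-heap keyed on score
    cursor = conn.execute(f"SELECT id, embedding FROM {table} WHERE embedding IS NOT NULL")
    for row_id, emb_json in cursor:  # iterating the cursor fetches rows incrementally
        score = cosine(query, json.loads(emb_json))
        if len(best) < limit:
            heapq.heappush(best, (score, row_id))
        elif score > best[0][0]:
            heapq.heapreplace(best, (score, row_id))  # evict the current worst
    return [(row_id, score) for score, row_id in sorted(best, reverse=True)]
```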

PostgreSQL (production): Uses the pgvector extension with the <=> cosine distance operator and an HNSW index (halfvec_cosine_ops) on each table. Each source independently returns its best limit candidates; results are merged and re-ranked before returning the top limit overall. Embeddings are stored as halfvec(3072) (16-bit floats) — full-precision vector(3072) exceeds pgvector's 2000-dim HNSW limit and would force a sequential scan.

Graceful degradation: If GOOGLE_API_KEY is not set, embed_text() returns None and no embedding is stored or queried — keyword search continues to work normally. If the pgvector extension is unavailable on Postgres, the CREATE EXTENSION and ADD COLUMN statements fail silently and semantic_search() returns an empty list.

Backfilling: Existing rows without embeddings can be embedded in bulk via POST /embed-backfill (protected by X-Reflection-Secret). Rate-limited to ~10 rows/second.


Schema Migrations

Neither adapter uses a migration framework. New columns are added via ALTER TABLE statements that run on every startup, and new tables are created with CREATE TABLE IF NOT EXISTS. On Postgres the column migrations use ADD COLUMN IF NOT EXISTS (idempotent). SQLite has no IF NOT EXISTS for columns, so its migration loop catches only the "duplicate column name" OperationalError and re-raises anything else; a genuinely broken migration therefore fails loudly instead of silently leaving a half-built schema. There is no rollback support.
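A minimal sketch of the SQLite migration loop described above, assuming a synchronous sqlite3 connection; the MIGRATIONS list and apply_sqlite_migrations are hypothetical, and the column defaults shown are illustrative assumptions rather than the adapter's real DDL.

```python
import sqlite3

# Illustrative subset; defaults are assumptions.
MIGRATIONS = [
    "ALTER TABLE tasks ADD COLUMN notified INTEGER DEFAULT 0",
    "ALTER TABLE reminders ADD COLUMN channel TEXT DEFAULT 'telegram'",
]


def apply_sqlite_migrations(conn: sqlite3.Connection,
                            statements: list[str] = MIGRATIONS) -> int:
    """Run each ALTER TABLE; tolerate only 'duplicate column name'. Returns count applied."""
    applied = 0
    for stmt in statements:
        try:
            conn.execute(stmt)
            applied += 1
        except sqlite3.OperationalError as exc:
            # Re-running on an already-migrated database: the column exists, so skip it.
            if "duplicate column name" in str(exc).lower():
                continue
            raise  # anything else is a real failure and must surface
    conn.commit()
    return applied
```

Running it twice against the same database is a no-op the second time, while a statement against a missing table raises instead of being swallowed.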

Current migrations applied at startup:

  • tasks.notified — due-date notification tracking
  • audit_log.duration_ms, audit_log.tokens_used — request timing and token tracking
  • read_later.tags — keyword tags for relevance matching; read_later.read_at — read-state filtering
  • reminders.channel — delivery channel (telegram/whatsapp)
  • pending_actions.trace_id — Langfuse trace id for delayed approval scoring
  • delegations.backend, delegations.credential — backend label + vault-credential name (web_task)
  • notes.embedding, interactions.embedding, read_later.embedding — vector embeddings for semantic search (added as TEXT in SQLite, halfvec(3072) in Postgres via pgvector ≥0.7 extension; legacy vector(3072) columns auto-migrate in-place on first pool init)
  • health_samples, vault_credentials, capability_gaps, proposed_skills — created on first connect for both adapters