
Storage Layer

Storage is abstracted behind a StoragePort protocol. The agent and all tools talk to the protocol — never to a specific adapter directly.


Architecture

graph TD
    AG[Agent tools] --> SP[StoragePort protocol\napp/memory/ports.py]
    BG[Background loops] --> SP
    DASH[Dashboard router] --> SP
    SP --> SQ[SqliteAdapter\napp/memory/adapters/sqlite.py]
    SP --> PG[PostgresAdapter\napp/memory/adapters/postgres.py]
    SQ --> DB1[(kwasi.db\nlocal dev)]
    PG --> DB2[(PostgreSQL\nRailway prod)]

Switch adapters with a single env var:

STORAGE_BACKEND=sqlite   # default, uses DATABASE_URL=sqlite:///kwasi.db
STORAGE_BACKEND=postgres # uses DATABASE_URL from Railway
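
The selection logic reduces to reading these two variables. A minimal sketch (resolve_backend is illustrative; the real wiring lives in the adapter modules):

```python
import os

def resolve_backend() -> tuple[str, str]:
    """Return (backend, database_url) from the environment.

    Sketch only: defaults mirror the env vars documented above.
    """
    backend = os.environ.get("STORAGE_BACKEND", "sqlite").lower()
    if backend == "sqlite":
        url = os.environ.get("DATABASE_URL", "sqlite:///kwasi.db")
    elif backend == "postgres":
        url = os.environ["DATABASE_URL"]  # injected by Railway in prod
    else:
        raise ValueError(f"Unknown STORAGE_BACKEND: {backend!r}")
    return backend, url
```

Because every caller goes through StoragePort, flipping the variable swaps the adapter without touching agent or tool code.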

Schema

interactions

Every conversation exchange, from any channel.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| user_message | TEXT | What you sent |
| agent_response | TEXT | What Kwasi replied |
| tools_used | TEXT | JSON array of tool call records |
| channel | TEXT | "telegram", "cli", or "whatsapp" |
| user_id | TEXT | Telegram user ID (nullable) |
| created_at | TIMESTAMP | UTC |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |

notes

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| title | TEXT | |
| content | TEXT | |
| created_at | TIMESTAMP | UTC |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |

tasks

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| title | TEXT | |
| status | TEXT | "todo" or "done" |
| due_date | TEXT | ISO date string (YYYY-MM-DD), nullable |
| notified | BOOLEAN | true once a due-date notification has been sent |
| created_at | TIMESTAMP | UTC |

reminders

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| user_id | TEXT | Telegram user ID or WhatsApp phone number |
| chat_id | TEXT | Delivery target: Telegram chat ID or WhatsApp phone number |
| message | TEXT | The reminder text |
| remind_at | TIMESTAMP | UTC — when to fire |
| status | TEXT | "pending", "sent", or "cancelled" |
| channel | TEXT | "telegram" or "whatsapp" — delivery channel |
| created_at | TIMESTAMP | UTC |

context

Stores the long-term user profile produced by the Reflection Engine, plus system cooldown state.

| Column | Type | Notes |
| --- | --- | --- |
| user_id | TEXT | Primary key — "global" for the user profile; "system:briefing" for daily briefing dedup; "system:stale_check" for the weekly stale-facts nudge cooldown |
| content | TEXT | Markdown profile ≤550 words (for "global"); ISO date string for system keys |
| last_updated | TIMESTAMP | UTC |

user_facts

Permanent key-value facts about the user. Never pruned. Written by the agent (remember_fact tool) or auto-extracted by nightly reflection.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| key | TEXT | UNIQUE. snake_case identifier, e.g. home_address, partner_name |
| value | TEXT | The actual fact value |
| category | TEXT | location, personal, preference, work, health, or general |
| source | TEXT | "agent" (proactive/explicit) or "reflection" (auto-extracted nightly) |
| created_at | TIMESTAMP | UTC — set on first insert, never changed |
| updated_at | TIMESTAMP | UTC — refreshed on every upsert |

The unique index on key means save_user_fact is always an upsert — writing the same key a second time overwrites the value.
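
The upsert behaviour can be illustrated with SQLite's ON CONFLICT clause (a sketch of the mechanism, not the adapter's exact SQL; the schema here is trimmed to the relevant columns):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE user_facts (
        id TEXT PRIMARY KEY,
        key TEXT UNIQUE,
        value TEXT,
        updated_at TEXT
    )
""")

def save_user_fact(fact_id: str, key: str, value: str, now: str) -> None:
    # A second write with the same key overwrites value and refreshes
    # updated_at; the original row's id is untouched on conflict.
    conn.execute(
        """
        INSERT INTO user_facts (id, key, value, updated_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(key) DO UPDATE SET value = excluded.value,
                                       updated_at = excluded.updated_at
        """,
        (fact_id, key, value, now),
    )

save_user_fact("a1", "home_address", "1 Old St", "2024-01-01")
save_user_fact("a2", "home_address", "2 New St", "2024-02-01")
row = conn.execute(
    "SELECT id, value FROM user_facts WHERE key = 'home_address'"
).fetchone()
```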

news_topics

Explicitly followed news topics.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| topic | TEXT | Unique, stored lowercase |
| created_at | TIMESTAMP | UTC |

seen_news

Rolling 7-day dedup store for fetched news stories.

| Column | Type | Notes |
| --- | --- | --- |
| url | TEXT | Primary key |
| title | TEXT | Story headline |
| seen_at | TIMESTAMP | UTC — entries older than 7 days are purged on next get_news call |

scheduled_tasks

User-defined recurring jobs evaluated by _user_scheduled_tasks_loop every 60 seconds.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| name | TEXT | Human-readable label |
| prompt | TEXT | What to run the agent with when the task fires |
| cron_expression | TEXT | Standard 5-field cron, e.g. 0 9 * * 1 = Monday 9am |
| timezone | TEXT | IANA timezone (defaults to USER_TIMEZONE) |
| enabled | BOOLEAN | true to run, false to pause |
| created_by | TEXT | "user" or "system" |
| last_run | TIMESTAMP | UTC — last time this task successfully fired |
| created_at | TIMESTAMP | UTC |
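
The 60-second loop only needs to test whether the current minute matches the expression. A minimal matcher sketch, handling only "*", plain numbers, and comma lists (the production evaluator is assumed to support full cron syntax, including ranges and steps):

```python
from datetime import datetime

def _field_matches(field: str, value: int) -> bool:
    if field == "*":
        return True
    return value in {int(part) for part in field.split(",")}

def cron_matches(expr: str, now: datetime) -> bool:
    """True if `now` matches a 5-field cron expression: minute hour dom month dow."""
    minute, hour, dom, month, dow = expr.split()
    return (
        _field_matches(minute, now.minute)
        and _field_matches(hour, now.hour)
        and _field_matches(dom, now.day)
        and _field_matches(month, now.month)
        and _field_matches(dow, now.isoweekday() % 7)  # cron convention: 0 = Sunday
    )
```

Evaluating in the task's own IANA timezone (the timezone column) before matching is what makes "Monday 9am" mean local 9am rather than UTC.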

audit_log

Every tool call the agent makes, with sanitised arguments and truncated output.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| timestamp | TIMESTAMP | UTC |
| tool_name | TEXT | Name of the tool called |
| arguments | TEXT | JSON — keys matching key, token, secret, password are redacted to "***" |
| result_summary | TEXT | First 200 characters of tool output |
| channel | TEXT | "telegram", "whatsapp", or "cli" |
| user_id | TEXT | Nullable |
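
The sanitisation step can be sketched as a substring match over argument names (an illustration; the exact matching rules in the real code are an assumption):

```python
import json

SENSITIVE = ("key", "token", "secret", "password")

def sanitise_arguments(arguments: dict) -> str:
    """Redact sensitive values, then serialise for the audit_log.arguments column."""
    clean = {
        k: "***" if any(s in k.lower() for s in SENSITIVE) else v
        for k, v in arguments.items()
    }
    return json.dumps(clean)
```

Substring matching means api_key, auth_token, and client_secret are all caught without enumerating every variant.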

read_later

Articles saved by the user via the Content Curator skill.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key (prefix match supported in delete_read_later) |
| url | TEXT | Original URL |
| title | TEXT | Article title |
| summary | TEXT | Auto-generated at save time via LLM summarisation |
| note | TEXT | Optional user annotation |
| tags | TEXT | Comma-separated keywords extracted from title + summary at save time (frequency-ranked, stop-word filtered, up to 10). Used by find_relevant_read_later() to surface contextually relevant articles. |
| added_at | TIMESTAMP | UTC |
| embedding | TEXT (SQLite) / halfvec(3072) (Postgres) | 3072-float embedding from Gemini gemini-embedding-001 (16-bit on Postgres). NULL until embedded. Used by semantic_search. |
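
The tags pipeline (frequency-ranked, stop-word filtered, capped at 10) can be sketched as follows; the stop-word list here is a toy stand-in for the real one:

```python
from collections import Counter

# Illustrative stop-word list; the production list is assumed to be longer.
STOP_WORDS = {"the", "a", "an", "and", "or", "of", "to", "in", "is", "for", "on", "with"}

def extract_tags(title: str, summary: str, max_tags: int = 10) -> str:
    """Frequency-ranked keyword tags from title + summary, comma-separated."""
    words = (title + " " + summary).lower().split()
    words = [w.strip(".,!?:;\"'()") for w in words]
    counts = Counter(w for w in words if len(w) > 2 and w not in STOP_WORDS)
    return ",".join(word for word, _ in counts.most_common(max_tags))
```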

alert_rules

Proactive alert rules evaluated every 5 minutes by _alert_loop. Two system defaults seeded on first run.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| name | TEXT | Human-readable label |
| trigger_type | TEXT | task_due_today, task_overdue (phase 2: meeting_soon, email_arrived) |
| conditions | TEXT | JSON object of trigger-specific params (e.g. {"fire_hour": 11}) |
| cooldown_hours | INTEGER | Minimum hours between consecutive firings — prevents spam |
| enabled | BOOLEAN | true to evaluate, false to pause |
| created_by | TEXT | "user" or "system" |
| last_fired | TIMESTAMP | UTC — last time this rule triggered a message |
| created_at | TIMESTAMP | UTC |

agent_learnings

Behavioral corrections extracted by the nightly Reflection Engine. Promoted from "candidate" to "active" when seen in ≥2 reflection cycles.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| rule | TEXT | UNIQUE (normalized lowercase). Short imperative sentence ≤200 chars, e.g. "always confirm before deleting any item" |
| category | TEXT | response_format, tool_usage, decision_making, tone, or scope |
| recurrence_count | INTEGER | Incremented each time the same rule is re-extracted — triggers promotion at ≥2 |
| status | TEXT | "candidate" (new, not yet confirmed) or "active" (shown in system prompt) |
| first_seen | TIMESTAMP | UTC |
| last_seen | TIMESTAMP | UTC |

pending_actions

Holds consequential actions awaiting user confirmation via the Telegram inline approval flow. Created by approval_gate() when the agent calls a gated tool; resolved by the user tapping Confirm, Cancel, or Edit.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| tool_name | TEXT | e.g. "create_task", "send_email_wrapper", "__plan__", "__plan_resume__" |
| action_type | TEXT | "task", "reminder", "note", "email", "calendar", "todo", "slack", "plan" (Spec 008 — multi-step plan awaiting Confirm), or "plan_resume" (Spec 008 — partial-failure resume) |
| payload | TEXT | JSON-serialized kwargs to pass to the executor on Confirm. For "plan" this is the serialised ExecutionPlan; for "plan_resume" it is the serialised PlanResumePayload (plan + remaining_steps + scratchpad + failed_step + failure_reason). |
| preview_text | TEXT | Human-readable description shown to the user |
| chat_id | TEXT | Telegram chat ID that triggered the action |
| message_id | INTEGER | Telegram message ID of the preview message (set after send, used by expiry loop to edit) |
| status | TEXT | "pending", "confirmed", "cancelled", "expired", or "failed" |
| notified | BOOLEAN | true once the user has seen an expiry notice |
| edit_count | INTEGER | Number of times the user has used the Edit button on this action |
| original_user_message | TEXT | The user's original message, preserved for the Edit re-run prompt |
| trace_id | TEXT | Nullable. OTEL trace ID captured at gate time so the eventual Confirm/Cancel/Edit decision can attach a user_approval / user_edit score to the originating trace asynchronously (Spec 009 — Langfuse observability). |
| created_at | TIMESTAMP | UTC |
| expires_at | TIMESTAMP | UTC — created_at + 30 min; gate rejects execution after this point |

Indexed on (chat_id, status) and (expires_at, status) for efficient per-chat and expiry lookups.
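
The 30-minute window reduces to a timestamp comparison at gate time. A sketch (function names are illustrative):

```python
from datetime import datetime, timedelta, timezone

EXPIRY = timedelta(minutes=30)

def expires_at(created_at: datetime) -> datetime:
    """expires_at column value: created_at + 30 min."""
    return created_at + EXPIRY

def is_expired(created_at: datetime, now: datetime) -> bool:
    # The gate refuses to execute a pending action once `now` reaches expires_at.
    return now >= expires_at(created_at)
```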

pending_intentions

Soft personal commitments extracted from conversation by the nightly Reflection Engine. Follow-ups delivered proactively.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| text | TEXT | The intention text, e.g. "call doctor about knee" |
| mentioned_at | TIMESTAMP | UTC — when the intention was first expressed |
| follow_up_days | INTEGER | Days until first follow-up (default 3) |
| follow_up_at | TIMESTAMP | UTC — next scheduled follow-up delivery |
| status | TEXT | pending, snoozed, resolved, dismissed |
| last_follow_up_at | TIMESTAMP | UTC — when the last follow-up was sent (nullable) |
| created_at | TIMESTAMP | UTC |

journal_entries

Private journal entries saved by the agent via save_journal_entry. Included in the nightly Reflection Engine context so they inform the long-term narrative profile and intention extraction. A weekly digest fires via _journal_digest_loop.

| Column | Type | Notes |
| --- | --- | --- |
| id | TEXT (UUID) | Primary key |
| content | TEXT | Full text of the entry |
| title | TEXT | ≤10-word title capturing the main theme, generated by the model |
| source | TEXT | "voice" (default) or "text" — how the entry was created |
| created_at | TIMESTAMP | UTC |

Indexed on created_at for efficient time-range lookups. Entries are never embedded (too personal/short for vector search) — retrieved by date range only.

health_samples

Wearable / Health Connect samples ingested by the Android bridge (Spec 010). A single normalised, append-only table covers all 11 metric types with a flexible JSON value payload, so the schema is unchanged when a new metric type is added.

| Column | Type | Notes |
| --- | --- | --- |
| id | BIGSERIAL (Postgres) / INTEGER AUTOINCREMENT (SQLite) | Primary key |
| metric_type | TEXT | One of: steps, sleep_session, heart_rate, hrv_rmssd, spo2, resting_hr, respiratory_rate, exercise, body_fat, weight, blood_pressure |
| start_time | TIMESTAMP WITH TIME ZONE | UTC |
| end_time | TIMESTAMP WITH TIME ZONE | UTC, nullable — null for instantaneous samples (HR, HRV, SpO2, RHR, …); set for sessions (sleep, exercise) and intervals (steps) |
| value | JSONB (Postgres) / TEXT (SQLite) | Metric-specific payload — e.g. {"bpm": 62}, {"rmssd_ms": 38}, {"score": 78, "stages": [...]}, {"count": 4231} |
| source_device | TEXT | e.g. Galaxy Watch 5 Pro |
| source_app | TEXT | e.g. com.sec.android.app.shealth |
| created_at | TIMESTAMP WITH TIME ZONE | UTC — when the row landed on the server |

Indexes:

  • UNIQUE (metric_type, start_time, source_device) — idempotency key; duplicate POSTs from the bridge are dropped silently
  • (metric_type, start_time DESC) — for time-range reads by the agent's health tools

Samples are never embedded (high volume, low semantic value). Reads are always time-bounded — get_health_samples(metric_type, since, until=None, limit=5000).
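
The (accepted, duplicates) contract can be demonstrated with SQLite's ON CONFLICT ... DO NOTHING over the idempotency key (an illustration, not the adapter's code; the schema is trimmed):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE health_samples (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        metric_type TEXT,
        start_time TEXT,
        source_device TEXT,
        value TEXT,
        UNIQUE (metric_type, start_time, source_device)
    )
""")

def save_health_samples(samples: list[dict]) -> tuple[int, int]:
    """Insert samples; rows that hit the idempotency key are dropped silently."""
    accepted = 0
    for s in samples:
        before = conn.total_changes
        conn.execute(
            """
            INSERT INTO health_samples (metric_type, start_time, source_device, value)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (metric_type, start_time, source_device) DO NOTHING
            """,
            (s["metric_type"], s["start_time"], s["source_device"], json.dumps(s["value"])),
        )
        accepted += conn.total_changes - before  # 0 when the row was a duplicate
    return accepted, len(samples) - accepted

batch = [{"metric_type": "heart_rate", "start_time": "2024-01-01T08:00:00Z",
          "source_device": "watch", "value": {"bpm": 62}}]
first = save_health_samples(batch)   # the bridge's first POST
```

Re-sending the same batch (e.g. after a flaky network retry) accepts nothing and counts everything as a duplicate.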


Data Models

from dataclasses import dataclass
from datetime import datetime

@dataclass
class SemanticSearchResult:
    source: str              # "notes" | "interactions" | "read_later"
    id: str
    title: str               # note title / "Conversation YYYY-MM-DD" / article title
    snippet: str             # first 200 chars of relevant content
    similarity: float        # cosine similarity 0.0–1.0
    created_at: datetime | None  # used for recency weighting in context injection

StoragePort Protocol

Defined in app/memory/ports.py. Every method is async.

class StoragePort(Protocol):
    # Interactions
    async def save_interaction(self, interaction: Interaction) -> None: ...
    async def get_interactions(self, limit: int = 50) -> list[Interaction]: ...
    async def get_interactions_by_user(self, user_id: str, limit: int = 20) -> list[Interaction]: ...
    async def get_interactions_since(self, since: datetime) -> list[Interaction]: ...
    async def search_interactions(self, query: str, limit: int = 10) -> list[Interaction]: ...

    # Context
    async def get_context(self, user_id: str) -> UserContext | None: ...
    async def save_context(self, context: UserContext) -> None: ...

    # Notes
    async def save_note(self, note: Note) -> None: ...
    async def get_notes(self) -> list[Note]: ...
    async def search_notes(self, query: str) -> list[Note]: ...
    async def update_note(self, note_id: str, title: str, content: str) -> bool: ...
    async def delete_note(self, note_id: str) -> bool: ...

    # Tasks
    async def create_task(self, task: Task) -> None: ...
    async def list_tasks(self, status: str | None = None) -> list[Task]: ...
    async def search_tasks(self, query: str) -> list[Task]: ...
    async def get_tasks_due(self) -> list[Task]: ...
    async def mark_task_notified(self, task_id: str) -> None: ...
    async def update_task(self, task_id: str, title: str | None, due_date: str | None) -> bool: ...
    async def complete_task(self, task_id: str) -> bool: ...
    async def delete_task(self, task_id: str) -> bool: ...

    # Reminders
    async def create_reminder(self, reminder: Reminder) -> None: ...
    async def get_due_reminders(self) -> list[Reminder]: ...
    async def list_reminders(self, user_id: str | None = None) -> list[Reminder]: ...
    async def update_reminder_status(self, reminder_id: str, status: str) -> None: ...
    async def cancel_reminder(self, reminder_id: str) -> bool: ...

    # Scheduled tasks
    async def create_scheduled_task(self, task: ScheduledTask) -> None: ...
    async def list_scheduled_tasks(self) -> list[ScheduledTask]: ...
    async def update_scheduled_task(self, task_id: str, **kwargs) -> bool: ...
    async def delete_scheduled_task(self, task_id: str) -> bool: ...
    async def update_scheduled_task_last_run(self, task_id: str, last_run: datetime) -> None: ...

    # Audit log
    async def log_audit_entry(self, entry: AuditEntry) -> None: ...
    async def get_audit_log(self, limit: int = 50) -> list[AuditEntry]: ...

    # News
    async def add_news_topic(self, topic: NewsTopic) -> None: ...
    async def remove_news_topic(self, topic: str) -> bool: ...
    async def list_news_topics(self) -> list[NewsTopic]: ...
    async def add_seen_story(self, story: SeenStory) -> None: ...
    async def get_seen_urls(self) -> set[str]: ...
    async def cleanup_seen_stories(self, days: int = 7) -> None: ...

    # Read Later
    async def save_read_later(self, item: ReadLaterItem) -> None: ...
    async def list_read_later(self) -> list[ReadLaterItem]: ...
    async def delete_read_later(self, item_id: str) -> None: ...

    # User Facts
    async def save_user_fact(self, fact: UserFact) -> None: ...      # upsert on key
    async def get_user_fact(self, key: str) -> UserFact | None: ...
    async def get_all_user_facts(self) -> list[UserFact]: ...        # ordered by category, key
    async def search_user_facts(self, query: str) -> list[UserFact]: ...
    async def delete_user_fact(self, key: str) -> bool: ...

    # Alert Rules
    async def create_alert_rule(self, rule: AlertRule) -> None: ...
    async def list_alert_rules(self, enabled_only: bool = False) -> list[AlertRule]: ...
    async def update_alert_rule(self, rule_id: str, **kwargs) -> bool: ...
    async def delete_alert_rule(self, rule_id: str) -> bool: ...
    async def update_alert_rule_last_fired(self, rule_id: str, fired_at: datetime) -> None: ...

    # Pending Intentions
    async def create_intention(self, intention: PendingIntention) -> None: ...
    async def list_intentions(self, status: str | None = None) -> list[PendingIntention]: ...
    async def get_due_intentions(self) -> list[PendingIntention]: ...
    async def update_intention(self, intention_id: str, **kwargs) -> bool: ...
    async def count_followups_today(self) -> int: ...

    # Agent Learnings
    async def save_agent_learning(self, learning: AgentLearning) -> None: ...  # upsert on rule
    async def get_agent_learnings(self, status: str | None = None) -> list[AgentLearning]: ...
    async def delete_agent_learning(self, learning_id: str) -> bool: ...

    # Pending Actions (approval flow)
    async def save_pending_action(self, action: PendingAction) -> None: ...
    async def get_pending_action(self, action_id: str) -> PendingAction | None: ...
    async def update_pending_action(self, action_id: str, **kwargs) -> bool: ...
    async def list_expired_pending_actions(self) -> list[PendingAction]: ...  # pending rows past expires_at
    async def get_pending_actions_for_chat(self, chat_id: str, status: str = "pending") -> list[PendingAction]: ...
    async def delete_old_pending_actions(self, days: int = 7) -> int: ...    # prune confirmed/cancelled/expired rows
    async def delete_old_audit_entries(self, days: int = 90) -> int: ...     # prune old audit_log rows

    # Journal Entries
    async def save_journal_entry(self, entry: JournalEntry) -> None: ...
    async def list_journal_entries(self, days: int | None = None) -> list[JournalEntry]: ...

    # Health Samples (Spec 010 — Wearable / Health Connect ingest)
    # Idempotent on (metric_type, start_time, source_device); returns (accepted, duplicates).
    async def save_health_samples(self, samples: list[dict]) -> tuple[int, int]: ...
    async def get_health_samples(
        self,
        metric_type: str,
        since: datetime,
        until: datetime | None = None,
        limit: int = 5000,
    ) -> list[dict]: ...

    # Semantic Search
    async def semantic_search(
        self,
        embedding: list[float],
        sources: list[str] | None = None,  # default: all three
        limit: int = 5,
    ) -> list[SemanticSearchResult]: ...

    async def close(self) -> None: ...

Adapter Differences

| Feature | SQLite | PostgreSQL |
| --- | --- | --- |
| Case-insensitive search | lower(column) LIKE lower(pattern) — case-insensitive for ASCII only (SQLite's built-in lower() does not fold non-ASCII characters) | ILIKE — always case-insensitive |
| Timestamps | Stored as ISO 8601 strings | Native TIMESTAMP WITH TIME ZONE |
| Connection | Single persistent connection | asyncpg connection pool (1–10) |
| Schema init | CREATE TABLE IF NOT EXISTS on first connect | Same, plus SSL fallback |
| Migrations | ALTER TABLE ... ADD COLUMN in try/except | ALTER TABLE ... ADD COLUMN IF NOT EXISTS |
| Boolean | INTEGER (0/1) | Native BOOLEAN |
| Vector search | Python cosine similarity (full scan) | pgvector <=> cosine operator with HNSW index |
| Embedding storage | JSON text array | Native halfvec(3072) type (16-bit; requires pgvector ≥0.7 extension) |

Both adapters implement semantic_search(), which finds content by meaning rather than keywords. The query text is embedded via embed_text() in app/tools/embedding.py (Gemini gemini-embedding-001, 3072 dimensions) before calling this method.

SQLite (local dev): Fetches all rows with non-null embeddings, computes Python cosine similarity in memory, sorts and truncates to limit. Streams row-by-row via cursor — does not load all embeddings into memory at once. Acceptable for a personal assistant's data volumes.
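
The in-memory ranking boils down to cosine similarity plus a sort. A sketch of the idea (function names are illustrative):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two vectors; 1.0 = identical direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def top_k(query: list[float], rows: list[tuple[str, list[float]]], k: int = 5):
    """Rank (id, embedding) rows by similarity to the query vector, best first."""
    scored = [(row_id, cosine_similarity(query, emb)) for row_id, emb in rows]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```

This is O(n) per query, which is the "full scan" the adapter-differences table refers to; acceptable at personal-assistant scale, and exactly what the HNSW index avoids on Postgres.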

PostgreSQL (production): Uses the pgvector extension with the <=> cosine distance operator and an HNSW index (halfvec_cosine_ops) on each table. Each source independently returns its best limit candidates; results are merged and re-ranked before returning the top limit overall. Embeddings are stored as halfvec(3072) (16-bit floats) — full-precision vector(3072) exceeds pgvector's 2000-dim HNSW limit and would force a sequential scan.

Graceful degradation: If GOOGLE_API_KEY is not set, embed_text() returns None and no embedding is stored or queried — keyword search continues to work normally. If the pgvector extension is unavailable on Postgres, the CREATE EXTENSION and ADD COLUMN statements fail silently and semantic_search() returns an empty list.

Backfilling: Existing rows without embeddings can be embedded in bulk via POST /embed-backfill (protected by X-Reflection-Secret). Rate-limited to ~10 rows/second.


Schema Migrations

Neither adapter uses a migration framework. New columns are added via ALTER TABLE statements run on every startup, guarded so they're no-ops if the column already exists. New tables are created with CREATE TABLE IF NOT EXISTS. This keeps deployment simple at the cost of no rollback support.
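
For SQLite, which lacks ADD COLUMN IF NOT EXISTS, the guard is a try/except. A sketch of the pattern:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS tasks (id TEXT PRIMARY KEY, title TEXT)")

def add_column_if_missing(table: str, column_ddl: str) -> None:
    """Guarded ALTER TABLE: a no-op when the column already exists."""
    try:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column_ddl}")
    except sqlite3.OperationalError:
        pass  # "duplicate column name" on every startup after the first

add_column_if_missing("tasks", "notified INTEGER DEFAULT 0")
add_column_if_missing("tasks", "notified INTEGER DEFAULT 0")  # second run is a no-op
```

On Postgres the same effect is a single statement (ALTER TABLE ... ADD COLUMN IF NOT EXISTS), so no exception handling is needed.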

Current migrations applied at startup:

  • tasks.notified — due-date notification tracking
  • audit_log.duration_ms, audit_log.tokens_used — request timing and token tracking
  • read_later.tags — keyword tags for relevance matching
  • reminders.channel — delivery channel (telegram/whatsapp)
  • notes.embedding, interactions.embedding, read_later.embedding — vector embeddings for semantic search (added as TEXT in SQLite, halfvec(3072) in Postgres via pgvector ≥0.7 extension; legacy vector(3072) columns auto-migrate in-place on first pool init)
  • health_samples — Spec 010 — created on first connect for both adapters; value is JSONB in Postgres and JSON-encoded TEXT in SQLite