# Storage Layer

Storage is abstracted behind a `StoragePort` protocol. The agent and all tools talk to the protocol — never to a specific adapter directly.
## Architecture

```mermaid
graph TD
    AG[Agent tools] --> SP[StoragePort protocol\napp/memory/ports.py]
    BG[Background loops] --> SP
    DASH[Dashboard router] --> SP
    SP --> SQ[SqliteAdapter\napp/memory/adapters/sqlite.py]
    SP --> PG[PostgresAdapter\napp/memory/adapters/postgres.py]
    SQ --> DB1[(kwasi.db\nlocal dev)]
    PG --> DB2[(PostgreSQL\nRailway prod)]
```
Switch adapters with a single env var:
```bash
STORAGE_BACKEND=sqlite    # default, uses DATABASE_URL=sqlite:///kwasi.db
STORAGE_BACKEND=postgres  # uses DATABASE_URL from Railway
```
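Adapter selection reduces to a branch on that variable. A minimal sketch of such a factory, assuming hypothetical constructor signatures (the function name `build_storage` is illustrative, not the project's actual code):

```python
import os

def build_storage() -> "StoragePort":
    """Pick an adapter from STORAGE_BACKEND; callers only ever see StoragePort."""
    backend = os.getenv("STORAGE_BACKEND", "sqlite")
    url = os.getenv("DATABASE_URL", "sqlite:///kwasi.db")
    if backend == "postgres":
        from app.memory.adapters.postgres import PostgresAdapter
        return PostgresAdapter(url)  # constructor signature assumed
    from app.memory.adapters.sqlite import SqliteAdapter
    return SqliteAdapter(url)  # constructor signature assumed
```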
## Schema
### interactions

Every conversation exchange, from any channel.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `user_message` | TEXT | What you sent |
| `agent_response` | TEXT | What Kwasi replied |
| `tools_used` | TEXT | JSON array of tool call records |
| `channel` | TEXT | `"telegram"`, `"cli"`, or `"whatsapp"` |
| `user_id` | TEXT | Telegram user ID (nullable) |
| `created_at` | TIMESTAMP | UTC |
| `embedding` | TEXT (SQLite) / `halfvec(3072)` (Postgres) | 3072-float embedding from Gemini `gemini-embedding-001` (16-bit on Postgres). NULL until embedded. Used by `semantic_search`. |
### notes

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `title` | TEXT | |
| `content` | TEXT | |
| `created_at` | TIMESTAMP | UTC |
| `embedding` | TEXT (SQLite) / `halfvec(3072)` (Postgres) | 3072-float embedding from Gemini `gemini-embedding-001` (16-bit on Postgres). NULL until embedded. Used by `semantic_search`. |
### tasks

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `title` | TEXT | |
| `status` | TEXT | `"todo"` or `"done"` |
| `due_date` | TEXT | ISO date string (YYYY-MM-DD), nullable |
| `notified` | BOOLEAN | true once a due-date notification has been sent |
| `created_at` | TIMESTAMP | UTC |
### reminders

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `user_id` | TEXT | Telegram user ID or WhatsApp phone number |
| `chat_id` | TEXT | Delivery target: Telegram chat ID or WhatsApp phone number |
| `message` | TEXT | The reminder text |
| `remind_at` | TIMESTAMP | UTC — when to fire |
| `status` | TEXT | `"pending"`, `"sent"`, or `"cancelled"` |
| `channel` | TEXT | `"telegram"` or `"whatsapp"` — delivery channel |
| `created_at` | TIMESTAMP | UTC |
### context

Stores the long-term user profile produced by the Reflection Engine, plus system cooldown state.

| Column | Type | Notes |
|---|---|---|
| `user_id` | TEXT | Primary key — `"global"` for the user profile; `"system:briefing"` for daily briefing dedup; `"system:stale_check"` for the weekly stale-facts nudge cooldown |
| `content` | TEXT | Markdown profile ≤550 words (for `"global"`); ISO date string for system keys |
| `last_updated` | TIMESTAMP | UTC |
### user_facts

Permanent key-value facts about the user. Never pruned. Written by the agent (`remember_fact` tool) or auto-extracted by nightly reflection.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `key` | TEXT | UNIQUE. snake_case identifier, e.g. `home_address`, `partner_name` |
| `value` | TEXT | The actual fact value |
| `category` | TEXT | `location`, `personal`, `preference`, `work`, `health`, or `general` |
| `source` | TEXT | `"agent"` (proactive/explicit) or `"reflection"` (auto-extracted nightly) |
| `created_at` | TIMESTAMP | UTC — set on first insert, never changed |
| `updated_at` | TIMESTAMP | UTC — refreshed on every upsert |

The unique index on `key` means `save_user_fact` is always an upsert — writing the same key a second time overwrites the value.
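In SQL terms, the upsert looks roughly like this (SQLite flavour shown; the adapter's actual statements may differ):

```python
# Sketch of save_user_fact's upsert semantics.
UPSERT_FACT = """
INSERT INTO user_facts (id, key, value, category, source, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
    value      = excluded.value,
    category   = excluded.category,
    source     = excluded.source,
    updated_at = excluded.updated_at  -- created_at is deliberately left untouched
"""
```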
### news_topics

Explicitly followed news topics.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `topic` | TEXT | Unique, stored lowercase |
| `created_at` | TIMESTAMP | UTC |
### seen_news

Rolling 7-day dedup store for fetched news stories.

| Column | Type | Notes |
|---|---|---|
| `url` | TEXT | Primary key |
| `title` | TEXT | Story headline |
| `seen_at` | TIMESTAMP | UTC — entries older than 7 days are purged on the next `get_news` call |
### scheduled_tasks

User-defined recurring jobs evaluated by `_user_scheduled_tasks_loop` every 60 seconds.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `name` | TEXT | Human-readable label |
| `prompt` | TEXT | What to run the agent with when the task fires |
| `cron_expression` | TEXT | Standard 5-field cron, e.g. `0 9 * * 1` = Monday 9am |
| `timezone` | TEXT | IANA timezone (defaults to `USER_TIMEZONE`) |
| `enabled` | BOOLEAN | true to run, false to pause |
| `created_by` | TEXT | `"user"` or `"system"` |
| `last_run` | TIMESTAMP | UTC — last time this task successfully fired |
| `created_at` | TIMESTAMP | UTC |
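One plausible shape for the due-check that the loop runs each minute, assuming the `croniter` package for cron evaluation; the project's actual evaluator is not shown here:

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from croniter import croniter  # assumption: any 5-field cron evaluator works

def is_due(cron_expression: str, tz: str, last_run: datetime | None, now: datetime) -> bool:
    """True when the next fire time after last_run has already passed (aware UTC inputs)."""
    zone = ZoneInfo(tz)
    base = (last_run or now - timedelta(minutes=1)).astimezone(zone)
    next_fire = croniter(cron_expression, base).get_next(datetime)
    return next_fire <= now.astimezone(zone)
```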
### audit_log

Every tool call the agent makes, with sanitised arguments and truncated output.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `timestamp` | TIMESTAMP | UTC |
| `tool_name` | TEXT | Name of the tool called |
| `arguments` | TEXT | JSON — keys matching `key`, `token`, `secret`, `password` are redacted to `"***"` |
| `result_summary` | TEXT | First 200 characters of tool output |
| `channel` | TEXT | `"telegram"`, `"whatsapp"`, or `"cli"` |
| `user_id` | TEXT | Nullable |
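A sketch of the sanitisation described above; whether matching is by substring or exact key name is an assumption here:

```python
SENSITIVE = ("key", "token", "secret", "password")

def sanitise_arguments(args: dict) -> dict:
    """Redact values whose key names look sensitive before audit logging."""
    return {
        k: "***" if any(word in k.lower() for word in SENSITIVE) else v
        for k, v in args.items()
    }
```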
### read_later

Articles saved by the user via the Content Curator skill.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key (prefix match supported in `delete_read_later`) |
| `url` | TEXT | Original URL |
| `title` | TEXT | Article title |
| `summary` | TEXT | Auto-generated at save time via LLM summarisation |
| `note` | TEXT | Optional user annotation |
| `tags` | TEXT | Comma-separated keywords extracted from title + summary at save time (frequency-ranked, stop-word filtered, up to 10). Used by `find_relevant_read_later()` to surface contextually relevant articles. |
| `added_at` | TIMESTAMP | UTC |
| `embedding` | TEXT (SQLite) / `halfvec(3072)` (Postgres) | 3072-float embedding from Gemini `gemini-embedding-001` (16-bit on Postgres). NULL until embedded. Used by `semantic_search`. |
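One way the `tags` column can be populated at save time. This is a sketch; the real stop-word list and tokeniser are not documented here:

```python
import re
from collections import Counter

STOP_WORDS = {"the", "and", "for", "with", "that", "from", "this", "are", "was"}

def extract_tags(title: str, summary: str, max_tags: int = 10) -> str:
    """Frequency-ranked, stop-word-filtered keywords from title + summary."""
    words = re.findall(r"[a-z]{3,}", f"{title} {summary}".lower())
    ranked = Counter(w for w in words if w not in STOP_WORDS)
    return ",".join(word for word, _ in ranked.most_common(max_tags))
```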
### alert_rules

Proactive alert rules evaluated every 5 minutes by `_alert_loop`. Two system defaults seeded on first run.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `name` | TEXT | Human-readable label |
| `trigger_type` | TEXT | `task_due_today`, `task_overdue` (phase 2: `meeting_soon`, `email_arrived`) |
| `conditions` | TEXT | JSON object of trigger-specific params (e.g. `{"fire_hour": 11}`) |
| `cooldown_hours` | INTEGER | Minimum hours between consecutive firings — prevents spam |
| `enabled` | BOOLEAN | true to evaluate, false to pause |
| `created_by` | TEXT | `"user"` or `"system"` |
| `last_fired` | TIMESTAMP | UTC — last time this rule triggered a message |
| `created_at` | TIMESTAMP | UTC |
### agent_learnings

Behavioral corrections extracted by the nightly Reflection Engine. Promoted from "candidate" to "active" when seen in ≥2 reflection cycles.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `rule` | TEXT | UNIQUE (normalized lowercase). Short imperative sentence ≤200 chars, e.g. "always confirm before deleting any item" |
| `category` | TEXT | `response_format`, `tool_usage`, `decision_making`, `tone`, or `scope` |
| `recurrence_count` | INTEGER | Incremented each time the same rule is re-extracted — triggers promotion at ≥2 |
| `status` | TEXT | `"candidate"` (new, not yet confirmed) or `"active"` (shown in system prompt) |
| `first_seen` | TIMESTAMP | UTC |
| `last_seen` | TIMESTAMP | UTC |
### pending_actions

Holds consequential actions awaiting user confirmation via the Telegram inline approval flow. Created by `approval_gate()` when the agent calls a gated tool; resolved by the user tapping Confirm, Cancel, or Edit.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `tool_name` | TEXT | e.g. `"create_task"`, `"send_email_wrapper"`, `"__plan__"`, `"__plan_resume__"` |
| `action_type` | TEXT | `"task"`, `"reminder"`, `"note"`, `"email"`, `"calendar"`, `"todo"`, `"slack"`, `"plan"` (Spec 008 — multi-step plan awaiting Confirm), or `"plan_resume"` (Spec 008 — partial-failure resume) |
| `payload` | TEXT | JSON-serialized kwargs to pass to the executor on Confirm. For `"plan"` this is the serialised ExecutionPlan; for `"plan_resume"` it is the serialised PlanResumePayload (plan + remaining_steps + scratchpad + failed_step + failure_reason). |
| `preview_text` | TEXT | Human-readable description shown to the user |
| `chat_id` | TEXT | Telegram chat ID that triggered the action |
| `message_id` | INTEGER | Telegram message ID of the preview message (set after send, used by expiry loop to edit) |
| `status` | TEXT | `"pending"`, `"confirmed"`, `"cancelled"`, `"expired"`, or `"failed"` |
| `notified` | BOOLEAN | true once the user has seen an expiry notice |
| `edit_count` | INTEGER | Number of times the user has used the Edit button on this action |
| `original_user_message` | TEXT | The user's original message, preserved for the Edit re-run prompt |
| `trace_id` | TEXT | Nullable. OTEL trace ID captured at gate time so the eventual Confirm/Cancel/Edit decision can attach a `user_approval` / `user_edit` score to the originating trace asynchronously (Spec 009 — Langfuse observability). |
| `created_at` | TIMESTAMP | UTC |
| `expires_at` | TIMESTAMP | UTC — created_at + 30 min; the gate rejects execution after this point |

Indexed on `(chat_id, status)` and `(expires_at, status)` for efficient per-chat and expiry lookups.
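The `(expires_at, status)` index keeps the expiry sweep cheap. Conceptually, the loop runs a query like this (parameter style varies by adapter):

```python
# Sketch of the sweep served by the (expires_at, status) index.
EXPIRED_ACTIONS = """
SELECT * FROM pending_actions
WHERE status = 'pending' AND expires_at < ?
ORDER BY expires_at
"""
# For each row returned, the expiry loop sets status = 'expired' and edits the
# stored Telegram preview message (via message_id) to show that the action lapsed.
```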
### pending_intentions

Soft personal commitments extracted from conversation by the nightly Reflection Engine. Follow-ups delivered proactively.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `text` | TEXT | The intention text, e.g. "call doctor about knee" |
| `mentioned_at` | TIMESTAMP | UTC — when the intention was first expressed |
| `follow_up_days` | INTEGER | Days until first follow-up (default 3) |
| `follow_up_at` | TIMESTAMP | UTC — next scheduled follow-up delivery |
| `status` | TEXT | `pending`, `snoozed`, `resolved`, `dismissed` |
| `last_follow_up_at` | TIMESTAMP | UTC — when the last follow-up was sent (nullable) |
| `created_at` | TIMESTAMP | UTC |
### journal_entries

Private journal entries saved by the agent via `save_journal_entry`. Included in the nightly Reflection Engine context so they inform the long-term narrative profile and intention extraction. A weekly digest fires via `_journal_digest_loop`.

| Column | Type | Notes |
|---|---|---|
| `id` | TEXT (UUID) | Primary key |
| `content` | TEXT | Full text of the entry |
| `title` | TEXT | ≤10-word title capturing the main theme, generated by the model |
| `source` | TEXT | `"voice"` (default) or `"text"` — how the entry was created |
| `created_at` | TIMESTAMP | UTC |

Indexed on `created_at` for efficient time-range lookups. Entries are never embedded (too personal/short for vector search) — retrieved by date range only.
### health_samples

Wearable / Health Connect samples ingested by the Android bridge (Spec 010). A single normalised, append-only table covers all 11 metric types with a flexible JSON value payload — the schema is unchanged when adding a new metric type.

| Column | Type | Notes |
|---|---|---|
| `id` | BIGSERIAL (Postgres) / INTEGER AUTOINCREMENT (SQLite) | Primary key |
| `metric_type` | TEXT | One of: `steps`, `sleep_session`, `heart_rate`, `hrv_rmssd`, `spo2`, `resting_hr`, `respiratory_rate`, `exercise`, `body_fat`, `weight`, `blood_pressure` |
| `start_time` | TIMESTAMP WITH TIME ZONE | UTC |
| `end_time` | TIMESTAMP WITH TIME ZONE | UTC, nullable — null for instantaneous samples (HR, HRV, SpO2, RHR, …); set for sessions (sleep, exercise) and intervals (steps) |
| `value` | JSONB (Postgres) / TEXT (SQLite) | Metric-specific payload — e.g. `{"bpm": 62}`, `{"rmssd_ms": 38}`, `{"score": 78, "stages": [...]}`, `{"count": 4231}` |
| `source_device` | TEXT | e.g. Galaxy Watch 5 Pro |
| `source_app` | TEXT | e.g. `com.sec.android.app.shealth` |
| `created_at` | TIMESTAMP WITH TIME ZONE | UTC — when the row landed on the server |

Indexes:

- UNIQUE `(metric_type, start_time, source_device)` — idempotency key; duplicate POSTs from the bridge are dropped silently
- `(metric_type, start_time DESC)` — for time-range reads by the agent's health tools

Writes are never embedded (high volume, low semantic value). Reads are always time-bounded — `get_health_samples(metric_type, since, until=None, limit=5000)`.
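A sketch of the idempotent write path in asyncpg terms, where `ON CONFLICT ... DO NOTHING` provides the "dropped silently" behaviour; the real adapter code may differ:

```python
import json

INSERT_SAMPLE = """
INSERT INTO health_samples
    (metric_type, start_time, end_time, value, source_device, source_app)
VALUES ($1, $2, $3, $4::jsonb, $5, $6)
ON CONFLICT (metric_type, start_time, source_device) DO NOTHING
"""

async def save_health_samples(conn, samples: list[dict]) -> tuple[int, int]:
    """Insert a batch, counting (accepted, duplicates) per the idempotency key."""
    accepted = 0
    for s in samples:
        status = await conn.execute(
            INSERT_SAMPLE,
            s["metric_type"], s["start_time"], s.get("end_time"),
            json.dumps(s["value"]),  # JSONB payload passed as text, cast in SQL
            s.get("source_device"), s.get("source_app"),
        )
        accepted += int(status.endswith("1"))  # asyncpg returns "INSERT 0 1" or "INSERT 0 0"
    return accepted, len(samples) - accepted
```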
## Data Models

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SemanticSearchResult:
    source: str                   # "notes" | "interactions" | "read_later"
    id: str
    title: str                    # note title / "Conversation YYYY-MM-DD" / article title
    snippet: str                  # first 200 chars of relevant content
    similarity: float             # cosine similarity 0.0–1.0
    created_at: datetime | None   # used for recency weighting in context injection
```
## StoragePort Protocol

Defined in `app/memory/ports.py`. Every method is async.

```python
from datetime import datetime
from typing import Protocol

# Domain models (Interaction, Note, Task, Reminder, …) are imported alongside
# the protocol; they are omitted here for brevity.

class StoragePort(Protocol):
    # Interactions
    async def save_interaction(self, interaction: Interaction) -> None: ...
    async def get_interactions(self, limit: int = 50) -> list[Interaction]: ...
    async def get_interactions_by_user(self, user_id: str, limit: int = 20) -> list[Interaction]: ...
    async def get_interactions_since(self, since: datetime) -> list[Interaction]: ...
    async def search_interactions(self, query: str, limit: int = 10) -> list[Interaction]: ...

    # Context
    async def get_context(self, user_id: str) -> UserContext | None: ...
    async def save_context(self, context: UserContext) -> None: ...

    # Notes
    async def save_note(self, note: Note) -> None: ...
    async def get_notes(self) -> list[Note]: ...
    async def search_notes(self, query: str) -> list[Note]: ...
    async def update_note(self, note_id: str, title: str, content: str) -> bool: ...
    async def delete_note(self, note_id: str) -> bool: ...

    # Tasks
    async def create_task(self, task: Task) -> None: ...
    async def list_tasks(self, status: str | None = None) -> list[Task]: ...
    async def search_tasks(self, query: str) -> list[Task]: ...
    async def get_tasks_due(self) -> list[Task]: ...
    async def mark_task_notified(self, task_id: str) -> None: ...
    async def update_task(self, task_id: str, title: str | None, due_date: str | None) -> bool: ...
    async def complete_task(self, task_id: str) -> bool: ...
    async def delete_task(self, task_id: str) -> bool: ...

    # Reminders
    async def create_reminder(self, reminder: Reminder) -> None: ...
    async def get_due_reminders(self) -> list[Reminder]: ...
    async def list_reminders(self, user_id: str | None = None) -> list[Reminder]: ...
    async def update_reminder_status(self, reminder_id: str, status: str) -> None: ...
    async def cancel_reminder(self, reminder_id: str) -> bool: ...

    # Scheduled tasks
    async def create_scheduled_task(self, task: ScheduledTask) -> None: ...
    async def list_scheduled_tasks(self) -> list[ScheduledTask]: ...
    async def update_scheduled_task(self, task_id: str, **kwargs) -> bool: ...
    async def delete_scheduled_task(self, task_id: str) -> bool: ...
    async def update_scheduled_task_last_run(self, task_id: str, last_run: datetime) -> None: ...

    # Audit log
    async def log_audit_entry(self, entry: AuditEntry) -> None: ...
    async def get_audit_log(self, limit: int = 50) -> list[AuditEntry]: ...

    # News
    async def add_news_topic(self, topic: NewsTopic) -> None: ...
    async def remove_news_topic(self, topic: str) -> bool: ...
    async def list_news_topics(self) -> list[NewsTopic]: ...
    async def add_seen_story(self, story: SeenStory) -> None: ...
    async def get_seen_urls(self) -> set[str]: ...
    async def cleanup_seen_stories(self, days: int = 7) -> None: ...

    # Read Later
    async def save_read_later(self, item: ReadLaterItem) -> None: ...
    async def list_read_later(self) -> list[ReadLaterItem]: ...
    async def delete_read_later(self, item_id: str) -> None: ...

    # User Facts
    async def save_user_fact(self, fact: UserFact) -> None: ...  # upsert on key
    async def get_user_fact(self, key: str) -> UserFact | None: ...
    async def get_all_user_facts(self) -> list[UserFact]: ...  # ordered by category, key
    async def search_user_facts(self, query: str) -> list[UserFact]: ...
    async def delete_user_fact(self, key: str) -> bool: ...

    # Alert Rules
    async def create_alert_rule(self, rule: AlertRule) -> None: ...
    async def list_alert_rules(self, enabled_only: bool = False) -> list[AlertRule]: ...
    async def update_alert_rule(self, rule_id: str, **kwargs) -> bool: ...
    async def delete_alert_rule(self, rule_id: str) -> bool: ...
    async def update_alert_rule_last_fired(self, rule_id: str, fired_at: datetime) -> None: ...

    # Pending Intentions
    async def create_intention(self, intention: PendingIntention) -> None: ...
    async def list_intentions(self, status: str | None = None) -> list[PendingIntention]: ...
    async def get_due_intentions(self) -> list[PendingIntention]: ...
    async def update_intention(self, intention_id: str, **kwargs) -> bool: ...
    async def count_followups_today(self) -> int: ...

    # Agent Learnings
    async def save_agent_learning(self, learning: AgentLearning) -> None: ...  # upsert on rule
    async def get_agent_learnings(self, status: str | None = None) -> list[AgentLearning]: ...
    async def delete_agent_learning(self, learning_id: str) -> bool: ...

    # Pending Actions (approval flow)
    async def save_pending_action(self, action: PendingAction) -> None: ...
    async def get_pending_action(self, action_id: str) -> PendingAction | None: ...
    async def update_pending_action(self, action_id: str, **kwargs) -> bool: ...
    async def list_expired_pending_actions(self) -> list[PendingAction]: ...  # pending rows past expires_at
    async def get_pending_actions_for_chat(self, chat_id: str, status: str = "pending") -> list[PendingAction]: ...
    async def delete_old_pending_actions(self, days: int = 7) -> int: ...  # prune confirmed/cancelled/expired rows
    async def delete_old_audit_entries(self, days: int = 90) -> int: ...  # prune old audit_log rows

    # Journal Entries
    async def save_journal_entry(self, entry: JournalEntry) -> None: ...
    async def list_journal_entries(self, days: int | None = None) -> list[JournalEntry]: ...

    # Health Samples (Spec 010 — Wearable / Health Connect ingest)
    async def save_health_samples(self, samples: list[dict]) -> tuple[int, int]: ...
    # Idempotent on (metric_type, start_time, source_device); returns (accepted, duplicates).
    async def get_health_samples(
        self,
        metric_type: str,
        since: datetime,
        until: datetime | None = None,
        limit: int = 5000,
    ) -> list[dict]: ...

    # Semantic Search
    async def semantic_search(
        self,
        embedding: list[float],
        sources: list[str] | None = None,  # default: all three
        limit: int = 5,
    ) -> list[SemanticSearchResult]: ...

    async def close(self) -> None: ...
```
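Because callers are typed against the protocol, adapters swap freely. A small illustration (the `Note` constructor fields are assumed from the schema above):

```python
from uuid import uuid4

async def remember(storage: StoragePort, title: str, content: str) -> None:
    # Works identically against SqliteAdapter and PostgresAdapter;
    # this function never imports either.
    note = Note(id=str(uuid4()), title=title, content=content)  # field names assumed
    await storage.save_note(note)
```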
## Adapter Differences

| Feature | SQLite | PostgreSQL |
|---|---|---|
| Case-insensitive search | `lower(column) LIKE lower(pattern)` — ASCII only (SQLite's built-in `lower()` does not fold non-ASCII characters) | `ILIKE` — always case-insensitive |
| Timestamps | Stored as ISO 8601 strings | Native `TIMESTAMP WITH TIME ZONE` |
| Connection | Single persistent connection | asyncpg connection pool (1–10) |
| Schema init | `CREATE TABLE IF NOT EXISTS` on first connect | Same, plus SSL fallback |
| Migrations | `ALTER TABLE ... ADD COLUMN` in try/except | `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` |
| Boolean | `INTEGER` (0/1) | Native `BOOLEAN` |
| Vector search | Python cosine similarity (full scan) | pgvector `<=>` cosine operator with HNSW index |
| Embedding storage | JSON text array | Native `halfvec(3072)` type (16-bit; requires pgvector ≥0.7 extension) |
## Semantic Search

Both adapters implement `semantic_search()`, which finds content by meaning rather than by keywords. The query text is embedded via `embed_text()` in `app/tools/embedding.py` (Gemini `gemini-embedding-001`, 3072 dimensions) before calling this method.
**SQLite (local dev):** Streams rows with non-null embeddings off a cursor, computes cosine similarity in Python one row at a time (so the full embedding set is never resident in memory at once), then sorts the scores and truncates to `limit`. Acceptable for a personal assistant's data volumes.
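A minimal sketch of that streaming scan, assuming the cursor yields `(id, embedding_json)` pairs; the adapter's real row shape and scoring code will differ:

```python
import heapq
import json
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

async def top_k(cursor, query_embedding: list[float], k: int) -> list[tuple[float, str]]:
    """Score rows as they stream off the cursor, keeping only the best k."""
    best: list[tuple[float, str]] = []
    async for row_id, embedding_json in cursor:
        sim = cosine(query_embedding, json.loads(embedding_json))
        heapq.heappush(best, (sim, row_id))
        if len(best) > k:
            heapq.heappop(best)  # drop the current worst candidate
    return sorted(best, reverse=True)
```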
**PostgreSQL (production):** Uses the pgvector extension with the `<=>` cosine distance operator and an HNSW index (`halfvec_cosine_ops`) on each table. Each source independently returns its best `limit` candidates; results are merged and re-ranked before returning the top `limit` overall. Embeddings are stored as `halfvec(3072)` (16-bit floats) — full-precision `vector(3072)` exceeds pgvector's 2000-dim HNSW limit and would force a sequential scan.
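Per source, the query is conceptually like the following. Since `<=>` returns cosine distance, similarity is `1 - distance`; column names follow the notes schema above, and the exact SQL is illustrative:

```python
# Sketch of the per-source pgvector query (asyncpg placeholder style).
NOTES_QUERY = """
SELECT id, title, left(content, 200) AS snippet,
       1 - (embedding <=> $1::halfvec(3072)) AS similarity,
       created_at
FROM notes
WHERE embedding IS NOT NULL
ORDER BY embedding <=> $1::halfvec(3072)  -- this ordering is what the HNSW index serves
LIMIT $2
"""
```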
**Graceful degradation:** If `GOOGLE_API_KEY` is not set, `embed_text()` returns `None` and no embedding is stored or queried — keyword search continues to work normally. If the pgvector extension is unavailable on Postgres, the `CREATE EXTENSION` and `ADD COLUMN` statements fail silently and `semantic_search()` returns an empty list.
**Backfilling:** Existing rows without embeddings can be embedded in bulk via `POST /embed-backfill` (protected by `X-Reflection-Secret`). Rate-limited to ~10 rows/second.
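For example, from any HTTP client (the deployment URL and secret value are placeholders):

```python
import httpx

resp = httpx.post(
    "https://<your-deployment>/embed-backfill",
    headers={"X-Reflection-Secret": "<REFLECTION_SECRET>"},
    timeout=None,  # the backfill is rate-limited to ~10 rows/s, so it can take a while
)
print(resp.status_code)
```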
## Schema Migrations
Neither adapter uses a migration framework. New columns are added via ALTER TABLE statements run on every startup, guarded so they're no-ops if the column already exists. New tables are created with CREATE TABLE IF NOT EXISTS. This keeps deployment simple at the cost of no rollback support.
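The pattern in both flavours, illustrated with the `tasks.notified` migration from the list below:

```python
# SQLite has no ADD COLUMN IF NOT EXISTS, so the duplicate-column error is swallowed:
async def migrate_sqlite(db) -> None:
    try:
        await db.execute("ALTER TABLE tasks ADD COLUMN notified INTEGER DEFAULT 0")
    except Exception:
        pass  # column already exists; a no-op on every later startup

# Postgres builds the guard into the statement itself:
PG_MIGRATION = "ALTER TABLE tasks ADD COLUMN IF NOT EXISTS notified BOOLEAN DEFAULT FALSE"
```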
Current migrations applied at startup:

- `tasks.notified` — due-date notification tracking
- `audit_log.duration_ms`, `audit_log.tokens_used` — request timing and token tracking
- `read_later.tags` — keyword tags for relevance matching
- `reminders.channel` — delivery channel (telegram/whatsapp)
- `notes.embedding`, `interactions.embedding`, `read_later.embedding` — vector embeddings for semantic search (added as TEXT in SQLite, `halfvec(3072)` in Postgres via the pgvector ≥0.7 extension; legacy `vector(3072)` columns auto-migrate in-place on first pool init)
- `health_samples` — Spec 010 — created on first connect for both adapters; `value` is `JSONB` in Postgres and JSON-encoded TEXT in SQLite