Architecture

Module layout changed (2026-06 cleanup)

Behavior is unchanged, but the code was decomposed: agent tools → app/agent_tools/<domain>.py, background loops → app/background.py, HTTP routes → app/routers/api.py, Telegram helpers/media/commands split out of bot.py, prompt text → app/prompt_text.py, AgentDeps/build_deps → app/deps.py. Original modules re-export their public symbols. See the banner on the home page and CLAUDE.md → Recent Changes.

System Context

Zoom-out view of the runtime topology — who talks to Kwasi, where Kwasi lives, and every external service it depends on. Useful for "if X goes down, what breaks?" and "where does my data leave the container?"

%%{init: {"flowchart": {"defaultRenderer": "elk", "nodeSpacing": 25, "rankSpacing": 45, "wrappingWidth": 180, "elk": {"mergeEdges": true, "nodePlacementStrategy": "NETWORK_SIMPLEX"}}}}%%
graph TB
    subgraph Edge["Users and Devices"]
        direction LR
        U[Lawrence\nTelegram client]
        PHONE["Android phone\nHealth Connect bridge\n+ HTTP Shortcuts"]
        WATCH[Samsung Watch /\nwearable]
        DASHU[Dashboard browser]
    end

    subgraph Railway["Railway (Docker container)"]
        KWASI["Kwasi\nFastAPI + Telegram bot\n+ background loops"]
        PG[("PostgreSQL\n+ pgvector\nRailway managed service")]
        CRON["Railway Cron\nPOST /reflect 2 AM UTC"]
        KWASI <--> PG
        CRON -.->|X-Reflection-Secret| KWASI
    end

    subgraph AI["AI / LLM Providers"]
        direction LR
        GEM["Google Gemini\nLLM · Vision · STT · embeddings"]
        ANT["Anthropic Claude\noptional primary\n+ delegation backend"]
        OAI["OpenAI\noptional primary"]
        E2B["E2B Cloud\nsandboxed Python\n+ delegation VMs"]
    end

    subgraph Obs["Observability (Cloud)"]
        direction LR
        LF["Logfire Cloud\ninfra · FastAPI · loops"]
        LFC["Langfuse Cloud\nLLM generations · prompts · scores"]
    end

    subgraph Personal["Google / Microsoft APIs"]
        direction LR
        GMAIL["Gmail API\npersonal + work OAuth2"]
        GCAL["Google Calendar API"]
        GDRIVE["Google Drive API\npersonal + work OAuth2"]
        MSG["Microsoft Graph\nOutlook mail · calendar · To Do\nMSAL"]
    end

    subgraph Work["Messaging / Productivity APIs"]
        direction LR
        TG[Telegram BotAPI\nlong-polling]
        SLACK[Slack Web API]
        JIRA[Jira REST API]
        GH[GitHub API\nPyGitHub]
    end

    subgraph Utility["Utility APIs"]
        direction LR
        TAV[Tavily\nweb search]
        WX[WeatherAPI]
        GMAP[Google Maps\ntransit + places]
        YV[YouVersion API\nverse of the day]
    end

    U <--> TG
    TG <--> KWASI
    PHONE -->|"X-Health-Secret\nPOST /health/ingest"| KWASI
    PHONE -->|"X-API-Token\nPOST /message"| KWASI
    WATCH --> PHONE
    DASHU -->|X-Dashboard-Secret| KWASI

    KWASI --> GEM
    KWASI -.-> ANT
    KWASI -.-> OAI
    KWASI --> E2B

    KWASI -->|OTEL spans| LF
    KWASI -->|OTEL spans + scores| LFC

    KWASI --> GMAIL
    KWASI --> GCAL
    KWASI --> GDRIVE
    KWASI --> MSG
    KWASI -.-> YV

    KWASI --> SLACK
    KWASI --> JIRA
    KWASI --> GH

    KWASI --> TAV
    KWASI --> WX
    KWASI --> GMAP

Reading the diagram:

  • Solid arrows are always-on dependencies. Dotted arrows are optional/feature-flagged (Anthropic, OpenAI, YouVersion verse-of-the-day, Railway Cron triggering /reflect).
  • Railway boundary = one container + one Postgres service. Everything else is somebody else's cloud. The container is stateless aside from in-flight asyncio tasks; all durable state lives in Postgres.
  • Three trust boundaries cross the Railway edge:
      • Inbound from users/devices, gated by token headers: X-Health-Secret (Android health bridge), X-API-Token (Android HTTP Shortcuts → POST /message), X-Dashboard-Secret (dashboard browser), X-Reflection-Secret (Railway Cron → POST /reflect). Telegram is gated by ALLOWED_TELEGRAM_USER_IDS at handler level.
      • Outbound to AI providers, gated by API keys in env. Gemini is the only one that's load-bearing on the default config; the others are alternative primary models or alternative delegation backends.
      • Outbound to personal/work APIs, gated by per-service OAuth refresh tokens (Google, Microsoft) or PATs (GitHub, Jira, Slack).
  • What breaks if X is down:
      • Gemini down → no LLM responses, no embeddings, no Vision/STT. Hard failure.
      • Postgres down → no history, no facts, no approval gate. Hard failure.
      • Telegram BotAPI down → can't receive or reply. Background loops still run (and queue notifications that will fail to send).
      • Logfire or Langfuse down → observability gap, app keeps working (OTEL exporter retries in background).
      • E2B down → execute_python and delegate_to_coding_agent fail; everything else fine.
      • Any single personal/work API down (Gmail, Slack, etc.) → that domain agent's tools return errors; routing falls through to the full agent which retries or summarises the failure to the user.
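
The inbound header gates listed above could be checked along the following lines. This is a minimal sketch, not the app's actual code: `INBOUND_SECRETS`, `is_authorized`, and the placeholder secret values are hypothetical, and only the header names come from this page. It assumes a constant-time comparison and a fail-closed policy when a secret is unset.

```python
import hmac

# Hypothetical mapping of header name -> expected secret. In a real deployment
# the values would come from environment variables; these are placeholders.
INBOUND_SECRETS = {
    "X-Health-Secret": "health-secret",
    "X-API-Token": "api-token",
    "X-Dashboard-Secret": "",  # unset secret: every request is rejected
}


def is_authorized(headers: dict[str, str], header_name: str) -> bool:
    """Return True only if the request carries the expected secret."""
    expected = INBOUND_SECRETS.get(header_name, "")
    supplied = headers.get(header_name, "")
    if not expected or not supplied:
        return False  # fail closed when either side is missing
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(supplied.encode(), expected.encode())
```

Failing closed on an empty secret matches the spirit of the Telegram allowlist, which blocks everything when unset.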

System Overview

%%{init: {"flowchart": {"defaultRenderer": "elk", "nodeSpacing": 30, "rankSpacing": 50, "wrappingWidth": 180}}}%%
graph TB
    subgraph Interfaces
        TG[Telegram\npython-telegram-bot polling]
        WA[WhatsApp\nwebhook]
        CLI[CLI\nlocal REPL]
        DASH[Dashboard\n/dashboard/]
    end

    subgraph Planning["Planning Layer (app/planning/) — Spec 008"]
        CC[classify_complexity\nregex pre-filter]
        GP[generate_plan\nplanner agent]
        EP[execute_plan\nstep loop · scratchpad threading]
    end

    subgraph Routing["Intent Routing (app/tools/router.py + app/routing/agents.py)"]
        CI[classify_intent\nkeyword → semantic top-K → context_hint]
        SA[select_agent\ndispatcher]
        DA["Domain Agents (13)\nemail · calendar · memory · github · news\nslack · jira · drive · meetings\ndiagnostics · health · utility · database"]
        CA[Composed Agent\nambiguous / cross-domain\nlru_cache]
        FA2[Full Agent\nfallback]
    end

    subgraph Core["Core (FastAPI + asyncio)"]
        FAST[FastAPI\napp/main.py]
        AG[Pydantic AI Agent\napp/agent.py]
        SP[System Prompt\nbuild_system_prompt]
    end

    subgraph Tools["Native Tools"]
        T1[search_web\nTavily]
        T2[get_weather\nWeatherAPI]
        T3[get_datetime]
        T4[summarize_url]
        T5[transcribe_audio\nGemini STT]
        T6[analyze_image\nGemini Vision]
        T7[browse_web\nbrowser_submit_form\nPlaywright]
        T8[github_*\nPyGitHub]
    end

    subgraph Skills["Skills (file-drop plugins)\napp/skills/"]
        S1[content_curator\nread_later.py]
        S2[travel_briefing]
        S3[cv]
        S4[deep_research\nresearch.py]
        S5[meeting_notes\nget_meeting_insights · list_recent_meetings]
    end

    subgraph MCPTools["MCP Tools"]
        M1[Gmail personal + work\ngmail_* tools]
        M2[Outlook Email\noutlook_* tools]
        M3[Google Calendar\ngoogle_* tools]
        M4[Outlook Calendar\noutlook_calendar_* tools]
        M5[Microsoft To Do\ntodo_* tools]
        M6[Google Drive\ndrive_* tools]
        M7[Logfire\nlogfire_* tools]
    end

    subgraph Memory["Memory Tools (agent tools over storage)"]
        N[Notes\nsave · get · search · edit · delete · semantic]
        TK[Tasks\ncreate · list · search · complete · edit · delete]
        RM[Reminders\nset · list · cancel]
        SC[Scheduled Tasks\ncreate · list · update · delete]
        HX[History\nsearch_history · semantic_search]
        UF[User Facts\nremember_fact · recall_facts · forget_fact]
    end

    subgraph Storage["Storage (StoragePort)"]
        SQ[(SQLite\nlocal dev)]
        PG[(PostgreSQL\nRailway prod)]
    end

    subgraph HealthIngest["Health Data Ingest (Spec 010 — push model, no server-side loop)"]
        WATCH[Samsung Watch / wearable]
        HC[Google Health Connect\non-device]
        BR["bridge-android/\nKotlin · WorkManager 15 min"]
        IN["POST /health/ingest\nX-Health-Secret · idempotent"]
        WATCH --> HC
        HC --> BR
        BR --> IN
        IN --> Storage
    end

    subgraph BG["Background Loops"]
        RL[Reminder Loop\nevery 5 min]
        BL[Briefing Loop\ndaily at BRIEFING_TIME]
        ST[Scheduled Tasks Loop\nevery 60 sec]
        RF[Reflection Loop\ndaily at 2 AM UTC]
        ER[Evening Recap Loop\ndaily local evening]
        WR[Weekly Recap Loop\nFriday local]
        WP[Weekly Prep Loop\nSunday local]
        RLD[Read-Later Digest\nSaturday morning]
        JD[Journal Digest\nSunday evening]
        EI[Email Intelligence\ndaily at EMAIL_INTEL_TIME]
        AL[Alert + Intentions Loop\nevery 5 min]
        MP[Meeting Prep Loop\nevery 5 min]
        AE[Approval Expiry Loop\nevery 5 min]
        PRL[PR Review Loop\ndaily at PR_REVIEW_TIME]
    end

    subgraph Models["LLM Models"]
        LLM[Primary LLM\nMODEL_NAME\nGemini / Claude / GPT-4]
        MINI[Mini LLM\nMINI_MODEL_NAME\nskill synthesis]
        REFL[Reflection LLM\nREFLECTION_MODEL_NAME\nnightly reflection]
    end

    subgraph SelfImprove["Self-Improvement Loop (PR 1 + PR 2)"]
        RS[read_source_file · grep_source\napp/tools/source.py\non every domain agent]
        GD[detect_and_save_capability_gap\npost-turn fire-and-forget\nregex → mini-classify → grep-verify]
        PR[propose_skill\nprimary model draft + static validate]
        AC["activate_proposed_skill\napproval-gated → write app/skills/&lt;name&gt;.py\n+ hot_reload onto every skill-bearing agent"]
        GD --> PR
        PR --> AC
    end

    TG --> CC
    CC -- complex --> GP
    GP -- 2+ steps --> EP
    CC -- simple --> CI
    GP -- 0–1 steps --> CI
    EP -- per step --> CI
    WA --> FAST
    CLI --> AG
    CI --> SA
    SA --> DA
    SA --> CA
    SA --> FA2
    DA --> LLM
    CA --> LLM
    FA2 --> LLM
    AG --> RS
    AG -- post-turn fire-and-forget --> GD
    GD --> Storage
    PR --> LLM
    AC --> Skills
    FAST --> FAST
    DASH --> Storage
    AG --> SP
    SP --> Storage
    AG --> LLM
    AG --> Tools
    AG --> Skills
    AG --> MCPTools
    AG --> Memory
    Memory --> Storage
    BG --> Storage
    BG --> TG
    Skills --> MINI
    BG --> REFL

Component Breakdown

FastAPI (app/main.py)

The web process entry point. Responsibilities:

  • Mounts the WhatsApp webhook router, dashboard router, and health router (/health/ingest, /health/backfill, /health/status)
  • Exposes GET /health and POST /reflect
  • Exposes POST /embed-backfill — bulk-embeds existing rows without embeddings; protected by X-Reflection-Secret; rate-limited to ~10 rows/sec
  • Exposes POST /health/ingest — idempotent batch ingest for wearable samples from the Android bridge; protected by X-Health-Secret (Spec 010)
  • Exposes POST /health/backfill — one-shot upload of a Samsung Health "Download Personal Data" CSV/JSON archive to seed historical samples; protected by X-Health-Secret
  • Exposes GET /health/status — per-metric ingest freshness summary for monitoring the bridge from outside the device. Returns {now, latest_ingest, lag_minutes, metrics: [...]} with one entry per known metric type (zeros when never ingested) plus per-metric ingest_lag_minutes. Protected by X-Health-Secret so polling it doesn't expose data shape (Spec 010)
  • Exposes POST /message — external message endpoint for Android HTTP Shortcuts and other clients. Accepts text and/or images (multipart/form-data or JSON with base64). Protected by X-API-Token header (API_TOKEN env var). Processes input through the full agent pipeline (intent routing, read-later injection, message history). Delivers response to Telegram (BRIEFING_CHAT_ID) and returns it in the JSON body. Conversation history is unified with Telegram — both interfaces use BRIEFING_CHAT_ID as the user ID so context is shared across surfaces.
  • Manages the FastAPI lifespan context — starts and stops the Telegram bot and all background asyncio tasks
  • Exports API keys to os.environ so LLM SDKs can find them

Agent (app/agent.py)

The brain. Built with Pydantic AI. Responsibilities:

  • Defines AgentDeps — the dependency container passed to every tool and the system prompt
  • build_deps() — single factory function used by all interfaces to create AgentDeps
  • build_system_prompt() — async function decorated with @agent.system_prompt, assembles the full prompt from the static instruction sections + dynamic storage-fetched sections (behavioral learnings, narrative profile, live DB schema, and — if the vault is enabled — stored web logins). Permanent facts are not injected; the prompt tells the model to call recall_facts(). See System Prompt Architecture below.
  • build_briefing_system_prompt() — extends build_system_prompt by appending all briefing structural templates (_MORNING_BRIEFING, _EVENING_RECAP, _WEEKLY_RECAP, _WEEKLY_PREP, _MEETING_PREP) at the system level. Used exclusively by briefing_agent.
  • Registers all native tools via @agent.tool
  • Calls load_skills(agent) at startup to auto-register skills from app/skills/

Multi-Model Architecture

Three model roles are configured independently via env vars. Each is optimised for a different cost/capability trade-off:

| Role | Env var | Default | Used for |
|---|---|---|---|
| Primary | MODEL_NAME | google-gla:gemini-2.5-flash | Main conversational agent and all 13 domain agents (every interactive user message) |
| Mini | MINI_MODEL_NAME | google-gla:gemini-3-1-flash-lite-preview | Skill synthesis: CV extraction, deep research synthesis, read-later summarisation, travel briefing, post-conversation fact extraction, PR review structuring, capability-gap classification |
| Reflection | REFLECTION_MODEL_NAME | (empty; falls back to MODEL_NAME) | Nightly reflection run only. Set to a larger/more capable model for deeper analysis without affecting interactive latency |

settings.effective_reflection_model resolves the fallback: if REFLECTION_MODEL_NAME is empty, it returns MODEL_NAME.
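
The fallback can be pictured as a one-line property. The sketch below is illustrative only: the `Settings` dataclass is a stand-in (the real settings class is not shown on this page), and `example-provider:large-model` is a made-up model name.

```python
from dataclasses import dataclass


@dataclass
class Settings:
    # Field names mirror the env vars above; the class itself is a stand-in.
    model_name: str = "google-gla:gemini-2.5-flash"
    reflection_model_name: str = ""

    @property
    def effective_reflection_model(self) -> str:
        # An empty override falls back to the primary model.
        return self.reflection_model_name or self.model_name


default = Settings()
overridden = Settings(reflection_model_name="example-provider:large-model")
```

Here `default.effective_reflection_model` resolves to the primary model, while `overridden.effective_reflection_model` returns the override.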

Model binding is per-request, not at agent construction. The Agent instances are built with no model (Agent(deps_type=AgentDeps, retries=3)); every call passes model=ctx.deps.settings.model_name at invocation time. Because settings and deps.settings are the same object, mutating deps.settings.model_name in memory changes the model on the next message with no restart — this is the mechanism that makes set_runtime_config("model_name", …) instant (see Self-Management Subsystem). (Limitation: a few skill-synthesis "mini" agents bind mini_model_name at import time, so a runtime change to the mini model only affects call sites that read deps.settings.mini_model_name dynamically until the next restart.)

Why separate models? The mini model handles high-frequency, low-stakes synthesis tasks (run after every message) where cost and latency matter most. The primary model handles user-facing responses where quality matters. The reflection model is a deliberate override slot — if you want a frontier model for nightly analysis without paying for it on every chat message, set only REFLECTION_MODEL_NAME.

Telegram Interface (app/interfaces/telegram/bot.py)

The primary user-facing interface. Handles:

  • Text messages — streamed response with live edit every 1.5s (Telegram rate limit)
  • Voice/audio — downloads from Telegram, transcribes via Gemini STT, runs agent, replies with a voice note (TTS) for responses ≤500 words
  • Photos — downloads highest resolution, runs Gemini Vision, runs agent
  • Documents — PDFs via Gemini Vision, text files decoded directly
  • Slash commands: /start, /help, /notes, /tasks, /reminders (bypass the agent, hit storage directly)
  • Allowlist: ALLOWED_TELEGRAM_USER_IDS checked on every handler; blocks all if unset
  • Audit logging: _write_audit_entries() writes sanitised tool call records to audit_log after each interaction
  • Inline approval flow — after streaming, scans tool call outputs for sentinel tokens ([APPROVAL_PENDING:<uuid>]); edits the sent message to add Confirm/Cancel/Edit inline keyboard buttons. CallbackQueryHandler handles button taps — Confirm executes via ACTION_REGISTRY, Cancel marks cancelled, Edit prompts for a correction and re-runs the agent. Edit state is tracked in context.user_data.
  • Post-conversation background tasks — after audit logging, fires asyncio.create_task() calls for: (1) extract_facts_from_exchange() to immediately extract explicitly stated user facts; (2) detect_and_save_capability_gap() to log routing/capability gaps for the self-improvement loop. Both are fire-and-forget. (The earlier 30-min summarise_session was removed 2026-05-09 — topic summaries now come only from the nightly reflection.) See Memory & Reflection.
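
The sentinel scan in the inline approval flow might look roughly like this. It is a sketch under assumptions: `find_pending_approvals` is a hypothetical helper, and the exact UUID format inside the sentinel is assumed to be the canonical hyphenated form.

```python
import re
import uuid

# Pattern for the [APPROVAL_PENDING:<uuid>] sentinel; the hyphenated UUID
# format is an assumption.
_SENTINEL = re.compile(
    r"\[APPROVAL_PENDING:([0-9a-fA-F]{8}-(?:[0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12})\]"
)


def find_pending_approvals(tool_outputs: list[str]) -> list[uuid.UUID]:
    """Collect action ids from tool outputs, in order, without duplicates."""
    seen: list[uuid.UUID] = []
    for output in tool_outputs:
        for match in _SENTINEL.finditer(output):
            action_id = uuid.UUID(match.group(1))
            if action_id not in seen:
                seen.append(action_id)
    return seen
```

Each returned id would then be attached to one Confirm/Cancel/Edit keyboard on the sent message.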

Browser Tools (app/tools/browser.py)

Playwright-based headless Chromium. Features:

  • Chrome user agent spoofing to reduce bot detection
  • Asset blocking (images, fonts, media) for faster loads
  • Semantic content extraction — targets <main>, <article>, [role="main"] before falling back to <body>
  • Headline extraction from h1/h2/h3 in the content zone
  • URL security: blocks file://, localhost, and private IPs; optional BROWSER_ALLOWED_DOMAINS allowlist
  • Graceful degradation if Playwright is not installed
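
As an illustration of the URL security rule, the check could be sketched as below. `is_url_allowed` is a hypothetical name, and the subdomain-matching behaviour of the allowlist is an assumption; the page only states which targets are blocked.

```python
import ipaddress
from urllib.parse import urlparse


def is_url_allowed(url: str, allowed_domains: frozenset[str] = frozenset()) -> bool:
    """Reject non-HTTP schemes, localhost, and private or loopback IP literals."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return False  # blocks file:// and other schemes
    host = (parsed.hostname or "").lower()
    if not host or host == "localhost" or host.endswith(".localhost"):
        return False
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        ip = None  # a hostname, not an IP literal
    if ip is not None and (ip.is_private or ip.is_loopback or ip.is_link_local):
        return False
    if allowed_domains:
        # Assumed semantics: exact match or subdomain of an allowlisted domain.
        return any(host == d or host.endswith("." + d) for d in allowed_domains)
    return True
```

This sketch inspects only the literal host. A hostname that resolves to a private address would pass, so a production check would also need to validate the resolved IP.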

GitHub Tools (app/tools/github.py)

PyGitHub-based integration. All API calls are synchronous and wrapped with asyncio.to_thread(). Gracefully returns a clear message if GITHUB_TOKEN is not configured or PyGitHub is not installed.
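
The sync-call wrapping pattern can be sketched as follows. `fetch_open_issue_count` is a stand-in for a blocking PyGitHub call (it sleeps and returns a dummy value), and `get_open_issue_count` is a hypothetical tool name, not one of the real github_* tools.

```python
import asyncio
import time
from typing import Optional


def fetch_open_issue_count(repo_name: str) -> int:
    """Stand-in for a blocking PyGitHub call; sleeps to simulate network I/O."""
    time.sleep(0.05)
    return len(repo_name)  # dummy value, not real data


async def get_open_issue_count(repo_name: str, token: Optional[str]) -> str:
    if not token:
        # Return a clear message instead of raising, as described above.
        return "GitHub is not configured: set GITHUB_TOKEN."
    # to_thread runs the blocking call in a worker thread so the event loop
    # stays free for other handlers and background loops.
    count = await asyncio.to_thread(fetch_open_issue_count, repo_name)
    return f"{repo_name}: {count} open issues"
```

The same shape applies to the Slack tools, which wrap the synchronous WebClient in the same way.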

Embedding (app/tools/embedding.py)

embed_text(text, api_key) → list[float] | None — calls the Gemini gemini-embedding-001 REST API directly via httpx (3072 dimensions). Called internally by both storage adapters after each write to notes, interactions, and read_later. Returns None when GOOGLE_API_KEY is not set or the API call fails — never raises. Falls back to gemini-embedding-2-preview if the primary model returns 404.

Slack Tools (app/tools/slack.py)

slack-sdk WebClient integration. All API calls are synchronous and wrapped with asyncio.to_thread(). Username and channel name resolution are cached within each tool call. Gracefully returns a clear message if SLACK_BOT_TOKEN is not configured or slack-sdk is not installed. Requires the bot to be invited to any channel it reads or posts in.

Intent Router (app/tools/router.py + app/routing/agents.py)

Keyword-based intent classification and domain agent dispatch. Runs on every text message (Telegram and WhatsApp) before the agent is invoked.

Taxonomy (TOOL_CATEGORIES) — maps thirteen domain names to their canonical tool lists: utility, memory, email, calendar, github, news, slack, jira, drive, meetings, diagnostics, health, database. TOOL_TO_CATEGORY is the reverse lookup used in tests.

classify_intent(message, context_hint=None) — three-stage classifier:

  1. Keyword match (microseconds, zero LLM cost): checks the message against per-domain keyword lists. Returns matched categories with "utility" always added when any domain hits. keyword_hits per category is logged to the Langfuse span.
  2. Semantic fallback with confidence bands: when keywords find nothing, the message is embedded and compared against pre-computed per-domain anchor embeddings (cached lazily). Returns top-K based on cosine score:
      • ≥ 0.65 (high confidence) → single best domain
      • 0.45 – 0.65 (ambiguous) → top-K within 0.10 of best, capped at 3 → composed_agent so the LLM picks
      • < 0.45 → empty (no confident match)
     Each routing decision logs semantic_top3: [{domain, score}] so misroutes are debuggable from traces.
  3. Context-hint inheritance: when both keyword and semantic miss, an optional context_hint: set[str] (single non-utility domain from the previous turn) is inherited so short follow-ups like "anything else?" continue in the same domain agent.
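
The confidence bands of the semantic stage can be sketched as a pure function over per-domain cosine scores. `pick_domains` and the constant names are hypothetical; the thresholds are the ones quoted above. Whether candidates in the ambiguous band must themselves score at least 0.45 is not stated, so this sketch only applies the 0.10 margin.

```python
HIGH = 0.65         # at or above: single best domain
LOW = 0.45          # below: no confident match
MARGIN = 0.10       # ambiguous band: keep domains this close to the best
MAX_CANDIDATES = 3  # cap on the ambiguous candidate list


def pick_domains(scores: dict[str, float]) -> list[str]:
    """Map per-domain cosine scores to the routed domain list."""
    if not scores:
        return []
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    best_domain, best_score = ranked[0]
    if best_score >= HIGH:
        return [best_domain]
    if best_score < LOW:
        return []
    # Ambiguous band: hand every close candidate to the composed agent.
    close = [domain for domain, score in ranked if best_score - score <= MARGIN]
    return close[:MAX_CANDIDATES]
```

A result with more than one domain is what sends an ambiguous single-domain query down the composed-agent path.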

The three-band semantic stage replaced an earlier binary ≥ 0.60 cutoff that caused misroute hallucinations: borderline queries scored just below threshold and fell through to the wrong agent, leading the model to claim "I don't have a tool" for capabilities that existed.

select_agent(categories) dispatches to the right agent:

  • Empty set → full_agent (conversational fallback / no confident match)
  • {"utility"} only → utility_agent
  • One non-utility domain (e.g. {"email", "utility"}) → email_agent
  • Multiple non-utility domains → _get_composed_agent(frozenset(categories)), a dynamically built agent whose tool set is the union of all matched domains' tools, cached with lru_cache(maxsize=32).

With the three-band semantic stage above, the composed path is used not just for explicit cross-domain requests but also for ambiguous single-domain queries where multiple anchors scored within 0.10 of each other: the LLM picks across the candidates rather than the router guessing wrong.

Domain agents — 13 interactive Agent instances + 1 briefing_agent, all built once at import time in app/routing/agents.py. Each carries only its relevant tool subset, reducing the tool list the LLM sees. They share build_system_prompt and the active_categories focus hint injected into AgentDeps per request. Several universal meta-toolsets are registered on every domain agent via _UTILITY_FNS — they are meta, not domain-scoped, because the model often reaches for them from non-matching contexts:

  • Source introspection: read_source_file, grep_source ("do I have a tool for X?").
  • Raw DB ops: db_query (read-only), db_execute (approval-gated). database / diagnostics exist as focused-attention modes for the same tools.
  • Runtime config + self-management: set_runtime_config / get/list/clear_runtime_config, railway_set_env / railway_redeploy (approval-gated, action_type config / deployment), and read-only railway_deployment_status / railway_deploy_logs / diagnose_self. See Self-Management Subsystem.
  • LLM trace introspection: langfuse_recent_traces / langfuse_trace_details (Logfire's read tools live on diagnostics_agent).

So "switch your model", "redeploy", or "any errors lately?" route to the right universal tool from any domain rather than being declined.

The jira_agent handles Jira queries (tickets, sprints, projects) via the six jira_* tools. Requires JIRA_BASE_URL, JIRA_EMAIL, and JIRA_API_TOKEN.

The drive_agent handles Google Drive queries (search, read, list recent, transcripts) via the five Drive MCP tools. Supports personal (GOOGLE_DRIVE_REFRESH_TOKEN) and work (GOOGLE_DRIVE_WORK_REFRESH_TOKEN) accounts.

The meetings_agent handles meeting intelligence queries via get_meeting_insights and list_recent_meetings skills (from app/skills/meeting_notes.py), plus Drive MCP tools and calendar MCP tools for context. Intent keywords include "recap the meeting", "notes from the", "what came out of", and "standup notes".

The diagnostics_agent handles self-diagnosis queries (errors, performance, loop health) via the three Logfire MCP tools. Its intent keywords use specific multi-word phrases (e.g. "your errors", "did the briefing run") to avoid false positives on common words like "error".

The health_agent (Spec 010) handles wearable / Samsung Watch queries via four read-only tools (get_recent_health, get_sleep_summary, get_hrv_trend, get_health_snapshot). Routed by intent keywords like "how did i sleep", "my hrv", "resting heart rate", "step count". Data is ingested separately by the Android bridge — the agent only reads.

The dba_agent (database category) is structurally identical to utility_agent — its value is the focused system-prompt mode (_DB_OPS_RULES, plus a live "## Live database schema" section introspected via describe_schema() rather than a hand-maintained appendix) for SQL-flavored requests. The two raw-DB tools themselves are universal on every domain agent.

Planning Layer (app/planning/) — Spec 008

A multi-step decomposition pass that runs before intent routing on Telegram messages, so requests like "check my email, then post a summary in #standup, then add a follow-up task" execute as a sequenced plan instead of one ambiguous tool dance.

  • planner.py: classify_complexity(text) is a regex pre-filter (≥30 chars + connectives like "and then", "after that", "also send") that gates the LLM call. generate_plan(text, deps) runs a tool-less _planner_agent: Agent[None, ExecutionPlan]. It returns None when needs_planning=False or len(steps) < 2, in which case the message falls through to the normal single-step flow.
  • executor.py: format_plan_preview(plan) renders the plan as a Telegram approval message. execute_plan(plan, deps, send_progress) runs steps sequentially; each step is independently routed via classify_intent + select_agent, with truncated outputs of prior steps threaded forward as a scratchpad so later steps can build on earlier results. A live progress checklist is edited into the same message after each step.
  • Resume on failure — when a step throws, execute_plan saves a PendingAction(action_type="plan_resume") with a PlanResumePayload(plan, remaining_steps, scratchpad, failed_step, failure_reason) and stops. Confirm restarts from remaining_steps; Cancel aborts cleanly.
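
The regex pre-filter can be sketched as below. Only the 30-character minimum and the three quoted connectives come from this page; the production pattern list is presumably longer, and the `MIN_LENGTH` and `_CONNECTIVES` names are hypothetical.

```python
import re

MIN_LENGTH = 30
# Only the connectives quoted above; the real list is likely longer.
_CONNECTIVES = re.compile(r"\b(and then|after that|also send)\b", re.IGNORECASE)


def classify_complexity(text: str) -> bool:
    """Cheap gate: True means the message is worth sending to the planner."""
    stripped = text.strip()
    return len(stripped) >= MIN_LENGTH and bool(_CONNECTIVES.search(stripped))
```

Because the gate is pure string matching, it costs nothing on simple messages, and only messages that pass it pay for a planner LLM call.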

The plan itself is always behind the standard inline approval keyboard, even when none of the constituent tools would normally be gated. Full sequence: Multi-step plan flow.

Observability — Logfire + Langfuse + W&B Weave (app/observability.py) — Spec 009

Up to three observability stacks share a single OpenTelemetry tracer provider and run side-by-side without duplicating instrumentation:

graph LR
    PA[Pydantic AI agent.run / run_stream] --> OTEL[OTEL TracerProvider]
    FAST[FastAPI requests] --> OTEL
    OTEL --> LF[Logfire processor\ninfra · FastAPI routes · loop spans]
    OTEL --> LFC[Langfuse processor\nLLM generations · sessions · scores]
    OTEL --> WB[Weave OTLP exporter\nagent + tool spans · optional]
    LF --> LFCloud[(Logfire Cloud)]
    LFC --> LFCloud2[(Langfuse Cloud)]
    WB --> WBCloud[(W&B Weave Cloud)]
    LF -. self-diagnosis MCP .-> AG[Agent — diagnostics_agent]

init_observability(settings, app) is called once from create_app() before any agent runs. It first installs an SDK TracerProvider as the OTEL global (_ensure_sdk_tracer_provider, so processors aren't silently swallowed by the default proxy provider), then configures Logfire (logfire.configure() + instrument_fastapi) when LOGFIRE_TOKEN is set, and calls Agent.instrument_all() from Pydantic AI when Langfuse is enabled so spans land on the global provider where all processors observe them. (When only Logfire is enabled, logfire.instrument_pydantic_ai() is used instead — it routes through Logfire's internal tracer, which Langfuse would not see.)

W&B Weave (optional, off by default). When WEAVE_ENABLED=true (plus WEAVE_API_KEY + WEAVE_PROJECT as <entity>/<project>), _init_weave() does two things: calls weave.init() so the Weave SDK auto-patches supported LLM clients, and attaches an OTLP BatchSpanProcessor to the shared provider that ships Pydantic AI spans to https://trace.wandb.ai/otel/v1/traces. So agent + tool traces land in the Weave UI alongside Logfire and Langfuse — all three observe the same OTEL spans. Weave init fails soft (a warning, never a boot failure) if the key/project are missing.

Per-turn trace grouping. langfuse_root_span(name, session_id, user_id, …) is an async context manager wrapping the whole turn so every agent.run() inside it nests under one Langfuse trace instead of producing one trace per call.

Fire-and-forget nesting. spawn_with_otel_context(coro) is an asyncio.create_task() variant that propagates the current OTEL span context, so post-conversation background work (fact extraction, capability-gap detection, delegation orchestration, the self-redeploy marker) nests under the originating turn trace instead of appearing as an orphan. Genuinely independent work (scheduled loops) keeps bare create_task().

Stage-level spans inside a turn. Pydantic AI's Agent.instrument_all() covers LLM generations + tool calls automatically, but the preprocessing and routing stages would otherwise be invisible. The following functions are decorated with Langfuse @observe so they appear as nested children of telegram.turn:

  • preprocess.stt: app.tools.voice.transcribe_audio (audio bytes → transcript)
  • preprocess.vision: app.tools.vision.analyze_image (image bytes → description)
  • embed_text: app.tools.embedding.embed_text (sanitized: only chars/dims/model captured, never the vector or api_key)
  • routing.classify_intent: app.tools.router.classify_intent (output: {categories, method} where method is keyword | semantic | context_hint | none)
  • context_injection: app.utils.message_utils.inject_context (output: per-layer counts + 1500-char preview of what was prepended to the user message)
  • message_history_retrieval: app.utils.message_utils.fetch_message_history (metadata: {mode: chronological|semantic, recent_count, semantic_count, semantic_enabled}; mode=error indicates the semantic path failed and chronological was used)

update_current_span(input=, output=, metadata=) in observability.py is the helper they use to attach sanitized payloads; it degrades to a no-op when Langfuse is disabled.

Asynchronous trace scoring. When a Confirm/Cancel/Edit happens minutes after the original turn closed, score_trace(action.trace_id, name, value) attaches a quality score to the original trace. PendingAction.trace_id is captured at gate time via OTEL context (_current_otel_trace_id() in app/approval.py) precisely so this delayed score can land. Three score names are emitted today: user_approval (Confirm 1.0 / Cancel 0.0), user_edit (Edit tap), and agent_error (in-flight exception, written via score_current_trace).

Langfuse-managed Prompts (app/prompts.py + prompts.lock.json) — Spec 009

Nine system prompts can be tuned in the Langfuse UI without redeploying — get_prompt(name, fallback) returns the production-labeled Langfuse version when reachable, otherwise the code constant. prompts.lock.json (repo root) pins each constant's sha256, and check_drift() runs at startup to warn when code has been edited without being pushed. The sync workflow lives in scripts/sync_prompts.py; full prompt list and CLI usage in Tools → Langfuse-managed Prompts.
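
The drift check can be pictured as a hash comparison between code constants and the lock file. This is a sketch under assumptions: the flat name → sha256 layout of prompts.lock.json is a guess, `prompt_sha256` is a hypothetical helper, and the prompt names and texts are made up.

```python
import hashlib


def prompt_sha256(text: str) -> str:
    """Hash a prompt constant the way a lock entry is assumed to be computed."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def check_drift(prompts: dict[str, str], lock: dict[str, str]) -> list[str]:
    """Return names of prompts whose code constant no longer matches the lock."""
    return sorted(
        name for name, text in prompts.items()
        if lock.get(name) != prompt_sha256(text)
    )


# Made-up prompt constants and a lock generated from them.
prompts = {"system": "You are a helpful assistant.", "planner": "Plan the steps."}
lock = {name: prompt_sha256(text) for name, text in prompts.items()}
```

Editing a constant without re-running the sync script makes its name show up in the `check_drift` result, which is what the startup warning would report.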

Briefing Agent (app/routing/agents.py → briefing_agent)

A dedicated agent used exclusively by the scheduled briefing loops (morning, evening, weekly). It is never invoked for interactive user messages.

Tool set: memory (tasks, notes, reminders) + utility (weather, datetime, search, URL, maps) + all email MCP tools + all calendar MCP tools + Microsoft To Do (todo_*) MCP tools.

Why separate? The interactive agents are designed around one domain. The morning briefing legitimately needs email + calendar + tasks + weather simultaneously — a cross-domain request that would normally fall back to the full 45-tool full_agent. briefing_agent gives it a pre-composed, focused tool set without the overhead of GitHub, news, or browser tools that are irrelevant for briefings.

Skill Registry (app/skills/)

File-drop plugin architecture. Any .py file placed in app/skills/ with a function decorated @skill is auto-discovered and registered as an agent tool on startup via load_skills(agent). Modules starting with _ are ignored. Loading is idempotent — calling load_skills() twice registers nothing new.

Built-in skills:

  • read_later.py: Content Curator (save_to_read_later, list_read_later, delete_read_later, get_read_later_digest)
  • travel_briefing.py: get_travel_briefing, an on-demand travel summary (weather + maps/transit + LLM synthesis)
  • cv.py: store_cv, get_cv. Parses and persists a CV as structured user facts for job application workflows
  • research.py: deep_research. Multi-step web research that generates sub-questions, fetches and synthesises sources in parallel, and saves the result as a "Research: <topic>" note
  • meeting_notes.py: get_meeting_insights, list_recent_meetings. Extracts decisions/actions/key points from Drive transcripts via the mini model and saves them as notes. Source-agnostic: _SOURCE_REGISTRY maps source name to fetcher, allowing Teams or Notion to be wired in later.
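
The file-drop mechanism can be illustrated with a toy registry. Everything here is a simplified stand-in: the real `load_skills(agent)` discovers files under app/skills/ and registers tools on an agent, while this sketch takes already-imported modules and a plain `ToolRegistry` (hypothetical), and the `__is_skill__` marker attribute is an assumption.

```python
import types
from typing import Callable


def skill(fn: Callable) -> Callable:
    """Mark a function as a skill; the loader looks for this attribute."""
    fn.__is_skill__ = True
    return fn


class ToolRegistry:
    """Stand-in for an agent: just records registered tool names."""

    def __init__(self) -> None:
        self.tools: dict[str, Callable] = {}


def load_skills(registry: ToolRegistry, modules: list[types.ModuleType]) -> int:
    """Register every @skill function once; return how many were newly added."""
    added = 0
    for module in modules:
        if module.__name__.rsplit(".", 1)[-1].startswith("_"):
            continue  # modules starting with "_" are ignored
        for name, obj in vars(module).items():
            if getattr(obj, "__is_skill__", False) and name not in registry.tools:
                registry.tools[name] = obj
                added += 1
    return added
```

Skipping names already in the registry is what makes a second `load_skills` call a no-op.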

Self-Improvement Loop (app/tools/source.py + app/memory/capability_gaps.py + app/skills/proposer.py)

A four-stage pipeline that lets Kwasi inspect its own deployed code, log its own failures, draft new skills from those failures, and (with approval) activate them.

flowchart LR
    subgraph turn[Each Telegram turn]
        A[agent response] --> B[regex prefilter]
        B -->|negation match| C[mini-model classify]
        C --> D[grep_source verify capability]
        D -->|no source matches| E[gap]
        D -->|matches found| F[available]
    end
    E & F -->|fire-and-forget| G[(capability_gaps)]

    subgraph propose[Propose flow]
        H[propose_skill description/gap_id] --> I[primary model drafts .py]
        I --> J[static validate: compile + AST + structure]
        J --> K[(proposed_skills, status=draft)]
    end

    subgraph activate[Activate flow]
        L[activate_proposed_skill] --> M[approval_gate Telegram]
        M -->|confirm| N[re-validate]
        N --> O[write app/skills/name.py]
        O --> P[hot_reload_new_skill onto full + 9 domain agents]
        P --> Q[skill callable, status=activated]
    end

Stage 1 — Source introspection. read_source_file(path, offset, limit) and grep_source(pattern, path_glob) in app/tools/source.py let any agent read the deployed repo. REPO_ROOT auto-detects via pyproject.toml walk-up so it works identically in local dev, CLI, and Railway containers. Paths resolve under REPO_ROOT and reject escapes. In production, tests/, specs/, docs/, .env* are excluded by .dockerignore. Registered in _UTILITY_FNS so every domain agent has them — introspection is meta, not domain-scoped.
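The "resolve under REPO_ROOT and reject escapes" rule might look something like the sketch below. `resolve_under_root` is a hypothetical name, and the real tool may apply further checks not shown here.

```python
from pathlib import Path


def resolve_under_root(root: Path, relative: str) -> Path:
    """Resolve `relative` inside `root`, rejecting any path that escapes it."""
    root = root.resolve()
    # Joining an absolute path replaces the root, so it is caught below too.
    candidate = (root / relative).resolve()
    if not candidate.is_relative_to(root):  # requires Python 3.9+
        raise ValueError(f"path escapes repo root: {relative}")
    return candidate
```

Resolving before comparing matters: a plain string-prefix check would accept `app/../../etc/passwd`, while the resolved path makes the escape visible.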

Stage 2 — Capability-gap detection. detect_and_save_capability_gap() runs fire-and-forget after every Telegram turn (alongside extract_facts_from_exchange). Pipeline: regex prefilter on the response (negation patterns like "I don't have a tool", "as a language model") → mini-model decline classifier → grep_source for the suspected capability → classify as gap (no source matches → real capability hole, candidate for a new skill) or available (source matches found → capability exists; the decline was a routing or prompt-level failure). Stored in capability_gaps with status lifecycle (open | resolved | dismissed). Surfaced via get_capability_gap_digest on the diagnostics agent.
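As an illustration of the first and last steps of that pipeline, here is a small sketch. The regex patterns are assumptions (only two example phrases are given above), and the mini-model classifier and grep_source call are left out; `classify_gap` simply takes the list of source matches as input.

```python
import re

# Illustrative negation patterns; the project's real prefilter list is not shown here.
_DECLINE_RE = re.compile(
    r"i don't have a tool|as a language model|i can't access|i'm unable to",
    re.IGNORECASE,
)


def looks_like_decline(response: str) -> bool:
    """Cheap regex prefilter that decides whether the costlier stages run at all."""
    return _DECLINE_RE.search(response) is not None


def classify_gap(source_matches: list[str]) -> str:
    """No source matches means a real hole; matches mean a routing or prompt failure."""
    return "available" if source_matches else "gap"
```

Running the regex first keeps the mini-model call off the hot path for the large majority of turns, where the response contains no decline at all.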

Stage 3 — Skill proposer. propose_skill(description, gap_id?) calls the primary model with a skill template + the originating gap context (if any), receives a complete .py file, then runs static validation:

  • compile() — syntax check
  • AST walk — rejects imports of subprocess, shutil, ctypes, marshal, pickle, socket, smtplib, ftplib, pty; calls of os.system, os.popen, os.exec*, os.fork, os.spawn; the eval() and exec() builtins
  • Structure check — exactly one @skill-decorated async function, ctx as first param, non-empty docstring, name matches expected slug

Passes save as ProposedSkill(status='draft', validation_status='passed') — code lives in the DB, never on disk. Failures save with validation_status='failed' and the error report so the user can inspect.
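The compile and AST steps could be sketched as below. This is a minimal approximation that mirrors the ban lists given above; `validate_skill_source` is a hypothetical name, the structure check (single @skill async function, ctx parameter, docstring, slug) is omitted, and the real validator may be stricter.

```python
import ast

BANNED_IMPORTS = {"subprocess", "shutil", "ctypes", "marshal", "pickle",
                  "socket", "smtplib", "ftplib", "pty"}
BANNED_BUILTINS = {"eval", "exec"}
BANNED_OS_PREFIXES = ("system", "popen", "exec", "fork", "spawn")


def validate_skill_source(source: str) -> list[str]:
    """Return validation errors; an empty list means the draft passed."""
    try:
        compile(source, "<proposed_skill>", "exec")  # syntax check only, nothing runs
    except SyntaxError as exc:
        return [f"syntax error: {exc.msg}"]

    errors = []
    for node in ast.walk(ast.parse(source)):
        # Collect top-level package names from both import forms.
        if isinstance(node, ast.Import):
            names = [alias.name.split(".")[0] for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [(node.module or "").split(".")[0]]
        else:
            names = []
        errors += [f"banned import: {n}" for n in names if n in BANNED_IMPORTS]

        if isinstance(node, ast.Call):
            fn = node.func
            if isinstance(fn, ast.Name) and fn.id in BANNED_BUILTINS:
                errors.append(f"banned builtin: {fn.id}()")
            elif (isinstance(fn, ast.Attribute)
                  and isinstance(fn.value, ast.Name) and fn.value.id == "os"
                  and fn.attr.startswith(BANNED_OS_PREFIXES)):
                errors.append(f"banned call: os.{fn.attr}()")
    return errors
```

A static walk like this only catches direct references (for example, it would miss `getattr(os, "system")`), which is consistent with treating human approval as the primary control.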

Stage 4 — Activation (approval-gated). activate_proposed_skill(skill_id) is the only path that writes to disk. It re-validates first (a defense in case the DB row was tampered with after proposal), goes through approval_gate(action_type='skill') for the Telegram inline keyboard, on Confirm writes app/skills/{name}.py (refusing to overwrite), then calls hot_reload_new_skill(name, agents) in app/skills/__init__.py to register the new function on the full agent + 9 domain agents that include skills. The activation response includes an explicit "ephemeral until committed to git" warning: the file is on the running container's disk only and disappears on the next Railway redeploy. Automated PyGithub PR persistence is planned but not shipped.

Why no E2B sandbox for code testing: the static-validation gauntlet plus mandatory human approval on Telegram covers the same risk surface E2B would address. Skills run in the same trust boundary as the rest of the app — the meaningful safety control is the human review gate, not a runtime sandbox. E2B remains useful for the existing execute_python tool (untrusted code Kwasi runs at user request), just not for skill validation.

System Prompt Architecture

build_system_prompt() assembles the agent's full context from three categories of sections, always in this order:

graph TD
    subgraph Static["Static Instruction Sections (identical across requests — cache-friendly)"]
        direction LR
        A1[_PERSONA\nVoice · banned openers · cultural context]
        A2[_DECISION_MAKING\nWhen to act vs. confirm · tool priority]
        A3[_CONTEXT_USAGE\nHow to apply the profile silently]
        A4[_MEMORY_INSTRUCTIONS\nFacts store · search priority order]
        A5[_JOURNAL_INSTRUCTIONS\nWhen to call save_journal_entry]
        A6[_TONE_CALIBRATION\nSituational energy matching]
        A7[_OUTPUT_FORMAT\nMobile formatting · brevity rules]
        A8[_ERROR_HANDLING\nRetry-first · plain-English failure summaries]
        A9[_TOOLS_AND_ACCOUNTS\nTool guidance scoped to active domain]
        A10[_TOOL_USAGE_EXAMPLES\nExample flows]
        A11[_APPROVAL_INSTRUCTIONS\nSentinel handling · never show raw token]
        A12[_DOCS_REFERENCE\nSelf-introspection · anti-poisoning rules]
        A13[_DB_OPS_RULES\nRaw-SQL guardrails · read-live-schema rule]
        A14[_TOOL_DISCIPLINE\nWhen to reach for which tool]
        A15[_DELEGATION_RULES\nWhen to delegate · backend choice]
        A16[_SELF_CONFIG_RULES\nSelf-config / Railway control bounds]
        A17[focus_section\nActive domain focus hint if scoped]
    end

    subgraph Dynamic["Dynamic Sections (fetched from storage per request)"]
        direction LR
        B2[learnings_section\nagent_learnings table status=active\nBehavioral Guidelines header]
        B3[context_section\ncontext table — narrative profile\ncapped at 4,800 chars ≈ 1,200 tokens]
        B4[schema_section\nLive database schema via describe_schema()\nintrospected · process-cached · fail-safe to empty]
        B5[web_logins_section\nvault service names + domains never secrets\nStored web logins header · dropped if vault off]
    end

    Static --> Prompt[System Prompt\n~3,000–6,000 tokens]
    Dynamic --> Prompt

Key design constraints:

  • Datetime is NOT in the system prompt. It is prepended to the user turn (as [Friday, April 24, 2026 — 09:15 AM Europe/Paris]) so the stable system prompt sections form a consistent prefix across requests and qualify for Gemini's implicit prompt cache. A changed datetime would bust the cache on every request.
  • Profile is capped at 4,800 chars (~1,200 tokens). The nightly reflection profile grows over time — the cap prevents unbounded system prompt growth while keeping the most recent content intact.
  • Tool sections are scoped. When active_categories is set (i.e. a domain agent is running), only the relevant _DOMAIN_TOOL_SECTIONS entries are included — cutting ~600–1,000 tokens of irrelevant tool guidance from each domain-routed request.
  • Briefing agent uses build_briefing_system_prompt(), which extends the base prompt by appending _MORNING_BRIEFING, _EVENING_RECAP, _WEEKLY_RECAP, _WEEKLY_PREP, and _MEETING_PREP templates. These arrive at the system level so the model's structural understanding of briefing formats is cached, not re-parsed per call.
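The first two constraints can be illustrated with a short sketch. Both helper names are hypothetical, the timestamp separator is simplified, and the tail-keeping cap assumes the newest profile content sits at the end of the text.

```python
from datetime import datetime

PROFILE_CAP_CHARS = 4_800  # roughly 1,200 tokens


def cap_profile(profile: str, cap: int = PROFILE_CAP_CHARS) -> str:
    """Keep the tail of the profile so the most recent content survives (assumed layout)."""
    return profile if len(profile) <= cap else profile[-cap:]


def prefix_user_turn(message: str, now: datetime, tz_label: str) -> str:
    """Put the timestamp on the user turn so the system prompt stays byte-stable."""
    stamp = now.strftime("%A, %B %d, %Y - %I:%M %p")
    return f"[{stamp} {tz_label}] {message}"
```

Because the timestamp travels with the user message, two requests a minute apart still share an identical system prompt prefix, which is what implicit prefix caching depends on.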

Personality directives (_PERSONA + _ERROR_HANDLING):

Two sections encode Kwasi's distinctive character:

  • _PERSONA — Direct, warm-but-not-performative voice. Natural Ghanaian cultural familiarity. Light humor where it lands. Banned openers list: "I'm sorry", "Unfortunately", "I apologise", "I wasn't able to", "I'm afraid" — the model must reframe as what it CAN do.
  • _ERROR_HANDLING — Retry-first directive: when a tool fails, attempt at least one alternative (different tool, rephrased query, decomposed sub-task) before reporting failure. When all paths are exhausted: structured ≤3-bullet plain-English summary of what was tried + one concrete next step. No raw error codes or exception names in any user-facing response.

Self-Management Subsystem (app/tools/config_registry.py)

Kwasi can change select settings about itself and manage its own Railway deployment, from Telegram. The design centres on one principle: a single declarative registry is the source of truth, and everything else derives from it.

Config registry. CONFIG_REGISTRY in app/tools/config_registry.py is a list of ConfigField(field, env, validator, coerce, runtime_settable, env_settable). Both allowlists derive from it, so they can never drift:

  • ALLOWED_CONFIG_KEYS = {f.field: (f.validator, f.coerce) for f in CONFIG_REGISTRY if f.runtime_settable} — consumed by runtime_config.py.
  • RAILWAY_ENV_ALLOWLIST = {f.env for f in CONFIG_REGISTRY if f.env_settable} — consumed by railway.py.

Adding a self-manageable setting is one registry row. An independent deny-substring gate (TOKEN/SECRET/KEY/PASSWORD/DATABASE_URL/URL/CREDENTIAL) in is_allowed_env blocks secret-shaped variables regardless of the allowlist — so RAILWAY_API_TOKEN and friends can never be agent-set. Some fields are env_settable but not runtime_settable (e.g. weave_enabled, read only at startup) — the registry encodes that honestly so a runtime override can't silently no-op.
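A compact sketch of how one registry can drive both allowlists plus the independent deny gate. The two registry rows, their validators, and the upper-casing of names are illustrative assumptions; only the field names and the deny substrings come from the description above.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ConfigField:
    field: str
    env: str
    validator: Callable[[str], bool]
    coerce: Callable[[str], object]
    runtime_settable: bool
    env_settable: bool


# Two illustrative rows; the real registry contents are not listed here.
CONFIG_REGISTRY = [
    ConfigField("model_name", "MODEL_NAME", lambda v: bool(v.strip()), str, True, True),
    ConfigField("weave_enabled", "WEAVE_ENABLED", lambda v: v in {"true", "false"},
                lambda v: v == "true", False, True),
]

# Both allowlists are derived, so they cannot drift from the registry.
ALLOWED_CONFIG_KEYS = {
    f.field: (f.validator, f.coerce) for f in CONFIG_REGISTRY if f.runtime_settable
}
RAILWAY_ENV_ALLOWLIST = {f.env for f in CONFIG_REGISTRY if f.env_settable}

DENY_SUBSTRINGS = ("TOKEN", "SECRET", "KEY", "PASSWORD", "DATABASE_URL", "URL", "CREDENTIAL")


def is_allowed_env(name: str) -> bool:
    """Deny secret-shaped names first, then require allowlist membership."""
    upper = name.upper()
    if any(part in upper for part in DENY_SUBSTRINGS):
        return False
    return upper in RAILWAY_ENV_ALLOWLIST
```

Checking the deny list before the allowlist means that even a mistaken registry row such as a hypothetical `API_KEY` field would still be refused.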

Two control planes.

| Tool | Mechanism | Effect |
| --- | --- | --- |
| set_runtime_config(key, value) | Mutates deps.settings in memory and persists to the context KV at system:config:<field>. load_overrides(deps) re-applies persisted overrides at boot. | Instant (next message, no restart). Survives reboots (override > env). |
| railway_set_env(name, value, redeploy=True) | variableUpsert via the Railway GraphQL API (backboard.railway.com/graphql/v2). | Bakes into the real environment; redeploys (~1–2 min). Use for boot-only settings or to make a change permanent. |
| railway_redeploy() | deploymentRedeploy. | Restart with no config change. |

Both mutating planes are approval-gated (action_type config / deployment). railway_set_env runs the field's registry validator (validate_env_value) before upsert, so a bad MODEL_NAME can't bake in and crash-loop the new container. Read-only tools — railway_deployment_status, railway_deploy_logs, diagnose_self — are ungated. diagnose_self bundles a health snapshot: effective config (with override marks), recent Logfire exceptions, background-loop heartbeats (read from fixed system:<loop>:heartbeat keys), and Railway deploy status.

Post-redeploy confirmation (app/self_redeploy.py). When the agent redeploys itself, the container dies mid-conversation — so the loop is closed across the restart:

  1. After a successful Railway call, the executor writes a system:pending_self_redeploy marker (chat to notify, expected config, and prev_deploy_id captured at trigger time). Writing after success means a failed call leaves no orphan.
  2. On the next boot, the lifespan calls confirm_pending_self_redeploy(deps, telegram_app) (right after recover_orphan_delegations, so load_overrides has already applied the new effective config). It compares the live latest-deployment id against prev_deploy_id — if unchanged, no redeploy actually happened (failed build / no-op value), so it clears the marker silently. Otherwise it verifies the expected config took effect and posts a proactive ✅ (or ⚠️ on mismatch / unusually long restart), then consumes the marker (cleared before delivery → at-most-once, no duplicate notify on a boot loop).
  3. If the build fails, the container never reaches step 2 and Railway keeps the previous healthy deploy serving — so the agent stays reachable and diagnose_self surfaces the stuck marker + FAILED status.
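The boot-time decision in step 2 can be sketched as a pure function plus an at-most-once consume step. The names `RedeployMarker`, `confirm_outcome`, and `consume_and_notify` are hypothetical, a plain dict stands in for the context KV, and the "unusually long restart" warning is left out.

```python
from dataclasses import dataclass


@dataclass
class RedeployMarker:
    chat_id: int
    expected_config: dict[str, str]
    prev_deploy_id: str


def confirm_outcome(marker: RedeployMarker, live_deploy_id: str,
                    effective_config: dict[str, str]) -> str:
    """Decide what to report after boot: 'silent', 'ok', or 'mismatch'."""
    if live_deploy_id == marker.prev_deploy_id:
        return "silent"  # no new deployment happened, so the marker is just cleared
    matches = all(effective_config.get(k) == v for k, v in marker.expected_config.items())
    return "ok" if matches else "mismatch"


def consume_and_notify(store: dict, live_deploy_id: str,
                       effective_config: dict[str, str], sent: list[str]) -> None:
    """Clear the marker before delivery so a boot loop cannot notify twice."""
    marker = store.pop("system:pending_self_redeploy", None)
    if marker is None:
        return
    outcome = confirm_outcome(marker, live_deploy_id, effective_config)
    if outcome != "silent":
        sent.append(outcome)  # stand-in for posting the Telegram message
```

Popping the marker before sending trades a possible lost notification for a guarantee of no duplicates, which matches the at-most-once behaviour described in step 2.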

Task Delegation (app/delegate.py)

delegate_to_coding_agent(task, backend?, max_minutes?) spawns an external coding agent inside an E2B sandbox for complex tasks no existing tool covers. Three backends:

  • opencode (default) — Gemini-backed, dramatically cheaper, no per-tool-use telemetry. Uses the opencode E2B template; runs opencode run --model {DELEGATION_OPENCODE_MODEL} <task> with GEMINI_API_KEY + GOOGLE_GENERATIVE_AI_API_KEY set; captures plain-text stdout as the final answer.
  • claude_code — Anthropic-backed, more capable for hard agentic coding. Uses the claude template + --output-format stream-json --verbose; full JSONL parsing for tool-use counts, token usage, total cost, and mid-run cost-cap enforcement. Auth is ANTHROPIC_API_KEY only (subscription OAuth is not used, for compliance).
  • web_task — DOM-driven browser automation (browser-use + Gemini) for JS/SPA scraping + multi-step form-fills. Reached via a dedicated delegate_web_task(task, credential?, max_minutes?) tool (not a backend= arg, so the model routes browser intents correctly). Needs a prebuilt WEB_TASK_TEMPLATE E2B image. With credential="<name>" it logs into a real account using a credential-vault entry: the tool references the cred by name only; run_delegation decrypts it at run time into browser-use domain-scoped sensitive_data (placeholders the model never resolves) + a pyotp TOTP code, written to the sandbox via files.write — the LLM never sees the plaintext.

Approval-gated (action_type="delegation"); the executor (shared by both tools) creates a Delegation row, spawns asyncio.create_task(run_delegation(...)), and returns a "working on it" sentinel — results land via a separate Telegram message when done. Image artifacts come back as Telegram photos. Caps: delegation_max_per_day (5), delegation_max_minutes (10), delegation_max_cost_usd (Claude Code path only). Container-restart recovery (recover_orphan_delegations) marks running rows as failed and notifies. Each delegation persists a Note tagged #delegation for searchable history.

Credential vault (app/vault.py): Fernet-encrypted website logins in the vault_credentials table (master key from VAULT_MASTER_KEY env only — never in the DB, never agent-settable). Enrolled out of band via scripts/vault_add.py; build_system_prompt surfaces the enrolled service names (never secrets) so the model knows which delegate_web_task(credential=...) names exist. db_query refuses the vault_credentials and context tables.

Inbound PR Review (app/pr_review.py)

When Lawrence is tagged as a reviewer on a PR in an allowlisted org, Kwasi fetches it via PyGithub (get_pr_full — title + body + per-file diff, capped 8 KB/file & 60 KB total) and runs a tool-less mini-model Agent[None, PRReview] to produce structured summary / risks / questions / description_check / test_observation / verdict. Output is rendered as Telegram markdown by Python (deterministic, testable) and persisted as a Note (#pr-review #<owner> #<repo>) for searchability. Discovery is dual-path — Gmail from:notifications@github.com "review requested" and GitHub's is:pr is:open review-requested:<login> — deduped by (owner, repo, number). Org-only allowlist via PR_REVIEW_ORGS (case-insensitive, fail-closed when empty). Three callers share one pipeline: the _pr_review_loop (daily at PR_REVIEW_TIME), and two manual agent tools github_review_pr / github_pending_pr_reviews on github_agent. Per-PR dedup via system:pr_review:<owner>/<repo>#<N> context keys with an 18-hour TTL. Read-only by design — no writing back to GitHub, no E2B sandbox.
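Two pieces of that pipeline, the diff caps and the cross-source dedup, lend themselves to a short sketch. The function names are hypothetical, and sizes are counted in characters here as an approximation of the 8 KB and 60 KB limits.

```python
PER_FILE_CAP = 8 * 1024
TOTAL_CAP = 60 * 1024


def cap_diffs(files: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Truncate each (filename, patch) to the per-file cap and stop at the total cap."""
    kept, used = [], 0
    for filename, patch in files:
        remaining = TOTAL_CAP - used
        if remaining <= 0:
            break
        clipped = patch[: min(PER_FILE_CAP, remaining)]
        kept.append((filename, clipped))
        used += len(clipped)
    return kept


def dedupe_prs(*sources: list[tuple[str, str, int]]) -> list[tuple[str, str, int]]:
    """Merge discovery paths, keeping the first sighting of each (owner, repo, number)."""
    seen, merged = set(), []
    for source in sources:
        for key in source:
            if key not in seen:
                seen.add(key)
                merged.append(key)
    return merged
```

Passing the Gmail results and the GitHub search results as separate `sources` keeps either path optional: if one discovery route is unavailable, the other still yields a complete, duplicate-free list.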

Dashboard (app/routers/dashboard.py)

FastAPI router mounted at /dashboard/. Four views (interactions, audit log, scheduled tasks, memory profile). Protected by DASHBOARD_SECRET — disabled entirely if the env var is not set. Auth via X-Dashboard-Secret header or ?token= query param.
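The auth rule can be expressed as a small pure function, sketched below. `is_authorized` is a hypothetical helper, and the constant-time comparison via `hmac.compare_digest` is an assumption about good practice rather than a statement about the actual router code.

```python
import hmac
from typing import Optional


def is_authorized(secret: Optional[str], header_value: Optional[str],
                  token_param: Optional[str]) -> bool:
    """Accept the secret from either the X-Dashboard-Secret header or ?token=."""
    if not secret:
        return False  # dashboard is disabled when DASHBOARD_SECRET is unset
    supplied = header_value or token_param or ""
    return hmac.compare_digest(supplied.encode(), secret.encode())
```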

MCP Tools (app/interfaces/mcp/client.py)

Gmail, Outlook email, Outlook calendar, and Microsoft To Do tools are synchronous (Google/MS APIs). They are wrapped with asyncio.to_thread() to avoid blocking the event loop. get_mcp_tools() returns all wrappers; agents.py filters them into _EMAIL_MCP, _CALENDAR_MCP, and _TODO_MCP sets for domain-specific routing.
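The wrapping pattern looks roughly like this. `fetch_inbox_sync` is a hypothetical blocking function standing in for a real Google or Microsoft client call.

```python
import asyncio
import time


def fetch_inbox_sync(limit: int) -> list[str]:
    """Hypothetical blocking client call standing in for a Google/MS API request."""
    time.sleep(0.05)
    return [f"message {i}" for i in range(limit)]


async def fetch_inbox_wrapper(limit: int) -> list[str]:
    """Run the blocking call in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(fetch_inbox_sync, limit)


async def main() -> list[list[str]]:
    # Both calls overlap because neither one blocks the loop.
    return await asyncio.gather(fetch_inbox_wrapper(2), fetch_inbox_wrapper(1))


results = asyncio.run(main())
```

Without the wrapper, a slow API call would stall every other coroutine on the loop, including the Telegram poller and the background loops.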

gmail_read_email_wrapper — fetches the complete body of a Gmail message (up to 20,000 chars). Walks the MIME part tree recursively, prefers text/plain, strips HTML from text/html fallback. The search_emails_wrapper and summary tools only return a short snippet (~100–200 chars) — always follow up with gmail_read_email_wrapper when the full text is needed.

outlook_read_wrapper — fetches the complete body of an Outlook message (up to 20,000 chars). Strips HTML (Graph API returns HTML by default), includes From/Subject/Date fields.

Google Calendar shared calendars: google_get_todays_schedule_wrapper and google_get_calendar_events_wrapper query all calendars the user has access to (primary, shared, subscribed) by calling calendarList().list() first. Each event is prefixed with [Calendar Name] in the output.

Microsoft To Do tools (todo_*) use the same OutlookClient._make_request pattern as Outlook email and calendar. They require Tasks.ReadWrite scope on the Outlook refresh token.

Memory (app/memory/)

  • ports.py: StoragePort protocol + all data models (Interaction, Note, Task, Reminder, UserContext, UserFact, ScheduledTask, AuditEntry, ReadLaterItem, NewsTopic, SeenStory, AlertRule, PendingAction, PendingIntention, AgentLearning, JournalEntry, SemanticSearchResult). The HealthSample Pydantic model lives in app/health/models.py.
  • adapters/sqlite.py — SQLite implementation (local dev, :memory: for tests). Tables: interactions, notes, tasks, reminders, context, user_facts, scheduled_tasks, audit_log, read_later, news_topics, seen_news, alert_rules, pending_actions, pending_intentions, agent_learnings, journal_entries, health_samples.
  • adapters/postgres.py — asyncpg connection pool (Railway production). Same tables as SQLite. health_samples.value is JSONB in Postgres and JSON-encoded TEXT in SQLite.
  • Both adapters embed new records on write via embed_text() (Gemini gemini-embedding-001, best-effort: row INSERTed first, embedding awaited and UPDATEd onto the row in a swallowed try/except — so the row persists even if embedding fails) and implement semantic_search() — Python cosine in SQLite, pgvector <=> in Postgres.
  • logger.py: log_interaction() helper used by all handlers
  • post_conversation.py: extract_facts_from_exchange() (immediate post-message fact extraction); fire-and-forget via asyncio.create_task(). The earlier 30-min summarise_session() was removed 2026-05-09; topic summaries now come only from the nightly reflection.
  • reflection.py: ReflectionService, a nightly LLM pass producing four outputs: updated UserContext narrative profile, auto-extracted UserFact records, new PendingIntention records, and AgentLearning corrections
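The "Python cosine" path used by the SQLite adapter can be sketched as below. This is a simplified illustration over in-memory (text, embedding) pairs; the function signatures are assumptions and the real semantic_search() works against stored rows.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def semantic_search(query_vec, rows, top_k=3):
    """Rank (text, embedding) rows by similarity, skipping rows with no embedding."""
    scored = [
        (cosine_similarity(query_vec, emb), text)
        for text, emb in rows
        if emb is not None  # embedding is best-effort, so it may be missing
    ]
    return [text for _, text in sorted(scored, reverse=True)[:top_k]]
```

Skipping rows with a missing embedding follows from the best-effort write path: such rows remain reachable through keyword search even though they cannot be ranked by meaning.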

Background Loops

All background loops run as asyncio tasks within the FastAPI lifespan.

| Loop | Trigger condition | Schedule |
| --- | --- | --- |
| Reflection | REFLECTION_SECRET set | 2 AM UTC daily |
| Reminders | Telegram token configured | Every 5 minutes |
| Daily Briefing | BRIEFING_CHAT_ID set | BRIEFING_TIME UTC daily (default 08:00) |
| Evening Recap | BRIEFING_CHAT_ID set | EVENING_RECAP_TIME local (default 21:00) |
| Weekly Recap | BRIEFING_CHAT_ID set | WEEKLY_RECAP_DAY/WEEKLY_RECAP_TIME local (default Friday 18:00) |
| Weekly Prep | BRIEFING_CHAT_ID set | WEEKLY_PREP_DAY/WEEKLY_PREP_TIME local (default Sunday 18:00) |
| Read-Later Digest | BRIEFING_CHAT_ID set | READ_LATER_DIGEST_DAY/READ_LATER_DIGEST_TIME local (default Saturday 09:00) |
| Journal Digest | BRIEFING_CHAT_ID set | JOURNAL_DIGEST_DAY/JOURNAL_DIGEST_TIME local (default Sunday 19:00) |
| Email Intelligence | BRIEFING_CHAT_ID + email creds | EMAIL_INTEL_TIME local (default 09:30); set "" to disable |
| User Scheduled Tasks | BRIEFING_CHAT_ID set | Every 60 seconds (croniter-based evaluation) |
| Alert Rules + Intentions | BRIEFING_CHAT_ID set | Every 5 minutes |
| Meeting Prep | BRIEFING_CHAT_ID set | Every 5 minutes (fires ≈MEETING_PREP_LEAD_MINUTES before each meeting) |
| Meeting Follow-up | BRIEFING_CHAT_ID + GMAIL_WORK_REFRESH_TOKEN | Every MEETING_FOLLOWUP_POLL_MINUTES (default 30); polls work Gmail for Gemini meeting notes → Telegram card |
| PR Review | BRIEFING_CHAT_ID + GITHUB_TOKEN + PR_REVIEW_ORGS | PR_REVIEW_TIME local (default 09:00); set "" to disable |
| Approval Expiry | Telegram bot active | Every 5 minutes |

Key Design Decisions

Why Pydantic AI?

Pydantic AI handles the agent loop (tool calls → LLM → tool calls → ...), type-safe tool definitions, streaming, and model switching. The Agent class abstracts over different LLM providers — switching from Gemini to Claude is a single env var change.

Why polling instead of webhook for Telegram?

Long-polling (updater.start_polling) is simpler to deploy on Railway — no public URL needed for the bot itself, no TLS certificate management. The WhatsApp webhook still uses a proper HTTP endpoint since the Meta platform requires it.

Why asyncio tasks for background loops?

Avoids the complexity of a separate worker process, Celery, or a job scheduler. Since the app is single-user and the loops are low-frequency (5 min / daily / 60 sec), asyncio tasks inside the FastAPI lifespan are sufficient and operationally simple.

Why MCP for email/calendar?

MCP (Model Context Protocol via fastmcp) provides a clean abstraction for external tool servers. Gmail and Outlook have separate auth flows (OAuth2 with Google vs. MSAL with Microsoft) — MCP isolates this complexity from the agent.

Why native PyGithub instead of the official GitHub MCP server?

The existing "MCP" tools in this project are local Python wrappers, not external server processes. PyGithub follows the same pattern as other tools (weather, search, news): in-process, async-wrapped, consistent output formatting. The official GitHub MCP server is a Go binary designed for IDE integrations, not embedded service use.

Why a file-drop skill registry?

Allows extending the agent's capabilities without touching agent.py. New skills can be developed, tested, and dropped in independently. The registry is idempotent and fail-safe — a broken skill file is skipped with a logged error.

Why keyword-based intent routing instead of LLM-based routing?

The routing step runs on every message, so it must have near-zero latency and zero cost. Keyword matching is deterministic, testable, and trivially debuggable — if it misroutes a message, you can see exactly why by reading the keyword list. An LLM-based router would add a full extra inference step per message, significantly increasing latency and cost for a single-user assistant. The keyword approach covers ~95% of real messages correctly; the remaining 5% fall through to the full agent, which is always the safe fallback.
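A toy version of such a router, to make the trade-off concrete. The keyword lists and the rule that multiple matching domains fall back to the full agent are illustrative assumptions; the project's real lists and tie-breaking are not shown here.

```python
# Illustrative keyword lists; the project's real lists are not shown here.
DOMAIN_KEYWORDS = {
    "email": ("email", "inbox", "gmail", "outlook"),
    "calendar": ("calendar", "meeting", "schedule"),
}


def route(message: str) -> str:
    """Return the single matching domain, or 'full' when zero or several match."""
    text = message.lower()
    hits = [
        domain
        for domain, words in DOMAIN_KEYWORDS.items()
        if any(word in text for word in words)
    ]
    return hits[0] if len(hits) == 1 else "full"
```

A misroute can be debugged by reading `DOMAIN_KEYWORDS` directly, which is the "trivially debuggable" property argued for above.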

Why a sentinel token instead of pausing the agent mid-run for approval?

The agent runs to completion — it always produces a text response. Pausing mid-run would require a resumable coroutine, persistent agent state, and a way to re-inject the approval result back into an in-progress LLM call. The sentinel pattern avoids all of this: the agent sees "[APPROVAL_PENDING:uuid]" as a normal tool result, writes a response like "your action is pending approval", and exits. handle_message post-processes the result synchronously. On Confirm, a simple direct executor call (no LLM re-run) performs the action. This keeps the agent loop stateless and the approval flow outside the LLM's critical path.
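The sentinel round trip can be sketched in a few lines. The helper names are hypothetical and the regex assumes a canonical 36-character UUID inside the token.

```python
import re
import uuid
from typing import Optional

_SENTINEL_RE = re.compile(r"\[APPROVAL_PENDING:([0-9a-fA-F-]{36})\]")


def make_sentinel(action_id: uuid.UUID) -> str:
    """What a gated tool returns instead of performing the action."""
    return f"[APPROVAL_PENDING:{action_id}]"


def extract_pending_id(text: str) -> Optional[uuid.UUID]:
    """Find the sentinel so the handler can look up the pending action."""
    match = _SENTINEL_RE.search(text)
    return uuid.UUID(match.group(1)) if match else None
```

Because the token is just text in a tool result, the agent loop needs no special pause or resume machinery; the handler picks the id out afterwards and attaches the Confirm/Cancel keyboard.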

Why a separate app/approval.py module?

approval.py imports only from app/memory/ports — it has no knowledge of agent.py, bot.py, or client.py. This breaks the potential circular import chain: agent.py imports approval.py, bot.py imports approval.py, and client.py imports approval.py — but none of these files import each other. If the gate lived in agent.py or bot.py, it would create a cycle.

Why domain agents instead of a single agent with all tools?

Reducing the tool list the LLM sees for each request reduces token usage per call and improves model focus — a model with 8 tools performs more reliably than one with 50. Domain agents are built once at import time (zero per-request cost), share the same system prompt function, and fall back transparently to the full agent for anything ambiguous. The full agent is always the safety net.

Why both Logfire and Langfuse?

Different jobs. Logfire owns infrastructure and request-shape tracing — FastAPI routes, background-loop ticks, exceptions, latency by file path. The diagnostics_agent uses it via Logfire's MCP read tool to answer "did the briefing run?" or "which tool is slowest?". Langfuse owns the LLM layer — per-generation token usage and cost, prompt versions, session/user grouping, trace-level quality scores tied to user actions (Confirm/Cancel/Edit). W&B Weave is an optional third backend (WEAVE_ENABLED) for teams already on the W&B ecosystem who want agent traces and evaluations in Weave. All processors hang off the same OTEL tracer provider in app/observability.py, so Pydantic AI is instrumented once and spans flow to every enabled backend without duplication. Any one can be disabled (skip its env vars) and the others keep working.

Why a push model for health data instead of a server-side polling loop?

Neither Samsung Health nor Google Health Connect exposes a cloud / server-side REST API — both store data on-device, encrypted, behind explicit user consent. There is no OAuth Kwasi could call from FastAPI to pull samples. Two options remained: (a) integrate a paid third-party aggregator (Terra/Rook/Thryve, ~$400/mo minimum) which itself still requires an Android piece, or (b) write a tiny sideloaded Android bridge (bridge-android/) that reads Health Connect locally and POSTs to /health/ingest. The bridge is one Kotlin Gradle project, ~600 LOC, and never goes through the Play Store — so we sidestep Google's sensitive-permissions review entirely. Per-metric watermarks in DataStore prevent re-sends and gaps across reboots; the server is idempotent on (metric_type, start_time, source_device) so retries are safe. Result: zero ongoing cost, full data ownership, and the rest of Kwasi's read path is identical to any other agent tool.
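Server-side idempotency on (metric_type, start_time, source_device) can be sketched with a unique constraint and an ignoring insert. The table columns beyond that key and the `ingest` helper are assumptions for illustration, shown with SQLite; the production schema may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE health_samples (
        metric_type   TEXT NOT NULL,
        start_time    TEXT NOT NULL,
        source_device TEXT NOT NULL,
        value         TEXT NOT NULL,
        UNIQUE (metric_type, start_time, source_device)
    )
    """
)


def ingest(samples: list[tuple[str, str, str, str]]) -> int:
    """Insert samples, silently skipping ones already stored; return rows added."""
    before = conn.total_changes
    conn.executemany(
        "INSERT OR IGNORE INTO health_samples VALUES (?, ?, ?, ?)", samples
    )
    return conn.total_changes - before
```

With this in place the bridge can resend a whole batch after a network failure without creating duplicates, so the watermark logic on the phone only has to be approximately right.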

Why semantic search alongside keyword search?

Keyword (LIKE / ILIKE) search is fast and good enough for exact recall. Semantic search complements it: it finds content by meaning when the user's words don't match the stored text (e.g. "anything about burnout" finding a note titled "work-life balance thoughts"). Rather than replacing keyword search, the agent uses both: keyword first (zero latency, no API call), semantic as a fallback or in parallel via find_everything. Embeddings are stored at write time, so a query needs only one embedding call for the query text itself. The embed_text call at write time costs one Gemini API call per saved record, which is negligible for a personal assistant.