Architecture
Module layout changed (2026-06 cleanup)
Behavior is unchanged, but the code was decomposed: agent tools → app/agent_tools/<domain>.py, background loops → app/background.py, HTTP routes → app/routers/api.py, Telegram helpers/media/commands split out of bot.py, prompt text → app/prompt_text.py, AgentDeps/build_deps → app/deps.py. Original modules re-export their public symbols. See the banner on the home page and CLAUDE.md → Recent Changes.
System Context
A zoomed-out view of the runtime topology: who talks to Kwasi, where Kwasi lives, and every external service it depends on. Useful for answering "if X goes down, what breaks?" and "where does my data leave the container?"
%%{init: {"flowchart": {"defaultRenderer": "elk", "nodeSpacing": 25, "rankSpacing": 45, "wrappingWidth": 180, "elk": {"mergeEdges": true, "nodePlacementStrategy": "NETWORK_SIMPLEX"}}}}%%
graph TB
subgraph Edge["Users and Devices"]
direction LR
U[Lawrence\nTelegram client]
PHONE["Android phone\nHealth Connect bridge\n+ HTTP Shortcuts"]
WATCH[Samsung Watch /\nwearable]
DASHU[Dashboard browser]
end
subgraph Railway["Railway (Docker container)"]
KWASI["Kwasi\nFastAPI + Telegram bot\n+ background loops"]
PG[("PostgreSQL\n+ pgvector\nRailway managed service")]
CRON["Railway Cron\nPOST /reflect 2 AM UTC"]
KWASI <--> PG
CRON -.->|X-Reflection-Secret| KWASI
end
subgraph AI["AI / LLM Providers"]
direction LR
GEM["Google Gemini\nLLM · Vision · STT · embeddings"]
ANT["Anthropic Claude\noptional primary\n+ delegation backend"]
OAI["OpenAI\noptional primary"]
E2B["E2B Cloud\nsandboxed Python\n+ delegation VMs"]
end
subgraph Obs["Observability (Cloud)"]
direction LR
LF["Logfire Cloud\ninfra · FastAPI · loops"]
LFC["Langfuse Cloud\nLLM generations · prompts · scores"]
end
subgraph Personal["Google / Microsoft APIs"]
direction LR
GMAIL["Gmail API\npersonal + work OAuth2"]
GCAL["Google Calendar API"]
GDRIVE["Google Drive API\npersonal + work OAuth2"]
MSG["Microsoft Graph\nOutlook mail · calendar · To Do\nMSAL"]
end
subgraph Work["Messaging / Productivity APIs"]
direction LR
TG[Telegram BotAPI\nlong-polling]
SLACK[Slack Web API]
JIRA[Jira REST API]
GH[GitHub API\nPyGitHub]
end
subgraph Utility["Utility APIs"]
direction LR
TAV[Tavily\nweb search]
WX[WeatherAPI]
GMAP[Google Maps\ntransit + places]
YV[YouVersion API\nverse of the day]
end
U <--> TG
TG <--> KWASI
PHONE -->|"X-Health-Secret\nPOST /health/ingest"| KWASI
PHONE -->|"X-API-Token\nPOST /message"| KWASI
WATCH --> PHONE
DASHU -->|X-Dashboard-Secret| KWASI
KWASI --> GEM
KWASI -.-> ANT
KWASI -.-> OAI
KWASI --> E2B
KWASI -->|OTEL spans| LF
KWASI -->|OTEL spans + scores| LFC
KWASI --> GMAIL
KWASI --> GCAL
KWASI --> GDRIVE
KWASI --> MSG
KWASI -.-> YV
KWASI --> SLACK
KWASI --> JIRA
KWASI --> GH
KWASI --> TAV
KWASI --> WX
KWASI --> GMAP
Reading the diagram:
- Solid arrows are always-on dependencies. Dotted arrows are optional or feature-flagged (Anthropic, OpenAI, YouVersion verse-of-the-day, and Railway Cron triggering /reflect).
- Railway boundary = one container + one Postgres service. Everything else is somebody else's cloud. The container is stateless aside from in-flight asyncio tasks; all durable state lives in Postgres.
- Three trust boundaries cross the Railway edge:
  - Inbound from users/devices, gated by token headers: X-Health-Secret (Android health bridge), X-API-Token (Android HTTP Shortcuts → POST /message), X-Dashboard-Secret (dashboard browser), and X-Reflection-Secret (Railway Cron → POST /reflect). Telegram is gated by ALLOWED_TELEGRAM_USER_IDS at handler level.
  - Outbound to AI providers, gated by API keys in env. Gemini is the only load-bearing provider in the default config; the others are alternative primary models or alternative delegation backends.
  - Outbound to personal/work APIs, gated by per-service OAuth refresh tokens (Google, Microsoft) or static API tokens (GitHub, Jira, Slack).
- What breaks if X is down:
  - Gemini down → no LLM responses (on the default config), no embeddings, no Vision/STT. Hard failure.
  - Postgres down → no history, no facts, no approval gate. Hard failure.
  - Telegram BotAPI down → can't receive or reply. Background loops still run (and queue notifications that will fail to send).
  - Logfire or Langfuse down → observability gap; the app keeps working (the OTEL exporter retries in the background).
  - E2B down → execute_python and delegate_to_coding_agent fail; everything else is fine.
  - Any single personal/work API down (Gmail, Slack, etc.) → that domain agent's tools return errors; routing falls through to the full agent, which retries or summarises the failure to the user.
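The inbound header gates above boil down to comparing one request header against one configured secret per route. The sketch below is framework-free and rests on a few assumptions: headers arrive as a plain dict, and the caller supplies the secrets. ROUTE_HEADERS mirrors the list above. check_gate and the dict shapes are hypothetical and are not Kwasi's actual code.

```python
import hmac

# Route -> header that guards it, mirroring the inbound trust boundary above.
ROUTE_HEADERS = {
    "/health/ingest": "X-Health-Secret",
    "/message": "X-API-Token",
    "/dashboard/": "X-Dashboard-Secret",
    "/reflect": "X-Reflection-Secret",
}


def check_gate(route: str, headers: dict[str, str], secrets: dict[str, str]) -> bool:
    """Return True only if the route's guard header matches its configured secret."""
    header = ROUTE_HEADERS.get(route)
    if header is None:
        return False  # unknown route: deny by default
    expected = secrets.get(header, "")
    if not expected:
        return False  # secret not configured: fail closed
    # HTTP header names are case-insensitive, so normalise before lookup.
    supplied = {k.lower(): v for k, v in headers.items()}.get(header.lower(), "")
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(supplied.encode(), expected.encode())
```

The sketch fails closed when a secret is unset. That mirrors the Telegram allowlist, which blocks everyone when ALLOWED_TELEGRAM_USER_IDS is empty. Whether the real HTTP routes behave the same way is not stated here.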
System Overview
%%{init: {"flowchart": {"defaultRenderer": "elk", "nodeSpacing": 30, "rankSpacing": 50, "wrappingWidth": 180}}}%%
graph TB
subgraph Interfaces
TG[Telegram\npython-telegram-bot polling]
WA[WhatsApp\nwebhook]
CLI[CLI\nlocal REPL]
DASH[Dashboard\n/dashboard/]
end
subgraph Planning["Planning Layer (app/planning/) — Spec 008"]
CC[classify_complexity\nregex pre-filter]
GP[generate_plan\nplanner agent]
EP[execute_plan\nstep loop · scratchpad threading]
end
subgraph Routing["Intent Routing (app/tools/router.py + app/routing/agents.py)"]
CI[classify_intent\nkeyword → semantic top-K → context_hint]
SA[select_agent\ndispatcher]
DA["Domain Agents (13)\nemail · calendar · memory · github · news\nslack · jira · drive · meetings\ndiagnostics · health · utility · database"]
CA[Composed Agent\nambiguous / cross-domain\nlru_cache]
FA2[Full Agent\nfallback]
end
subgraph Core["Core (FastAPI + asyncio)"]
FAST[FastAPI\napp/main.py]
AG[Pydantic AI Agent\napp/agent.py]
SP[System Prompt\nbuild_system_prompt]
end
subgraph Tools["Native Tools"]
T1[search_web\nTavily]
T2[get_weather\nWeatherAPI]
T3[get_datetime]
T4[summarize_url]
T5[transcribe_audio\nGemini STT]
T6[analyze_image\nGemini Vision]
T7[browse_web\nbrowser_submit_form\nPlaywright]
T8[github_*\nPyGitHub]
end
subgraph Skills["Skills (file-drop plugins)\napp/skills/"]
S1[content_curator\nread_later.py]
S2[travel_briefing]
S3[cv]
S4[deep_research\nresearch.py]
S5[meeting_notes\nget_meeting_insights · list_recent_meetings]
end
subgraph MCPTools["MCP Tools"]
M1[Gmail personal + work\ngmail_* tools]
M2[Outlook Email\noutlook_* tools]
M3[Google Calendar\ngoogle_* tools]
M4[Outlook Calendar\noutlook_calendar_* tools]
M5[Microsoft To Do\ntodo_* tools]
M6[Google Drive\ndrive_* tools]
M7[Logfire\nlogfire_* tools]
end
subgraph Memory["Memory Tools (agent tools over storage)"]
N[Notes\nsave · get · search · edit · delete · semantic]
TK[Tasks\ncreate · list · search · complete · edit · delete]
RM[Reminders\nset · list · cancel]
SC[Scheduled Tasks\ncreate · list · update · delete]
HX[History\nsearch_history · semantic_search]
UF[User Facts\nremember_fact · recall_facts · forget_fact]
end
subgraph Storage["Storage (StoragePort)"]
SQ[(SQLite\nlocal dev)]
PG[(PostgreSQL\nRailway prod)]
end
subgraph HealthIngest["Health Data Ingest (Spec 010 — push model, no server-side loop)"]
WATCH[Samsung Watch / wearable]
HC[Google Health Connect\non-device]
BR["bridge-android/\nKotlin · WorkManager 15 min"]
IN["POST /health/ingest\nX-Health-Secret · idempotent"]
WATCH --> HC
HC --> BR
BR --> IN
IN --> Storage
end
subgraph BG["Background Loops"]
RL[Reminder Loop\nevery 5 min]
BL[Briefing Loop\ndaily at BRIEFING_TIME]
ST[Scheduled Tasks Loop\nevery 60 sec]
RF[Reflection Loop\ndaily at 2 AM UTC]
ER[Evening Recap Loop\ndaily local evening]
WR[Weekly Recap Loop\nFriday local]
WP[Weekly Prep Loop\nSunday local]
RLD[Read-Later Digest\nSaturday morning]
JD[Journal Digest\nSunday evening]
EI[Email Intelligence\ndaily at EMAIL_INTEL_TIME]
AL[Alert + Intentions Loop\nevery 5 min]
MP[Meeting Prep Loop\nevery 5 min]
AE[Approval Expiry Loop\nevery 5 min]
PRL[PR Review Loop\ndaily at PR_REVIEW_TIME]
end
subgraph Models["LLM Models"]
LLM[Primary LLM\nMODEL_NAME\nGemini / Claude / GPT-4]
MINI[Mini LLM\nMINI_MODEL_NAME\nskill synthesis]
REFL[Reflection LLM\nREFLECTION_MODEL_NAME\nnightly reflection]
end
subgraph SelfImprove["Self-Improvement Loop (PR 1 + PR 2)"]
RS[read_source_file · grep_source\napp/tools/source.py\non every domain agent]
GD[detect_and_save_capability_gap\npost-turn fire-and-forget\nregex → mini-classify → grep-verify]
PR[propose_skill\nprimary model draft + static validate]
AC["activate_proposed_skill\napproval-gated → write app/skills/<name>.py\n+ hot_reload onto every skill-bearing agent"]
GD --> PR
PR --> AC
end
TG --> CC
CC -- complex --> GP
GP -- 2+ steps --> EP
CC -- simple --> CI
GP -- 0–1 steps --> CI
EP -- per step --> CI
WA --> FAST
CLI --> AG
CI --> SA
SA --> DA
SA --> CA
SA --> FA2
DA --> LLM
CA --> LLM
FA2 --> LLM
AG --> RS
AG -- post-turn fire-and-forget --> GD
GD --> Storage
PR --> LLM
AC --> Skills
FAST --> FAST
DASH --> Storage
AG --> SP
SP --> Storage
AG --> LLM
AG --> Tools
AG --> Skills
AG --> MCPTools
AG --> Memory
Memory --> Storage
BG --> Storage
BG --> TG
Skills --> MINI
BG --> REFL
Component Breakdown
FastAPI (app/main.py)
The web process entry point. Responsibilities:
- Mounts the WhatsApp webhook router, dashboard router, and health router (/health/ingest, /health/backfill, /health/status)
- Exposes GET /health and POST /reflect
- Exposes POST /embed-backfill: bulk-embeds existing rows that have no embeddings; protected by X-Reflection-Secret; rate-limited to ~10 rows/sec
- Exposes POST /health/ingest: idempotent batch ingest of wearable samples from the Android bridge; protected by X-Health-Secret (Spec 010)
- Exposes POST /health/backfill: one-shot upload of a Samsung Health "Download Personal Data" CSV/JSON archive to seed historical samples; protected by X-Health-Secret
- Exposes GET /health/status: per-metric ingest freshness summary for monitoring the bridge from outside the device. Returns {now, latest_ingest, lag_minutes, metrics: [...]} with one entry per known metric type (zeros when never ingested) plus per-metric ingest_lag_minutes. Protected by X-Health-Secret so polling it doesn't expose data shape (Spec 010)
- Exposes POST /message: external message endpoint for Android HTTP Shortcuts and other clients. Accepts text and/or images (multipart/form-data, or JSON with base64). Protected by the X-API-Token header (API_TOKEN env var). Processes input through the full agent pipeline (intent routing, read-later injection, message history). Delivers the response to Telegram (BRIEFING_CHAT_ID) and also returns it in the JSON body. Conversation history is unified with Telegram: both interfaces use BRIEFING_CHAT_ID as the user ID, so context is shared across surfaces.
- Manages the FastAPI lifespan context: starts and stops the Telegram bot and all background asyncio tasks
- Exports API keys to os.environ so LLM SDKs can find them
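An idempotent batch ingest usually relies on a natural unique key, so a retried upload inserts nothing new. The sketch below uses sqlite3 from the standard library and assumes a sample is identified by (metric, recorded_at, source). The table and column names are hypothetical. The production adapter targets PostgreSQL, where ON CONFLICT DO NOTHING plays the role INSERT OR IGNORE plays here.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS health_samples (
    metric      TEXT NOT NULL,
    recorded_at TEXT NOT NULL,
    source      TEXT NOT NULL,
    value       REAL NOT NULL,
    UNIQUE (metric, recorded_at, source)  -- natural key that makes retries safe
)
"""


def ingest(conn: sqlite3.Connection, samples: list[dict]) -> int:
    """Insert a batch of samples, skipping duplicates. Returns the count of new rows."""
    before = conn.total_changes
    conn.executemany(
        "INSERT OR IGNORE INTO health_samples (metric, recorded_at, source, value) "
        "VALUES (:metric, :recorded_at, :source, :value)",
        samples,
    )
    conn.commit()
    # Ignored duplicates do not count as changes, so this is the number of new rows.
    return conn.total_changes - before
```

Suppose the bridge's WorkManager job re-sends the same batch. The second call returns 0 and leaves the table unchanged.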
Agent (app/agent.py)
The brain. Built with Pydantic AI. Responsibilities:
- Defines AgentDeps, the dependency container passed to every tool and to the system prompt (AgentDeps and build_deps now live in app/deps.py and are re-exported here)
- build_deps(): the single factory function all interfaces use to create AgentDeps
- build_system_prompt(): async function decorated with @agent.system_prompt. It assembles the full prompt from the static instruction sections plus dynamic, storage-fetched sections: behavioral learnings, narrative profile, live DB schema and, if the vault is enabled, stored web logins. Permanent facts are not injected; instead, the prompt tells the model to call recall_facts(). See System Prompt Architecture below.
- build_briefing_system_prompt(): extends build_system_prompt by appending all briefing structural templates (_MORNING_BRIEFING, _EVENING_RECAP, _WEEKLY_RECAP, _WEEKLY_PREP, _MEETING_PREP) at the system level. Used exclusively by briefing_agent.
- Registers all native tools via @agent.tool
- Calls load_skills(agent) at startup to auto-register skills from app/skills/
Multi-Model Architecture
Three model roles are configured independently via env vars. Each is optimised for a different cost/capability trade-off:
| Role | Env var | Default | Used for |
|---|---|---|---|
| Primary | MODEL_NAME | google-gla:gemini-2.5-flash | Main conversational agent and all 13 domain agents (every interactive user message) |
| Mini | MINI_MODEL_NAME | google-gla:gemini-3-1-flash-lite-preview | Skill synthesis: CV extraction, deep research synthesis, read-later summarisation, travel briefing, post-conversation fact extraction, PR review structuring, capability-gap classification |
| Reflection | REFLECTION_MODEL_NAME | (empty; falls back to MODEL_NAME) | Nightly reflection run only. Set it to a larger, more capable model for deeper analysis without affecting interactive latency |
settings.effective_reflection_model resolves the fallback: if REFLECTION_MODEL_NAME is empty, it returns MODEL_NAME.
Model binding is per-request, not at agent construction. The Agent instances are built with no model (Agent(deps_type=AgentDeps, retries=3)), and every call passes model=ctx.deps.settings.model_name at invocation time. Because settings and deps.settings are the same object, mutating deps.settings.model_name in memory changes the model on the next message with no restart. This is the mechanism that makes set_runtime_config("model_name", …) take effect instantly (see Self-Management Subsystem). Limitation: a few skill-synthesis "mini" agents bind mini_model_name at import time. Until the next restart, a runtime change to the mini model therefore only affects call sites that read deps.settings.mini_model_name dynamically.
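The reflection fallback and per-request binding can be sketched with a plain dataclass. This assumes a Settings object with the three fields from the table. effective_reflection_model matches the property named above. pick_model and its role strings are hypothetical stand-ins for passing model= at call time.

```python
from dataclasses import dataclass


@dataclass
class Settings:
    model_name: str = "google-gla:gemini-2.5-flash"
    mini_model_name: str = "google-gla:gemini-3-1-flash-lite-preview"
    reflection_model_name: str = ""  # empty means "use model_name"

    @property
    def effective_reflection_model(self) -> str:
        # Fall back to the primary model when no override is configured.
        return self.reflection_model_name or self.model_name


def pick_model(settings: Settings, role: str) -> str:
    """Resolve the model at call time, so in-memory edits apply to the next call."""
    if role == "reflection":
        return settings.effective_reflection_model
    if role == "mini":
        return settings.mini_model_name
    return settings.model_name
```

Because the string is read on every call rather than captured when the agent is built, mutating settings.model_name changes the next pick_model result. An agent that stored the string at import time would miss the change, which is the mini-agent limitation described above.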
Why separate models? The mini model handles high-frequency, low-stakes synthesis tasks (run after every message) where cost and latency matter most. The primary model handles user-facing responses where quality matters. The reflection model is a deliberate override slot — if you want a frontier model for nightly analysis without paying for it on every chat message, set only REFLECTION_MODEL_NAME.
Telegram Interface (app/interfaces/telegram/bot.py)
The primary user-facing interface. Handles:
- Text messages: streamed response with a live edit every 1.5 s (Telegram rate limit)
- Voice/audio: downloads from Telegram, transcribes via Gemini STT, runs the agent, and replies with a voice note (TTS) for responses of ≤500 words
- Photos: downloads the highest resolution, runs Gemini Vision, then runs the agent
- Documents: PDFs via Gemini Vision; text files decoded directly
- Slash commands: /start, /help, /notes, /tasks, /reminders (these bypass the agent and hit storage directly)
- Allowlist: ALLOWED_TELEGRAM_USER_IDS is checked on every handler; all users are blocked if it is unset
- Audit logging: _write_audit_entries() writes sanitised tool call records to audit_log after each interaction
- Inline approval flow: after streaming, scans tool call outputs for sentinel tokens ([APPROVAL_PENDING:<uuid>]) and edits the sent message to add Confirm/Cancel/Edit inline keyboard buttons. A CallbackQueryHandler handles button taps: Confirm executes via ACTION_REGISTRY, Cancel marks the action cancelled, and Edit prompts for a correction and re-runs the agent. Edit state is tracked in context.user_data.
- Post-conversation background tasks: after audit logging, fires two background tasks via spawn_with_otel_context, an asyncio.create_task() variant that keeps them nested under the turn's trace. Task (1) is extract_facts_from_exchange(), which immediately extracts explicitly stated user facts. Task (2) is detect_and_save_capability_gap(), which logs routing/capability gaps for the self-improvement loop. Both are fire-and-forget. (The earlier 30-min summarise_session was removed on 2026-05-09; topic summaries now come only from the nightly reflection.) See Memory & Reflection.
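In the inline approval flow, the sentinel scan amounts to pulling approval UUIDs out of tool outputs. The sketch below assumes the outputs are available as a list of strings. find_pending_approvals is a hypothetical name.

```python
import re
import uuid

# Matches the [APPROVAL_PENDING:<uuid>] sentinel emitted by approval-gated tools.
SENTINEL = re.compile(r"\[APPROVAL_PENDING:([0-9a-fA-F-]{36})\]")


def find_pending_approvals(tool_outputs: list[str]) -> list[str]:
    """Return unique, well-formed approval IDs in first-seen order."""
    seen: list[str] = []
    for output in tool_outputs:
        for match in SENTINEL.finditer(output):
            candidate = match.group(1)
            try:
                uuid.UUID(candidate)  # reject strings that only look like a UUID
            except ValueError:
                continue
            if candidate not in seen:
                seen.append(candidate)
    return seen
```

Each returned ID would then map to one Confirm/Cancel/Edit keyboard. Deduplicating guards against a tool echoing the same sentinel twice.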
Browser Tools (app/tools/browser.py)
Playwright-based headless Chromium. Features:
- Chrome user-agent spoofing to reduce bot detection
- Asset blocking (images, fonts, media) for faster loads
- Semantic content extraction: targets <main>, <article>, and [role="main"] before falling back to <body>
- Headline extraction from h1/h2/h3 in the content zone
- URL security: blocks file://, localhost, and private IPs; optional BROWSER_ALLOWED_DOMAINS allowlist
- Graceful degradation if Playwright is not installed
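The URL security rule can be sketched with urllib.parse and ipaddress from the standard library. The sketch makes two assumptions: only http/https are allowed, and an allowlist entry also admits its subdomains. is_url_allowed is hypothetical. It only inspects literal IP addresses, so a hardened check would also resolve hostnames to catch DNS names that point at private ranges.

```python
import ipaddress
from typing import Optional
from urllib.parse import urlsplit


def is_url_allowed(url: str, allowed_domains: Optional[set[str]] = None) -> bool:
    """Reject non-HTTP schemes, localhost, and private/loopback IP literals."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        return False  # blocks file://, data:, javascript: and similar
    host = (parts.hostname or "").lower()
    if not host or host == "localhost" or host.endswith(".localhost"):
        return False
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        ip = None  # a DNS name rather than an IP literal
    if ip is not None and (ip.is_private or ip.is_loopback or ip.is_link_local):
        return False
    if allowed_domains:
        # Optional allowlist: exact match or any subdomain of an allowed domain.
        return any(host == d or host.endswith("." + d) for d in allowed_domains)
    return True
```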
GitHub Tools (app/tools/github.py)
PyGitHub-based integration. All API calls are synchronous and wrapped with asyncio.to_thread(). Gracefully returns a clear message if GITHUB_TOKEN is not configured or PyGitHub is not installed.
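The wrapping pattern lets a blocking SDK call occupy a worker thread while the event loop keeps serving other turns. The sketch does not use the real client. fetch_repo_sync is a hypothetical stand-in for a blocking PyGitHub call, and github_tool is a hypothetical stand-in for the async agent tool.

```python
import asyncio
import time
from typing import Optional


def fetch_repo_sync(token: str, name: str) -> dict:
    """Stand-in for a blocking SDK call such as a network round trip."""
    time.sleep(0.05)
    return {"full_name": name, "authenticated": bool(token)}


async def github_tool(name: str, token: Optional[str]) -> str:
    if not token:
        # Degrade gracefully instead of raising inside the agent run.
        return "GitHub is not configured (GITHUB_TOKEN is unset)."
    # Run the blocking call in the default thread pool and await its result.
    repo = await asyncio.to_thread(fetch_repo_sync, token, name)
    return f"Repo: {repo['full_name']}"
```

The Slack tools described below follow the same shape, with slack-sdk's WebClient in place of the stand-in.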
Embedding (app/tools/embedding.py)
embed_text(text, api_key) → list[float] | None — calls the Gemini gemini-embedding-001 REST API directly via httpx (3072 dimensions). Called internally by both storage adapters after each write to notes, interactions, and read_later. Returns None when GOOGLE_API_KEY is not set or the API call fails — never raises. Falls back to gemini-embedding-2-preview if the primary model returns 404.
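The never-raise contract and the 404 fallback can be sketched independently of HTTP. Here the transport is injected as a callable returning (status, payload) so the example runs offline; that transport and the payload shape are assumptions. The real function uses httpx against the Gemini REST API and takes only (text, api_key).

```python
from typing import Callable, Optional

PRIMARY = "gemini-embedding-001"
FALLBACK = "gemini-embedding-2-preview"

# Hypothetical transport: (model, text, api_key) -> (HTTP status, JSON-like payload).
Transport = Callable[[str, str, str], tuple]


def embed_text(text: str, api_key: Optional[str], post: Transport) -> Optional[list]:
    if not api_key:
        return None  # no key configured: skip embedding silently
    try:
        status, payload = post(PRIMARY, text, api_key)
        if status == 404:
            # Primary model unavailable: retry once on the fallback model.
            status, payload = post(FALLBACK, text, api_key)
        if status != 200:
            return None
        return list(payload["embedding"]["values"])
    except Exception:
        return None  # never raise into the storage write path
```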
Slack Tools (app/tools/slack.py)
slack-sdk WebClient integration. All API calls are synchronous and wrapped with asyncio.to_thread(). Username and channel name resolution are cached within each tool call. Gracefully returns a clear message if SLACK_BOT_TOKEN is not configured or slack-sdk is not installed. Requires the bot to be invited to any channel it reads or posts in.
Intent Router (app/tools/router.py + app/routing/agents.py)
Intent classification (keyword match first, then semantic and context-hint fallbacks) and domain agent dispatch. Runs on every text message (Telegram and WhatsApp) before the agent is invoked.
Taxonomy (TOOL_CATEGORIES) — maps thirteen domain names to their canonical tool lists: utility, memory, email, calendar, github, news, slack, jira, drive, meetings, diagnostics, health, database. TOOL_TO_CATEGORY is the reverse lookup used in tests.
classify_intent(message, context_hint=None) is a three-stage classifier:
- Keyword match (microseconds, zero LLM cost): checks the message against per-domain keyword lists and returns the matched categories, with "utility" always added when any domain hits. keyword_hits per category is logged to the Langfuse span.
- Semantic fallback with confidence bands: when keywords find nothing, the message embedding is compared against pre-computed per-domain anchor embeddings (cached lazily). Each routing decision logs semantic_top3: [{domain, score}] so misroutes are debuggable from traces. The result depends on the best cosine score:
  - ≥ 0.65 (high confidence) → the single best domain
  - 0.45 – 0.65 (ambiguous) → the top-K domains within 0.10 of the best, capped at 3, which go to composed_agent so the LLM picks
  - < 0.45 → empty (no confident match)
- Context-hint inheritance: when both the keyword and semantic stages miss, an optional context_hint: set[str] (the single non-utility domain from the previous turn) is inherited, so short follow-ups like "anything else?" continue in the same domain agent.
The three-band semantic stage replaced an earlier binary ≥ 0.60 cutoff that caused misroute hallucinations: borderline queries scored just below threshold and fell through to the wrong agent, leading the model to claim "I don't have a tool" for capabilities that existed.
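The three confidence bands can be expressed as a small pure function over cosine scores. The sketch uses the thresholds stated above: 0.65, 0.45, a 0.10 margin, and a cap of 3. The anchor vectors are toy 3-D values rather than the real 3072-dimensional embeddings, and semantic_route is a hypothetical name.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def semantic_route(query: list[float], anchors: dict[str, list[float]],
                   high: float = 0.65, low: float = 0.45,
                   margin: float = 0.10, cap: int = 3) -> set[str]:
    scored = sorted(((cosine(query, v), d) for d, v in anchors.items()), reverse=True)
    if not scored:
        return set()
    best = scored[0][0]
    if best >= high:
        return {scored[0][1]}            # high confidence: single domain
    if best >= low:
        close = [d for s, d in scored if best - s <= margin]
        return set(close[:cap])          # ambiguous: candidates for the composed agent
    return set()                         # below the low band: no confident match
```

Under the old binary cutoff, a query scoring 0.58 on two anchors would have been dropped entirely. Here both domains are kept and the LLM chooses.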
select_agent(categories) — dispatches to the right agent:
- Empty set → full_agent (conversational fallback / no confident match)
- {"utility"} only → utility_agent
- One non-utility domain (e.g. {"email", "utility"}) → email_agent
- Multiple non-utility domains → _get_composed_agent(frozenset(categories)) — a dynamically built agent whose tool set is the union of all matched domains' tools, cached with lru_cache(maxsize=32). With the three-band semantic stage above, this path is used not just for explicit cross-domain requests but also for ambiguous single-domain queries where multiple anchors scored within 0.10 of each other — the LLM picks across the candidates rather than the router guessing wrong.
Domain agents: 13 interactive Agent instances plus 1 briefing_agent, all built once at import time in app/routing/agents.py. Each carries only its relevant tool subset, which shrinks the tool list the LLM sees. They share build_system_prompt and the active_categories focus hint injected into AgentDeps per request. Several universal meta-toolsets are registered on every domain agent via _UTILITY_FNS. They are meta rather than domain-scoped, and the model often reaches for them from non-matching contexts:
- Source introspection: read_source_file, grep_source ("do I have a tool for X?").
- Raw DB ops: db_query (read-only), db_execute (approval-gated). database/diagnostics exist as focused-attention modes for the same tools.
- Runtime config + self-management: set_runtime_config/get/list/clear_runtime_config, railway_set_env/railway_redeploy (approval-gated, action_type config/deployment), and read-only railway_deployment_status/railway_deploy_logs/diagnose_self. See Self-Management Subsystem.
- LLM trace introspection: langfuse_recent_traces/langfuse_trace_details (Logfire's read tools live on diagnostics_agent).
So "switch your model", "redeploy", or "any errors lately?" route to the right universal tool from any domain rather than being declined.
The jira_agent handles Jira queries (tickets, sprints, projects) via the six jira_* tools. Requires JIRA_BASE_URL, JIRA_EMAIL, and JIRA_API_TOKEN.
The drive_agent handles Google Drive queries (search, read, list recent, transcripts) via the five Drive MCP tools. Supports personal (GOOGLE_DRIVE_REFRESH_TOKEN) and work (GOOGLE_DRIVE_WORK_REFRESH_TOKEN) accounts.
The meetings_agent handles meeting intelligence queries via get_meeting_insights and list_recent_meetings skills (from app/skills/meeting_notes.py), plus Drive MCP tools and calendar MCP tools for context. Intent keywords include "recap the meeting", "notes from the", "what came out of", and "standup notes".
The diagnostics_agent handles self-diagnosis queries (errors, performance, loop health) via the three Logfire MCP tools. Its intent keywords use specific multi-word phrases (e.g. "your errors", "did the briefing run") to avoid false positives on common words like "error".
The health_agent (Spec 010) handles wearable / Samsung Watch queries via four read-only tools (get_recent_health, get_sleep_summary, get_hrv_trend, get_health_snapshot). Routed by intent keywords like "how did i sleep", "my hrv", "resting heart rate", "step count". Data is ingested separately by the Android bridge — the agent only reads.
The dba_agent (database category) is structurally identical to utility_agent — its value is the focused system-prompt mode (_DB_OPS_RULES, plus a live "## Live database schema" section introspected via describe_schema() rather than a hand-maintained appendix) for SQL-flavored requests. The two raw-DB tools themselves are universal on every domain agent.
Planning Layer (app/planning/) — Spec 008
A multi-step decomposition pass that runs before intent routing on Telegram messages, so requests like "check my email, then post a summary in #standup, then add a follow-up task" execute as a sequenced plan instead of one ambiguous tool dance.
- planner.py: classify_complexity(text) is a regex pre-filter (≥30 chars plus connectives like "and then", "after that", "also send") that gates the LLM call. generate_plan(text, deps) runs a tool-less _planner_agent: Agent[None, ExecutionPlan]. It returns None when needs_planning=False or len(steps) < 2, and the message then falls through to the normal single-step flow.
- executor.py: format_plan_preview(plan) renders the plan as a Telegram approval message. execute_plan(plan, deps, send_progress) runs steps sequentially. Each step is independently routed via classify_intent + select_agent, and truncated outputs of prior steps are threaded forward as a scratchpad so later steps can build on earlier results. A live ▶/✓ progress checklist is edited into the same message after each step.
- Resume on failure: when a step throws, execute_plan saves a PendingAction(action_type="plan_resume") with a PlanResumePayload(plan, remaining_steps, scratchpad, failed_step, failure_reason) and stops. Confirm restarts from remaining_steps; Cancel aborts cleanly.
The plan itself is always behind the standard inline approval keyboard, even when none of the constituent tools would normally be gated. Full sequence: Multi-step plan flow.
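The executor's scratchpad threading and resume-on-failure behaviour can be sketched with plain dataclasses. PlanResumePayload mirrors the fields listed above. The following details are all assumptions and not stated in the document:

- run_step stands in for classify_intent + select_agent + the agent run.
- The 500-character truncation limit is invented for the sketch.
- remaining_steps includes the failed step, so Confirm retries it.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Optional

MAX_CARRY = 500  # assumed per-step truncation limit for scratchpad entries

StepRunner = Callable[[str, list[str]], Awaitable[str]]


@dataclass
class PlanResumePayload:
    plan: list[str]
    remaining_steps: list[str]
    scratchpad: list[str]
    failed_step: str
    failure_reason: str


@dataclass
class PlanResult:
    outputs: list[str] = field(default_factory=list)
    resume: Optional[PlanResumePayload] = None


async def execute_plan(steps: list[str], run_step: StepRunner,
                       scratchpad: Optional[list[str]] = None) -> PlanResult:
    pad = list(scratchpad or [])
    result = PlanResult()
    for i, step in enumerate(steps):
        try:
            # Each step is routed independently but sees earlier (truncated) outputs.
            out = await run_step(step, pad)
        except Exception as exc:
            # Stop and capture everything needed to restart from the failed step.
            result.resume = PlanResumePayload(
                plan=list(steps), remaining_steps=list(steps[i:]),
                scratchpad=list(pad), failed_step=step, failure_reason=str(exc),
            )
            return result
        result.outputs.append(out)
        pad.append(out[:MAX_CARRY])
    return result


if __name__ == "__main__":
    async def echo(step: str, pad: list[str]) -> str:
        return f"{step} (after {len(pad)} earlier steps)"

    print(asyncio.run(execute_plan(["check email", "post summary"], echo)).outputs)
```

To resume, call execute_plan(payload.remaining_steps, run_step, payload.scratchpad). The earlier outputs then carry forward into the retried steps.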
Observability — Logfire + Langfuse + W&B Weave (app/observability.py) — Spec 009
Up to three observability stacks share a single OpenTelemetry tracer provider and run side-by-side without duplicating instrumentation:
graph LR
PA[Pydantic AI agent.run / run_stream] --> OTEL[OTEL TracerProvider]
FAST[FastAPI requests] --> OTEL
OTEL --> LF[Logfire processor\ninfra · FastAPI routes · loop spans]
OTEL --> LFC[Langfuse processor\nLLM generations · sessions · scores]
OTEL --> WB[Weave OTLP exporter\nagent + tool spans · optional]
LF --> LFCloud[(Logfire Cloud)]
LFC --> LFCloud2[(Langfuse Cloud)]
WB --> WBCloud[(W&B Weave Cloud)]
LF -. self-diagnosis MCP .-> AG[Agent — diagnostics_agent]
init_observability(settings, app) is called once from create_app() before any agent runs. It first installs an SDK TracerProvider as the OTEL global (_ensure_sdk_tracer_provider, so processors aren't silently swallowed by the default proxy provider), then configures Logfire (logfire.configure() + instrument_fastapi) when LOGFIRE_TOKEN is set, and calls Agent.instrument_all() from Pydantic AI when Langfuse is enabled so spans land on the global provider where all processors observe them. (When only Logfire is enabled, logfire.instrument_pydantic_ai() is used instead — it routes through Logfire's internal tracer, which Langfuse would not see.)
W&B Weave (optional, off by default). When WEAVE_ENABLED=true (plus WEAVE_API_KEY + WEAVE_PROJECT as <entity>/<project>), _init_weave() does two things: calls weave.init() so the Weave SDK auto-patches supported LLM clients, and attaches an OTLP BatchSpanProcessor to the shared provider that ships Pydantic AI spans to https://trace.wandb.ai/otel/v1/traces. So agent + tool traces land in the Weave UI alongside Logfire and Langfuse — all three observe the same OTEL spans. Weave init fails soft (a warning, never a boot failure) if the key/project are missing.
Per-turn trace grouping. langfuse_root_span(name, session_id, user_id, …) is an async context manager wrapping the whole turn so every agent.run() inside it nests under one Langfuse trace instead of producing one trace per call.
Fire-and-forget nesting. spawn_with_otel_context(coro) is an asyncio.create_task() variant that propagates the current OTEL span context, so post-conversation background work (fact extraction, capability-gap detection, delegation orchestration, the self-redeploy marker) nests under the originating turn trace instead of appearing as an orphan. Genuinely independent work (scheduled loops) keeps bare create_task().
Stage-level spans inside a turn. Pydantic AI's Agent.instrument_all() covers LLM generations + tool calls automatically, but the preprocessing and routing stages would otherwise be invisible. The following functions are decorated with Langfuse @observe so they appear as nested children of telegram.turn:
- preprocess.stt: app.tools.voice.transcribe_audio (audio bytes → transcript)
- preprocess.vision: app.tools.vision.analyze_image (image bytes → description)
- embed_text: app.tools.embedding.embed_text (sanitized: only chars/dims/model are captured, never the vector or api_key)
- routing.classify_intent: app.tools.router.classify_intent (output: {categories, method}, where method is keyword | semantic | context_hint | none)
- context_injection: app.utils.message_utils.inject_context (output: per-layer counts plus a 1500-char preview of what was prepended to the user message)
- message_history_retrieval: app.utils.message_utils.fetch_message_history (metadata: {mode: chronological|semantic, recent_count, semantic_count, semantic_enabled}; mode=error indicates the semantic path failed and chronological was used)
update_current_span(input=, output=, metadata=) in observability.py is the helper they use to attach sanitized payloads; it degrades to a no-op when Langfuse is disabled.
Asynchronous trace scoring. When a Confirm/Cancel/Edit happens minutes after the original turn closed, score_trace(action.trace_id, name, value) attaches a quality score to the original trace. PendingAction.trace_id is captured at gate time via OTEL context (_current_otel_trace_id() in app/approval.py) precisely so this delayed score can land. Three score names are emitted today: user_approval (Confirm 1.0 / Cancel 0.0), user_edit (Edit tap), and agent_error (in-flight exception, written via score_current_trace).
Langfuse-managed Prompts (app/prompts.py + prompts.lock.json) — Spec 009
Nine system prompts can be tuned in the Langfuse UI without redeploying — get_prompt(name, fallback) returns the production-labeled Langfuse version when reachable, otherwise the code constant. prompts.lock.json (repo root) pins each constant's sha256, and check_drift() runs at startup to warn when code has been edited without being pushed. The sync workflow lives in scripts/sync_prompts.py; full prompt list and CLI usage in Tools → Langfuse-managed Prompts.
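The drift check can be sketched as hashing each code constant and comparing it with the pinned digest. This assumes prompts.lock.json is a flat mapping of prompt name to hex sha256; the real file layout may differ. This check_drift signature is hypothetical.

```python
import hashlib
import json


def sha256_hex(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def check_drift(constants: dict[str, str], lock_json: str) -> list[str]:
    """Return names of prompts whose code constant no longer matches the lock file."""
    pinned = json.loads(lock_json)
    drifted = []
    for name, text in sorted(constants.items()):
        if pinned.get(name) != sha256_hex(text):
            drifted.append(name)  # edited (or added) in code but not yet synced
    return drifted
```

At startup, a non-empty result would be logged as a warning rather than blocking boot, in line with the "warn" behaviour described above.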
Briefing Agent (app/routing/agents.py → briefing_agent)
A dedicated agent used exclusively by the scheduled briefing loops (morning, evening, weekly). It is never invoked for interactive user messages.
Tool set: memory (tasks, notes, reminders) + utility (weather, datetime, search, URL, maps) + all email MCP tools + all calendar MCP tools + Microsoft To Do (todo_*) MCP tools.
Why separate? The interactive agents are designed around one domain. The morning briefing legitimately needs email + calendar + tasks + weather simultaneously — a cross-domain request that would normally fall back to the full 45-tool full_agent. briefing_agent gives it a pre-composed, focused tool set without the overhead of GitHub, news, or browser tools that are irrelevant for briefings.
Skill Registry (app/skills/)
File-drop plugin architecture. Any .py file placed in app/skills/ with a function decorated @skill is auto-discovered and registered as an agent tool on startup via load_skills(agent). Modules starting with _ are ignored. Loading is idempotent — calling load_skills() twice registers nothing new.
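The discovery contract has three parts: decorated functions are registered once, underscore modules are skipped, and repeat loads are a no-op. It can be sketched without importing real files. Here modules are simulated with types.ModuleType and the agent's tools with a plain dict. The _is_skill marker and this load_skills signature are assumptions.

```python
import types


def skill(fn):
    """Mark fn for auto-registration (the _is_skill marker is hypothetical)."""
    fn._is_skill = True
    return fn


def load_skills(tools: dict, modules: list[types.ModuleType]) -> list[str]:
    """Register @skill functions from non-underscore modules; repeat calls add nothing."""
    added = []
    for mod in modules:
        if mod.__name__.rsplit(".", 1)[-1].startswith("_"):
            continue  # modules starting with _ are ignored
        for name, obj in vars(mod).items():
            if callable(obj) and getattr(obj, "_is_skill", False) and name not in tools:
                tools[name] = obj
                added.append(name)
    return added
```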
Built-in skills:
- read_later.py — Content Curator (save_to_read_later, list_read_later, delete_read_later, get_read_later_digest)
- travel_briefing.py — get_travel_briefing — on-demand travel summary (weather + maps/transit + LLM synthesis)
- cv.py — store_cv, get_cv — parse and persist CV as structured user facts for job application workflows
- research.py — deep_research — multi-step web research: generates sub-questions, fetches and synthesises sources in parallel, saves result as a "Research: <topic>" note
- meeting_notes.py — get_meeting_insights, list_recent_meetings — extract decisions/actions/key points from Drive transcripts via mini-model; save as notes. Source-agnostic: _SOURCE_REGISTRY maps source name to fetcher, allowing Teams or Notion to be wired in later.
Self-Improvement Loop (app/tools/source.py + app/memory/capability_gaps.py + app/skills/proposer.py)
A four-stage pipeline that lets Kwasi inspect its own deployed code, log its own failures, draft new skills from those failures, and (with approval) activate them.
flowchart LR
subgraph turn[Each Telegram turn]
A[agent response] --> B[regex prefilter]
B -->|negation match| C[mini-model classify]
C --> D[grep_source verify capability]
D -->|no source matches| E[gap]
D -->|matches found| F[available]
end
E & F -->|fire-and-forget| G[(capability_gaps)]
subgraph propose[Propose flow]
H[propose_skill description/gap_id] --> I[primary model drafts .py]
I --> J[static validate: compile + AST + structure]
J --> K[(proposed_skills, status=draft)]
end
subgraph activate[Activate flow]
L[activate_proposed_skill] --> M[approval_gate Telegram]
M -->|confirm| N[re-validate]
N --> O[write app/skills/name.py]
O --> P[hot_reload_new_skill onto full + 9 domain agents]
P --> Q[skill callable, status=activated]
end
Stage 1 — Source introspection. read_source_file(path, offset, limit) and grep_source(pattern, path_glob) in app/tools/source.py let any agent read the deployed repo. REPO_ROOT auto-detects via pyproject.toml walk-up so it works identically in local dev, CLI, and Railway containers. Paths resolve under REPO_ROOT and reject escapes. In production, tests/, specs/, docs/, .env* are excluded by .dockerignore. Registered in _UTILITY_FNS so every domain agent has them — introspection is meta, not domain-scoped.
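REPO_ROOT detection and escape rejection can be sketched with pathlib. This assumes the walk-up starts from the module's own directory. find_repo_root and resolve_under are hypothetical names.

```python
from pathlib import Path


def find_repo_root(start: Path) -> Path:
    """Walk up from start until a directory containing pyproject.toml is found."""
    start = start.resolve()
    for candidate in [start, *start.parents]:
        if (candidate / "pyproject.toml").is_file():
            return candidate
    raise FileNotFoundError(f"pyproject.toml not found above {start}")


def resolve_under(root: Path, user_path: str) -> Path:
    """Resolve user_path inside root, rejecting '..', absolute, and symlink escapes."""
    root = root.resolve()
    target = (root / user_path).resolve()
    if target != root and root not in target.parents:
        raise PermissionError(f"path escapes repo root: {user_path}")
    return target
```

Resolving before comparing is the important step. A check on the raw string would miss "app/../../etc" or a symlink that points outside the repo.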
Stage 2 — Capability-gap detection. detect_and_save_capability_gap() runs fire-and-forget after every Telegram turn (alongside extract_facts_from_exchange). Pipeline: regex prefilter on the response (negation patterns like "I don't have a tool", "as a language model") → mini-model decline classifier → grep_source for the suspected capability → classify as gap (no source matches → real capability hole, candidate for a new skill) or available (source matches found → capability exists; the decline was a routing or prompt-level failure). Stored in capability_gaps with status lifecycle (open | resolved | dismissed). Surfaced via get_capability_gap_digest on the diagnostics agent.
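The cheap first stage and the final gap/available decision can be sketched as below. The two negation phrases come from the text. The mini-model classifier is omitted. grep_hits is assumed to be the number of source matches grep_source returned for the suspected capability.

```python
import re
from typing import Optional

# Negation phrases that suggest the agent declined (examples from the text above).
DECLINE_PATTERNS = re.compile(
    r"I (?:don['’]t|do not) have (?:a|any) tool|as a language model",
    re.IGNORECASE,
)


def prefilter(response: str) -> bool:
    """Cheap regex gate: only decline-like responses go on to the mini model."""
    return DECLINE_PATTERNS.search(response) is not None


def classify_gap(response: str, grep_hits: int) -> Optional[str]:
    if not prefilter(response):
        return None  # no decline detected; nothing is recorded
    # No matching source means a real capability hole; matches mean the tool
    # exists and the decline was a routing or prompt-level failure.
    return "gap" if grep_hits == 0 else "available"
```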
Stage 3 — Skill proposer. propose_skill(description, gap_id?) calls the primary model with a skill template + the originating gap context (if any), receives a complete .py file, then runs static validation:
- compile(): syntax check.
- AST walk: rejects imports of subprocess, shutil, ctypes, marshal, pickle, socket, smtplib, ftplib, and pty; calls of os.system, os.popen, os.exec*, os.fork, and os.spawn; and the eval() and exec() builtins.
- Structure check: exactly one @skill-decorated async function, with ctx as its first parameter, a non-empty docstring, and a name matching the expected slug.
Drafts that pass are saved as ProposedSkill(status='draft', validation_status='passed'); the code lives in the DB, never on disk. Drafts that fail are saved with validation_status='failed' plus the error report, so the user can inspect them.
Stage 4 — Activation (approval-gated). activate_proposed_skill(skill_id) is the only path that writes to disk. It re-validates first (a defense in case the DB row was tampered with after proposal), goes through approval_gate(action_type='skill') for the Telegram inline keyboard, and on Confirm writes app/skills/{name}.py, refusing to overwrite an existing file. It then calls hot_reload_new_skill(name, agents) in app/skills/__init__.py to register the new function on the full agent and the 9 domain agents that include skills. The activation response includes an explicit "ephemeral until committed to git" warning: the file exists only on the running container's disk and disappears on the next Railway redeploy. Automated PR persistence via PyGithub is planned but not shipped.
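A sketch of the two disk-facing steps of activation, using hypothetical helper names. Exclusive-create mode ("x") gives the refuse-to-overwrite behavior for free, and importlib loads the new module from its path without a restart. How the real hot_reload_new_skill attaches the function to each Pydantic AI agent is not described here, so registration is modeled as a list of callables.

```python
import importlib.util
from pathlib import Path
from typing import Callable, Iterable


def write_skill_file(skills_dir: Path, name: str, source: str) -> Path:
    """Write {skills_dir}/{name}.py, refusing to overwrite an existing file."""
    path = skills_dir / f"{name}.py"
    # Mode "x" is exclusive creation: it raises FileExistsError if the file exists.
    with path.open("x", encoding="utf-8") as fh:
        fh.write(source)
    return path


def hot_load_skill(
    path: Path, name: str, registrars: Iterable[Callable[[Callable], object]]
) -> Callable:
    """Import the new module from disk and hand its function to each agent's registrar."""
    spec = importlib.util.spec_from_file_location(f"hot_skill_{name}", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the skill module's top-level code
    fn = getattr(module, name)
    for register in registrars:  # e.g. the full agent plus each skills-enabled domain agent
        register(fn)
    return fn
```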
Why no E2B sandbox for code testing: the static-validation gauntlet plus mandatory human approval on Telegram covers the same risk surface E2B would address. Skills run in the same trust boundary as the rest of the app — the meaningful safety control is the human review gate, not a runtime sandbox. E2B remains useful for the existing execute_python tool (untrusted code Kwasi runs at user request), just not for skill validation.
System Prompt Architecture¶
build_system_prompt() assembles the agent's full context from three categories of sections, always in this order:
```mermaid
graph TD
subgraph Static["Static Instruction Sections (identical across requests — cache-friendly)"]
direction LR
A1[_PERSONA\nVoice · banned openers · cultural context]
A2[_DECISION_MAKING\nWhen to act vs. confirm · tool priority]
A3[_CONTEXT_USAGE\nHow to apply the profile silently]
A4[_MEMORY_INSTRUCTIONS\nFacts store · search priority order]
A5[_JOURNAL_INSTRUCTIONS\nWhen to call save_journal_entry]
A6[_TONE_CALIBRATION\nSituational energy matching]
A7[_OUTPUT_FORMAT\nMobile formatting · brevity rules]
A8[_ERROR_HANDLING\nRetry-first · plain-English failure summaries]
A9[_TOOLS_AND_ACCOUNTS\nTool guidance scoped to active domain]
A10[_TOOL_USAGE_EXAMPLES\nExample flows]
A11[_APPROVAL_INSTRUCTIONS\nSentinel handling · never show raw token]
A12[_DOCS_REFERENCE\nSelf-introspection · anti-poisoning rules]
A13[_DB_OPS_RULES\nRaw-SQL guardrails · read-live-schema rule]
A14[_TOOL_DISCIPLINE\nWhen to reach for which tool]
A15[_DELEGATION_RULES\nWhen to delegate · backend choice]
A16[_SELF_CONFIG_RULES\nSelf-config / Railway control bounds]
A17[focus_section\nActive domain focus hint if scoped]
end
subgraph Dynamic["Dynamic Sections (fetched from storage per request)"]
direction LR
B2[learnings_section\nagent_learnings table status=active\nBehavioral Guidelines header]
B3[context_section\ncontext table — narrative profile\ncapped at 4,800 chars ≈ 1,200 tokens]
B4["schema_section\nLive database schema via describe_schema()\nintrospected · process-cached · fail-safe to empty"]
B5[web_logins_section\nvault service names + domains never secrets\nStored web logins header · dropped if vault off]
end
Static --> Prompt[System Prompt\n~3,000–6,000 tokens]
Dynamic --> Prompt
```
Key design constraints:
- Datetime is NOT in the system prompt. It is prepended to the user turn (as [Friday, April 24, 2026 — 09:15 AM Europe/Paris]) so the stable system prompt sections form a consistent prefix across requests and qualify for Gemini's implicit prompt cache. A datetime inside the system prompt would change that prefix, and bust the cache, on every request.
- Profile is capped at 4,800 chars (~1,200 tokens). The nightly reflection profile grows over time; the cap prevents unbounded system prompt growth while keeping the most recent content intact.
- Tool sections are scoped. When active_categories is set (i.e. a domain agent is running), only the relevant _DOMAIN_TOOL_SECTIONS entries are included, cutting ~600–1,000 tokens of irrelevant tool guidance from each domain-routed request.
- Briefing agent uses build_briefing_system_prompt(), which extends the base prompt by appending the _MORNING_BRIEFING, _EVENING_RECAP, _WEEKLY_RECAP, _WEEKLY_PREP, and _MEETING_PREP templates. These arrive at the system level so the model's structural understanding of briefing formats is cached, not re-parsed per call.
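The first three constraints can be sketched together. Everything here is illustrative: the helper names are hypothetical, and cap_profile assumes the newest reflection text sits at the end of the profile, so keeping the tail keeps the most recent content.

```python
from datetime import datetime
from typing import Optional

PROFILE_CAP_CHARS = 4_800  # ~1,200 tokens


def cap_profile(profile: str, cap: int = PROFILE_CAP_CHARS) -> str:
    # Assumption: newer reflection text is appended at the end, so the tail is kept.
    return profile if len(profile) <= cap else profile[-cap:]


def select_tool_sections(sections: dict[str, str], active_categories: Optional[set[str]]) -> list[str]:
    # No active categories means the full agent: include every tool section.
    if not active_categories:
        return list(sections.values())
    return [text for category, text in sections.items() if category in active_categories]


def stamp_user_turn(user_text: str, now: datetime, tz_label: str) -> str:
    # Weekday, month day, year, 12-hour time, then the zone label.
    stamp = f"[{now:%A}, {now:%B} {now.day}, {now.year} \u2014 {now:%I:%M %p} {tz_label}]"
    return f"{stamp} {user_text}"


def build_request(
    static_sections: list[str],
    tool_sections: dict[str, str],
    active_categories: Optional[set[str]],
    profile: str,
    user_text: str,
    now: datetime,
    tz_label: str,
) -> tuple[str, str]:
    """Return (system_prompt, user_turn); nothing time-dependent enters the system prompt."""
    parts = [*static_sections, *select_tool_sections(tool_sections, active_categories), cap_profile(profile)]
    return "\n\n".join(parts), stamp_user_turn(user_text, now, tz_label)
```

Two requests made hours apart produce byte-identical system prompts; only the user turn differs, which is what keeps the prefix eligible for implicit caching.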
Personality directives (_PERSONA + _ERROR_HANDLING):
Two sections encode Kwasi's distinctive character:
- _PERSONA: Direct, warm-but-not-performative voice. Natural Ghanaian cultural familiarity. Light humor where it lands. Banned openers list: "I'm sorry", "Unfortunately", "I apologise", "I wasn't able to", "I'm afraid". The model must reframe as what it CAN do.
- _ERROR_HANDLING: Retry-first directive. When a tool fails, attempt at least one alternative (different tool, rephrased query, decomposed sub-task) before reporting failure. When all paths are exhausted: a structured plain-English summary of ≤3 bullets covering what was tried, plus one concrete next step. No raw error codes or exception names in any user-facing response.
Self-Management Subsystem (app/tools/config_registry.py)¶
Kwasi can change select settings about itself and manage its own Railway deployment, from Telegram. The design centres on one principle: a single declarative registry is the source of truth, and everything else derives from it.
Config registry. CONFIG_REGISTRY in app/tools/config_registry.py is a list of ConfigField(field, env, validator, coerce, runtime_settable, env_settable). Both allowlists derive from it, so they can never drift:
- ALLOWED_CONFIG_KEYS = {f.field: (f.validator, f.coerce) for f in CONFIG_REGISTRY if f.runtime_settable} (consumed by runtime_config.py).
- RAILWAY_ENV_ALLOWLIST = {f.env for f in CONFIG_REGISTRY if f.env_settable} (consumed by railway.py).
Adding a self-manageable setting is one registry row. An independent deny-substring gate (TOKEN/SECRET/KEY/PASSWORD/DATABASE_URL/URL/CREDENTIAL) in is_allowed_env blocks secret-shaped variables regardless of the allowlist — so RAILWAY_API_TOKEN and friends can never be agent-set. Some fields are env_settable but not runtime_settable (e.g. weave_enabled, read only at startup) — the registry encodes that honestly so a runtime override can't silently no-op.
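A sketch of the registry-derived allowlists, assuming a frozen dataclass for ConfigField. The two rows and their validators are placeholders (MODEL_NAME and weave_enabled are the only fields named on this page); the point is that both allowlists are comprehensions over one list, and the deny gate runs independently of either.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ConfigField:
    field: str                        # attribute name on deps.settings
    env: str                          # Railway environment variable name
    validator: Callable[[str], bool]
    coerce: Callable[[str], Any]
    runtime_settable: bool
    env_settable: bool


def _is_bool(value: str) -> bool:
    return value.lower() in {"true", "false"}


# Illustrative rows; validators are placeholders.
CONFIG_REGISTRY = [
    ConfigField("model_name", "MODEL_NAME", lambda v: bool(v.strip()), str, True, True),
    # Read only at startup, so env-settable but deliberately not runtime-settable.
    ConfigField("weave_enabled", "WEAVE_ENABLED", _is_bool, lambda v: v.lower() == "true", False, True),
]

# Both allowlists derive from the registry, so they cannot drift apart.
ALLOWED_CONFIG_KEYS = {f.field: (f.validator, f.coerce) for f in CONFIG_REGISTRY if f.runtime_settable}
RAILWAY_ENV_ALLOWLIST = {f.env for f in CONFIG_REGISTRY if f.env_settable}

DENY_SUBSTRINGS = ("TOKEN", "SECRET", "KEY", "PASSWORD", "DATABASE_URL", "URL", "CREDENTIAL")


def is_allowed_env(name: str) -> bool:
    # The deny gate runs first: a secret-shaped name is refused even if allowlisted.
    if any(s in name.upper() for s in DENY_SUBSTRINGS):
        return False
    return name in RAILWAY_ENV_ALLOWLIST
```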
Two control planes.
| Tool | Mechanism | Effect |
|---|---|---|
| set_runtime_config(key, value) | Mutates deps.settings in memory and persists to the context KV at system:config:<field>. load_overrides(deps) re-applies persisted overrides at boot. | Instant: next message, no restart. Survives reboots (override > env). |
| railway_set_env(name, value, redeploy=True) | variableUpsert via the Railway GraphQL API (backboard.railway.com/graphql/v2). | Bakes into the real environment; redeploys (~1–2 min). Use for boot-only settings or to make a change permanent. |
| railway_redeploy() | deploymentRedeploy. | Restart with no config change. |
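The set_runtime_config / load_overrides round trip can be sketched with a plain dict standing in for the context KV table. The single allowlisted field and its validator are hypothetical; the behavior mirrors the table: apply in memory immediately, persist under system:config:<field>, and re-apply at boot so the override beats the env value.

```python
from typing import Any, Callable

# Hypothetical allowlist with the same (validator, coerce) shape as ALLOWED_CONFIG_KEYS.
ALLOWED: dict[str, tuple[Callable[[str], bool], Callable[[str], Any]]] = {
    "model_name": (lambda v: bool(v.strip()), str),
}
OVERRIDE_PREFIX = "system:config:"


def set_runtime_config(settings: Any, kv: dict[str, str], key: str, value: str) -> None:
    """Apply an override in memory and persist it to the KV store."""
    if key not in ALLOWED:
        raise ValueError(f"{key} is not runtime-settable")
    validator, coerce = ALLOWED[key]
    if not validator(value):
        raise ValueError(f"invalid value for {key}: {value!r}")
    setattr(settings, key, coerce(value))  # visible on the very next message
    kv[OVERRIDE_PREFIX + key] = value      # persisted, so it survives a reboot


def load_overrides(settings: Any, kv: dict[str, str]) -> None:
    """At boot, re-apply persisted overrides on top of env-derived settings (override > env)."""
    for stored_key, raw in kv.items():
        if not stored_key.startswith(OVERRIDE_PREFIX):
            continue
        field = stored_key[len(OVERRIDE_PREFIX):]
        if field in ALLOWED:  # ignore stale keys for fields no longer in the registry
            setattr(settings, field, ALLOWED[field][1](raw))
```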
Both mutating planes are approval-gated (action_type config / deployment). railway_set_env runs the field's registry validator (validate_env_value) before upsert, so a bad MODEL_NAME can't bake in and crash-loop the new container. Read-only tools — railway_deployment_status, railway_deploy_logs, diagnose_self — are ungated. diagnose_self bundles a health snapshot: effective config (with override marks), recent Logfire exceptions, background-loop heartbeats (read from fixed system:<loop>:heartbeat keys), and Railway deploy status.
Post-redeploy confirmation (app/self_redeploy.py). When the agent redeploys itself, the container dies mid-conversation — so the loop is closed across the restart:
1. After a successful Railway call, the executor writes a system:pending_self_redeploy marker (chat to notify, expected config, and prev_deploy_id captured at trigger time). Writing after success means a failed call leaves no orphan.
2. On the next boot, the lifespan calls confirm_pending_self_redeploy(deps, telegram_app) (right after recover_orphan_delegations, so load_overrides has already applied the new effective config). It compares the live latest-deployment id against prev_deploy_id. If unchanged, no redeploy actually happened (failed build / no-op value), so it clears the marker silently. Otherwise it verifies the expected config took effect and posts a proactive ✅ (or ⚠️ on mismatch / unusually long restart), then consumes the marker (cleared before delivery → at-most-once, no duplicate notify on a boot loop).
3. If the build fails, the container never reaches step 2 and Railway keeps the previous healthy deploy serving, so the agent stays reachable and diagnose_self surfaces the stuck marker + FAILED status.
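The boot-time confirmation reduces to a small pure function. This is a sketch with hypothetical names: the dict stands in for the context KV store, notify for the Telegram send, and the "unusually long restart" warning is omitted.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

MARKER_KEY = "system:pending_self_redeploy"


@dataclass
class PendingRedeploy:
    chat_id: int
    expected: dict[str, Any]
    prev_deploy_id: str


def confirm_pending_redeploy(
    kv: dict[str, PendingRedeploy],
    latest_deploy_id: str,
    effective: dict[str, Any],
    notify: Callable[[int, str], None],
) -> Optional[str]:
    """Decide what, if anything, to tell the user after a boot."""
    marker = kv.get(MARKER_KEY)
    if marker is None:
        return None
    if latest_deploy_id == marker.prev_deploy_id:
        # Same deployment as at trigger time: no redeploy happened. Clear silently.
        del kv[MARKER_KEY]
        return None
    mismatched = sorted(k for k, v in marker.expected.items() if effective.get(k) != v)
    # Consume the marker BEFORE notifying, so a crash or boot loop cannot double-notify.
    del kv[MARKER_KEY]
    if mismatched:
        message = "⚠️ Redeployed, but config did not take effect: " + ", ".join(mismatched)
    else:
        message = "✅ Redeployed and config confirmed."
    notify(marker.chat_id, message)
    return message
```

Deleting before calling notify is the at-most-once trade-off: if delivery fails, the user misses one confirmation rather than receiving it on every restart.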
Task Delegation (app/delegate.py)¶
delegate_to_coding_agent(task, backend?, max_minutes?) spawns an external coding agent inside an E2B sandbox for complex tasks no existing tool covers. Three backends:
- opencode (default): Gemini-backed, dramatically cheaper, no per-tool-use telemetry. Uses the opencode E2B template; runs opencode run --model {DELEGATION_OPENCODE_MODEL} <task> with GEMINI_API_KEY + GOOGLE_GENERATIVE_AI_API_KEY set; captures plain-text stdout as the final answer.
- claude_code: Anthropic-backed, more capable for hard agentic coding. Uses the claude template + --output-format stream-json --verbose; full JSONL parsing for tool-use counts, token usage, total cost, and mid-run cost-cap enforcement. Auth is ANTHROPIC_API_KEY only (subscription OAuth is not used, for compliance).
- web_task: DOM-driven browser automation (browser-use + Gemini) for JS/SPA scraping + multi-step form-fills. Reached via a dedicated delegate_web_task(task, credential?, max_minutes?) tool (not a backend= arg, so the model routes browser intents correctly). Needs a prebuilt WEB_TASK_TEMPLATE E2B image. With credential="<name>" it logs into a real account using a credential-vault entry: the tool references the credential by name only; run_delegation decrypts it at run time into browser-use domain-scoped sensitive_data (placeholders the model never resolves) + a pyotp TOTP code, written to the sandbox via files.write. The LLM never sees the plaintext.
Approval-gated (action_type="delegation"); the executor (shared by both tools) creates a Delegation row, spawns asyncio.create_task(run_delegation(...)), and returns a "working on it" sentinel — results land via a separate Telegram message when done. Image artifacts come back as Telegram photos. Caps: delegation_max_per_day (5), delegation_max_minutes (10), delegation_max_cost_usd (Claude Code path only). Container-restart recovery (recover_orphan_delegations) marks running rows as failed and notifies. Each delegation persists a Note tagged #delegation for searchable history.
Credential vault (app/vault.py): Fernet-encrypted website logins in the vault_credentials table (master key from the VAULT_MASTER_KEY env var only; never in the DB, never agent-settable). Credentials are enrolled out of band via scripts/vault_add.py; build_system_prompt surfaces the enrolled service names (never secrets) so the model knows which delegate_web_task(credential=...) names exist. db_query refuses the vault_credentials and context tables.
Inbound PR Review (app/pr_review.py)¶
When Lawrence is tagged as a reviewer on a PR in an allowlisted org, Kwasi fetches it via PyGithub (get_pr_full — title + body + per-file diff, capped 8 KB/file & 60 KB total) and runs a tool-less mini-model Agent[None, PRReview] to produce structured summary / risks / questions / description_check / test_observation / verdict. Output is rendered as Telegram markdown by Python (deterministic, testable) and persisted as a Note (#pr-review #<owner> #<repo>) for searchability. Discovery is dual-path — Gmail from:notifications@github.com "review requested" and GitHub's is:pr is:open review-requested:<login> — deduped by (owner, repo, number). Org-only allowlist via PR_REVIEW_ORGS (case-insensitive, fail-closed when empty). Three callers share one pipeline: the _pr_review_loop (daily at PR_REVIEW_TIME), and two manual agent tools github_review_pr / github_pending_pr_reviews on github_agent. Per-PR dedup via system:pr_review:<owner>/<repo>#<N> context keys with an 18-hour TTL. Read-only by design — no writing back to GitHub, no E2B sandbox.
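A sketch of the discovery merge, dedup, org allowlist, and TTL rules, assuming hypothetical names (PRRef, select_prs_to_review) and a dict of last-review timestamps in place of the context keys. The GitHub and Gmail calls themselves are not shown.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable

DEDUP_TTL = timedelta(hours=18)


@dataclass(frozen=True)
class PRRef:
    owner: str
    repo: str
    number: int

    @property
    def dedup_key(self) -> str:
        return f"system:pr_review:{self.owner}/{self.repo}#{self.number}"


def select_prs_to_review(
    gmail_hits: Iterable[PRRef],
    search_hits: Iterable[PRRef],
    allowed_orgs: Iterable[str],
    last_reviewed: dict[str, datetime],
    now: datetime,
) -> list[PRRef]:
    orgs = {org.lower() for org in allowed_orgs}
    if not orgs:
        return []  # fail closed: an empty allowlist reviews nothing
    seen: set[PRRef] = set()
    selected: list[PRRef] = []
    for pr in [*gmail_hits, *search_hits]:  # both discovery paths, merged
        if pr in seen or pr.owner.lower() not in orgs:  # case-insensitive org match
            continue
        seen.add(pr)
        reviewed_at = last_reviewed.get(pr.dedup_key)
        if reviewed_at is not None and now - reviewed_at < DEDUP_TTL:
            continue  # reviewed recently; skip until the TTL lapses
        selected.append(pr)
    return selected
```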
Dashboard (app/routers/dashboard.py)¶
FastAPI router mounted at /dashboard/. Four views (interactions, audit log, scheduled tasks, memory profile). Protected by DASHBOARD_SECRET — disabled entirely if the env var is not set. Auth via X-Dashboard-Secret header or ?token= query param.
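A framework-agnostic sketch of the access rule. The return labels, the header-over-query precedence, and the constant-time comparison are assumptions for illustration; in FastAPI this logic would typically live in a dependency attached to the router.

```python
import hmac
from typing import Literal, Optional

Access = Literal["disabled", "denied", "ok"]


def dashboard_access(
    secret: Optional[str], header_token: Optional[str], query_token: Optional[str]
) -> Access:
    """Classify a request given DASHBOARD_SECRET, the X-Dashboard-Secret header, and ?token=."""
    if not secret:
        return "disabled"  # no secret configured: the dashboard is off entirely
    # Assumption: a present header wins over the query parameter.
    supplied = header_token if header_token is not None else (query_token or "")
    # Constant-time comparison avoids leaking the secret through response timing.
    if hmac.compare_digest(supplied.encode(), secret.encode()):
        return "ok"
    return "denied"
```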
MCP Tools (app/interfaces/mcp/client.py)¶
The Gmail, Outlook email, Outlook calendar, and Microsoft To Do tools call synchronous Google/Microsoft API clients, so they are wrapped with asyncio.to_thread() to avoid blocking the event loop. get_mcp_tools() returns all wrappers; agents.py filters them into _EMAIL_MCP, _CALENDAR_MCP, and _TODO_MCP sets for domain-specific routing.
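The to_thread pattern in miniature. search_emails_blocking is a hypothetical stand-in for a synchronous client call; wrapped calls overlap in the default thread pool instead of serializing, and the event loop stays free while they run.

```python
import asyncio
import time


def search_emails_blocking(query: str) -> list[str]:
    """Hypothetical stand-in for a synchronous Gmail/Graph client call."""
    time.sleep(0.2)  # simulates network latency that would otherwise block the loop
    return [f"result for {query}"]


async def search_emails_async(query: str) -> list[str]:
    # asyncio.to_thread (Python 3.9+) runs the blocking function in the default
    # thread pool, so the event loop keeps serving other coroutines meanwhile.
    return await asyncio.to_thread(search_emails_blocking, query)
```

Gathering three wrapped calls with asyncio.gather takes roughly one call's latency rather than three, because each blocking call sits on its own worker thread.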
gmail_read_email_wrapper — fetches the complete body of a Gmail message (up to 20,000 chars). Walks the MIME part tree recursively, prefers text/plain, strips HTML from text/html fallback. The search_emails_wrapper and summary tools only return a short snippet (~100–200 chars) — always follow up with gmail_read_email_wrapper when the full text is needed.
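A sketch of the plain-text-first body extraction over the Gmail API's message payload shape (nested parts, each with a mimeType and base64url-encoded body.data). The helper names and the regex-based HTML stripping are assumptions; a production version might prefer a real HTML parser.

```python
import base64
import html
import re
from typing import Optional

MAX_BODY_CHARS = 20_000


def _decode(data: str) -> str:
    padded = data + "=" * (-len(data) % 4)  # restore any stripped base64 padding
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")


def _find_part(part: dict, mime_type: str) -> Optional[str]:
    """Depth-first search of the MIME part tree for the first body of `mime_type`."""
    if part.get("mimeType") == mime_type and part.get("body", {}).get("data"):
        return _decode(part["body"]["data"])
    for child in part.get("parts", []):
        found = _find_part(child, mime_type)
        if found is not None:
            return found
    return None


def _strip_html(markup: str) -> str:
    text = re.sub(r"(?is)<(script|style)\b.*?</\1>", " ", markup)  # drop script/style bodies
    text = re.sub(r"<[^>]+>", " ", text)                            # drop remaining tags
    return re.sub(r"\s+", " ", html.unescape(text)).strip()


def extract_body(payload: dict) -> str:
    """Prefer text/plain anywhere in the tree; fall back to stripped text/html."""
    plain = _find_part(payload, "text/plain")
    text = plain if plain is not None else _strip_html(_find_part(payload, "text/html") or "")
    return text[:MAX_BODY_CHARS]
```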
outlook_read_wrapper — fetches the complete body of an Outlook message (up to 20,000 chars). Strips HTML (Graph API returns HTML by default), includes From/Subject/Date fields.
Google Calendar shared calendars — google_get_todays_schedule_wrapper and google_get_calendar_events_wrapper query all calendars the user has access to (primary, shared, subscribed) by calling calendarList().list() first. Each event is prefixed with [Calendar Name] in the output.
Microsoft To Do tools (todo_*) use the same OutlookClient._make_request pattern as Outlook email and calendar. They require Tasks.ReadWrite scope on the Outlook refresh token.
Memory (app/memory/)¶
- ports.py: StoragePort protocol + all data models (Interaction, Note, Task, Reminder, UserContext, UserFact, ScheduledTask, AuditEntry, ReadLaterItem, NewsTopic, SeenStory, AlertRule, PendingAction, PendingIntention, AgentLearning, JournalEntry, SemanticSearchResult). The HealthSample Pydantic model lives in app/health/models.py.
- adapters/sqlite.py: SQLite implementation (local dev, :memory: for tests). Tables: interactions, notes, tasks, reminders, context, user_facts, scheduled_tasks, audit_log, read_later, news_topics, seen_news, alert_rules, pending_actions, pending_intentions, agent_learnings, journal_entries, health_samples.
- adapters/postgres.py: asyncpg connection pool (Railway production). Same tables as SQLite. health_samples.value is JSONB in Postgres and JSON-encoded TEXT in SQLite.
- Both adapters embed new records on write via embed_text() (Gemini gemini-embedding-001, best-effort: the row is INSERTed first, then the embedding is awaited and UPDATEd onto the row inside a try/except that swallows errors, so the row persists even if embedding fails) and implement semantic_search(): Python cosine in SQLite, pgvector <=> in Postgres.
- logger.py: log_interaction() helper used by all handlers.
- post_conversation.py: extract_facts_from_exchange() (immediate post-message fact extraction), fire-and-forget via asyncio.create_task(). The earlier 30-min summarise_session() was removed 2026-05-09; topic summaries now come only from the nightly reflection.
- reflection.py: ReflectionService, a nightly LLM pass producing four outputs: an updated UserContext narrative profile, auto-extracted UserFact records, new PendingIntention records, and AgentLearning corrections.
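A sketch of the SQLite-side pattern: insert first, attach the embedding best-effort, and rank by cosine similarity in Python. The notes.body / notes.embedding columns and the JSON-encoded vectors are assumptions; the Postgres adapter uses pgvector's <=> operator instead of a Python loop.

```python
import json
import math
import sqlite3
from typing import Awaitable, Callable, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


async def save_note(
    conn: sqlite3.Connection,
    body: str,
    embed: Callable[[str], Awaitable[list[float]]],
) -> int:
    """Insert first, then attach the embedding best-effort."""
    cur = conn.execute("INSERT INTO notes (body) VALUES (?)", (body,))
    conn.commit()  # the row is durable before any embedding call is made
    note_id = cur.lastrowid
    try:
        vector = await embed(body)
        conn.execute("UPDATE notes SET embedding = ? WHERE id = ?", (json.dumps(vector), note_id))
        conn.commit()
    except Exception:
        pass  # swallowed: a failed embedding must not lose the saved row
    return note_id


def semantic_search(
    conn: sqlite3.Connection, query_vec: Sequence[float], limit: int = 5
) -> list[tuple[float, str]]:
    """Brute-force cosine ranking over every embedded row."""
    rows = conn.execute("SELECT body, embedding FROM notes WHERE embedding IS NOT NULL").fetchall()
    scored = [(cosine(query_vec, json.loads(emb)), body) for body, emb in rows]
    scored.sort(reverse=True)
    return scored[:limit]
```

Rows whose embedding failed stay findable by keyword search; they simply never appear in semantic results until re-embedded.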
Background Loops¶
All background loops run as asyncio tasks within the FastAPI lifespan.
| Loop | Trigger condition | Schedule |
|---|---|---|
| Reflection | REFLECTION_SECRET set | 2 AM UTC daily |
| Reminders | Telegram token configured | Every 5 minutes |
| Daily Briefing | BRIEFING_CHAT_ID set | BRIEFING_TIME UTC daily (default 08:00) |
| Evening Recap | BRIEFING_CHAT_ID set | EVENING_RECAP_TIME local (default 21:00) |
| Weekly Recap | BRIEFING_CHAT_ID set | WEEKLY_RECAP_DAY/WEEKLY_RECAP_TIME local (default Friday 18:00) |
| Weekly Prep | BRIEFING_CHAT_ID set | WEEKLY_PREP_DAY/WEEKLY_PREP_TIME local (default Sunday 18:00) |
| Read-Later Digest | BRIEFING_CHAT_ID set | READ_LATER_DIGEST_DAY/READ_LATER_DIGEST_TIME local (default Saturday 09:00) |
| Journal Digest | BRIEFING_CHAT_ID set | JOURNAL_DIGEST_DAY/JOURNAL_DIGEST_TIME local (default Sunday 19:00) |
| Email Intelligence | BRIEFING_CHAT_ID + email creds | EMAIL_INTEL_TIME local (default 09:30); set "" to disable |
| User Scheduled Tasks | BRIEFING_CHAT_ID set | Every 60 seconds (croniter-based evaluation) |
| Alert Rules + Intentions | BRIEFING_CHAT_ID set | Every 5 minutes |
| Meeting Prep | BRIEFING_CHAT_ID set | Every 5 minutes (fires ≈MEETING_PREP_LEAD_MINUTES before each meeting) |
| Meeting Follow-up | BRIEFING_CHAT_ID + GMAIL_WORK_REFRESH_TOKEN | Every MEETING_FOLLOWUP_POLL_MINUTES (default 30); polls work Gmail for Gemini meeting notes → Telegram card |
| PR Review | BRIEFING_CHAT_ID + GITHUB_TOKEN + PR_REVIEW_ORGS | PR_REVIEW_TIME local (default 09:00); set "" to disable |
| Approval Expiry | Telegram bot active | Every 5 minutes |
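Most of the daily loops above come down to sleeping until the next wall-clock occurrence of a configured time. A minimal sketch with hypothetical helper names, assuming naive datetimes (DST transitions are ignored) and the empty-string-disables convention from the table:

```python
import asyncio
from datetime import datetime, timedelta
from typing import Awaitable, Callable, Optional


def seconds_until(now: datetime, hhmm: str) -> float:
    """Seconds from `now` until the next wall-clock occurrence of HH:MM."""
    hour, minute = map(int, hhmm.split(":"))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)  # already passed today: schedule for tomorrow
    return (target - now).total_seconds()


async def daily_loop(
    hhmm: Optional[str],
    job: Callable[[], Awaitable[None]],
    clock: Callable[[], datetime] = datetime.now,
) -> None:
    """Run `job` once a day at HH:MM; an empty setting disables the loop."""
    if not hhmm:
        return
    while True:
        await asyncio.sleep(seconds_until(clock(), hhmm))
        try:
            await job()
        except Exception:
            pass  # a real loop would log here; one failed run must not end the loop
```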
Key Design Decisions¶
Why Pydantic AI?¶
Pydantic AI handles the agent loop (tool calls → LLM → tool calls → ...), type-safe tool definitions, streaming, and model switching. The Agent class abstracts over different LLM providers — switching from Gemini to Claude is a single env var change.
Why polling instead of webhook for Telegram?¶
Long-polling (updater.start_polling) is simpler to deploy on Railway — no public URL needed for the bot itself, no TLS certificate management. The WhatsApp webhook still uses a proper HTTP endpoint since the Meta platform requires it.
Why asyncio tasks for background loops?¶
Avoids the complexity of a separate worker process, Celery, or a job scheduler. Since the app is single-user and the loops are low-frequency (5 min / daily / 60 sec), asyncio tasks inside the FastAPI lifespan are sufficient and operationally simple.
Why MCP for email/calendar?¶
MCP (Model Context Protocol via fastmcp) provides a clean abstraction for external tool servers. Gmail and Outlook have separate auth flows (OAuth2 with Google vs. MSAL with Microsoft) — MCP isolates this complexity from the agent.
Why native PyGithub instead of the official GitHub MCP server?¶
The existing "MCP" tools in this project are local Python wrappers, not external server processes. PyGithub follows the same pattern as other tools (weather, search, news): in-process, async-wrapped, with consistent output formatting. The official GitHub MCP server is a Go binary designed for IDE integrations, not embedded service use.
Why a file-drop skill registry?¶
Allows extending the agent's capabilities without touching agent.py. New skills can be developed, tested, and dropped in independently. The registry is idempotent and fail-safe — a broken skill file is skipped with a logged error.
Why keyword-based intent routing instead of LLM-based routing?¶
The routing step runs on every message, so it must have near-zero latency and zero cost. Keyword matching is deterministic, testable, and trivially debuggable — if it misroutes a message, you can see exactly why by reading the keyword list. An LLM-based router would add a full extra inference step per message, significantly increasing latency and cost for a single-user assistant. The keyword approach covers ~95% of real messages correctly; the remaining 5% fall through to the full agent, which is always the safe fallback.
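A sketch of the idea with hypothetical keyword lists (the real routing table is not reproduced here): word-boundary matches per domain, and any message matching zero or several domains falls back to the full agent.

```python
import re
from typing import Optional

# Hypothetical keyword lists, for illustration only.
DOMAIN_KEYWORDS: dict[str, tuple[str, ...]] = {
    "email": ("email", "inbox", "gmail", "outlook"),
    "calendar": ("calendar", "meeting", "schedule"),
    "health": ("sleep", "steps", "heart rate"),
}


def route(message: str) -> Optional[str]:
    """Return the single matching domain, or None to fall back to the full agent."""
    text = message.lower()
    hits = {
        domain
        for domain, words in DOMAIN_KEYWORDS.items()
        if any(re.search(rf"\b{re.escape(w)}\b", text) for w in words)
    }
    return hits.pop() if len(hits) == 1 else None  # ambiguous or no match: full agent
```

Because the rule is a pure function of the message, a misroute can be reproduced in a unit test and explained by pointing at the keyword that fired.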
Why a sentinel token instead of pausing the agent mid-run for approval?¶
The agent runs to completion — it always produces a text response. Pausing mid-run would require a resumable coroutine, persistent agent state, and a way to re-inject the approval result back into an in-progress LLM call. The sentinel pattern avoids all of this: the agent sees "[APPROVAL_PENDING:uuid]" as a normal tool result, writes a response like "your action is pending approval", and exits. handle_message post-processes the result synchronously. On Confirm, a simple direct executor call (no LLM re-run) performs the action. This keeps the agent loop stateless and the approval flow outside the LLM's critical path.
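A sketch of the sentinel round trip, with hypothetical helper names. It assumes the token can be found by scanning a string of run output; where the real handle_message looks (final text or tool-call messages) is not specified here.

```python
import re
import uuid
from typing import Optional

SENTINEL_RE = re.compile(r"\[APPROVAL_PENDING:([0-9a-fA-F-]{36})\]")


def approval_gate_result(action_id: uuid.UUID) -> str:
    # What a gated tool hands back to the LLM instead of performing the action.
    return f"[APPROVAL_PENDING:{action_id}]"


def split_sentinel(agent_output: str) -> tuple[str, Optional[str]]:
    """Strip the sentinel; return (user-facing text, pending action id or None)."""
    match = SENTINEL_RE.search(agent_output)
    if match is None:
        return agent_output, None
    cleaned = SENTINEL_RE.sub("", agent_output).strip()  # never show the raw token
    return cleaned, match.group(1)
```

The pending id is what the inline keyboard's Confirm button would carry, letting a direct executor call run the stored action without re-entering the LLM.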
Why a separate app/approval.py module?¶
approval.py imports only from app/memory/ports — it has no knowledge of agent.py, bot.py, or client.py. This breaks the potential circular import chain: agent.py imports approval.py, bot.py imports approval.py, and client.py imports approval.py — but none of these files import each other. If the gate lived in agent.py or bot.py, it would create a cycle.
Why domain agents instead of a single agent with all tools?¶
Reducing the tool list the LLM sees for each request reduces token usage per call and improves model focus — a model with 8 tools performs more reliably than one with 50. Domain agents are built once at import time (zero per-request cost), share the same system prompt function, and fall back transparently to the full agent for anything ambiguous. The full agent is always the safety net.
Why both Logfire and Langfuse?¶
Different jobs. Logfire owns infrastructure and request-shape tracing — FastAPI routes, background-loop ticks, exceptions, latency by file path. The diagnostics_agent uses it via Logfire's MCP read tool to answer "did the briefing run?" or "which tool is slowest?". Langfuse owns the LLM layer — per-generation token usage and cost, prompt versions, session/user grouping, trace-level quality scores tied to user actions (Confirm/Cancel/Edit). W&B Weave is an optional third backend (WEAVE_ENABLED) for teams already on the W&B ecosystem who want agent traces and evaluations in Weave. All processors hang off the same OTEL tracer provider in app/observability.py, so Pydantic AI is instrumented once and spans flow to every enabled backend without duplication. Any one can be disabled (skip its env vars) and the others keep working.
Why a push model for health data instead of a server-side polling loop?¶
Neither Samsung Health nor Google Health Connect exposes a cloud / server-side REST API — both store data on-device, encrypted, behind explicit user consent. There is no OAuth Kwasi could call from FastAPI to pull samples. Two options remained: (a) integrate a paid third-party aggregator (Terra/Rook/Thryve, ~$400/mo minimum) which itself still requires an Android piece, or (b) write a tiny sideloaded Android bridge (bridge-android/) that reads Health Connect locally and POSTs to /health/ingest. The bridge is one Kotlin Gradle project, ~600 LOC, and never goes through the Play Store — so we sidestep Google's sensitive-permissions review entirely. Per-metric watermarks in DataStore prevent re-sends and gaps across reboots; the server is idempotent on (metric_type, start_time, source_device) so retries are safe. Result: zero ongoing cost, full data ownership, and the rest of Kwasi's read path is identical to any other agent tool.
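The idempotency guarantee maps directly onto a unique constraint plus ON CONFLICT DO NOTHING. A SQLite sketch with an assumed column layout (the upsert clause needs SQLite 3.24+; Postgres accepts the same clause with its own parameter style):

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS health_samples (
    metric_type   TEXT NOT NULL,
    start_time    TEXT NOT NULL,
    source_device TEXT NOT NULL,
    value         TEXT NOT NULL,  -- JSON-encoded in SQLite, JSONB in Postgres
    UNIQUE (metric_type, start_time, source_device)
)
"""


def ingest(conn: sqlite3.Connection, samples: list[dict]) -> None:
    """Insert a batch; rows whose natural key already exists are silently skipped."""
    conn.executemany(
        "INSERT INTO health_samples (metric_type, start_time, source_device, value) "
        "VALUES (:metric_type, :start_time, :source_device, :value) "
        "ON CONFLICT (metric_type, start_time, source_device) DO NOTHING",
        samples,
    )
    conn.commit()
```

With this in place the bridge can retry a whole batch after a dropped response without any client-side bookkeeping beyond its watermarks.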
Why semantic search instead of upgrading keyword search?¶
Keyword (LIKE / ILIKE) search is fast and good enough for exact recall. Semantic search complements it — it finds content by meaning when the user's words don't match the stored text (e.g. "anything about burnout" finding a note titled "work-life balance thoughts"). Rather than replacing keyword search, the agent uses both: keyword first (zero latency, no API call), semantic as a fallback or in parallel via find_everything. Embeddings are stored at write time so queries are instant. The embed_text call at write time costs one Gemini API call per saved record — negligible for a personal assistant.