Architecture¶
System Overview¶
---
config:
layout: elk
---
graph LR
subgraph Interfaces
TG[Telegram\npython-telegram-bot polling]
WA[WhatsApp\nwebhook]
CLI[CLI\nlocal REPL]
DASH[Dashboard\n/dashboard/]
end
subgraph Planning["Planning Layer (app/planning/) — Spec 008"]
CC[classify_complexity\nregex pre-filter]
GP[generate_plan\nplanner agent]
EP[execute_plan\nstep loop · scratchpad threading]
end
subgraph Routing["Intent Routing (app/tools/router.py + app/routing/agents.py)"]
CI[classify_intent\nkeyword matcher]
SA[select_agent\ndispatcher]
DA[Domain Agents\nemail · calendar · memory · github · news\nslack · jira · drive · meetings\ndiagnostics · health · utility]
FA2[Full Agent\nfallback]
end
subgraph Core["Core (FastAPI + asyncio)"]
FAST[FastAPI\napp/main.py]
AG[Pydantic AI Agent\napp/agent.py]
SP[System Prompt\nbuild_system_prompt]
end
subgraph Tools["Native Tools"]
T1[search_web\nTavily]
T2[get_weather\nWeatherAPI]
T3[get_datetime]
T4[summarize_url]
T5[transcribe_audio\nGemini STT]
T6[analyze_image\nGemini Vision]
T7[browse_web\nbrowser_submit_form\nPlaywright]
T8[github_*\nPyGitHub]
end
subgraph Skills["Skills (file-drop plugins)\napp/skills/"]
S1[content_curator\nread_later.py]
S2[travel_briefing]
S3[cv]
S4[deep_research\nresearch.py]
S5[meeting_notes\nget_meeting_insights · list_recent_meetings]
end
subgraph MCPTools["MCP Tools"]
M1[Gmail personal + work\ngmail_* tools]
M2[Outlook Email\noutlook_* tools]
M3[Google Calendar\ngoogle_* tools]
M4[Outlook Calendar\noutlook_calendar_* tools]
M5[Microsoft To Do\ntodo_* tools]
M6[Google Drive\ndrive_* tools]
M7[Logfire\nlogfire_* tools]
end
subgraph Memory["Memory Tools (agent tools over storage)"]
N[Notes\nsave · get · search · edit · delete · semantic]
TK[Tasks\ncreate · list · search · complete · edit · delete]
RM[Reminders\nset · list · cancel]
SC[Scheduled Tasks\ncreate · list · update · delete]
HX[History\nsearch_history · semantic_search]
UF[User Facts\nremember_fact · recall_facts · forget_fact]
end
subgraph Storage["Storage (StoragePort)"]
SQ[(SQLite\nlocal dev)]
PG[(PostgreSQL\nRailway prod)]
end
subgraph HealthIngest["Health Data Ingest (Spec 010 — push model, no server-side loop)"]
WATCH[Samsung Watch / wearable]
HC[Google Health Connect\non-device]
BR["bridge-android/\nKotlin · WorkManager 15 min"]
IN["POST /health/ingest\nX-Health-Secret · idempotent"]
WATCH --> HC
HC --> BR
BR --> IN
IN --> Storage
end
subgraph BG["Background Loops"]
RL[Reminder Loop\nevery 5 min]
BL[Briefing Loop\ndaily at BRIEFING_TIME]
ST[Scheduled Tasks Loop\nevery 60 sec]
RF[Reflection Loop\ndaily at 2 AM UTC]
ER[Evening Recap Loop\ndaily local evening]
WR[Weekly Recap Loop\nFriday local]
WP[Weekly Prep Loop\nSunday local]
RLD[Read-Later Digest\nSaturday morning]
JD[Journal Digest\nSunday evening]
EI[Email Intelligence\ndaily at EMAIL_INTEL_TIME]
AL[Alert + Intentions Loop\nevery 5 min]
MP[Meeting Prep Loop\nevery 5 min]
AE[Approval Expiry Loop\nevery 5 min]
end
subgraph Models["LLM Models"]
LLM[Primary LLM\nMODEL_NAME\nGemini / Claude / GPT-4]
MINI[Mini LLM\nMINI_MODEL_NAME\nskill synthesis]
REFL[Reflection LLM\nREFLECTION_MODEL_NAME\nnightly reflection]
end
TG --> CC
CC -- complex --> GP
GP -- 2+ steps --> EP
CC -- simple --> CI
GP -- 0–1 steps --> CI
EP -- per step --> CI
WA --> FAST
CLI --> AG
CI --> SA
SA --> DA
SA --> FA2
DA --> LLM
FA2 --> LLM
FAST --> AG
DASH --> Storage
AG --> SP
SP --> Storage
AG --> LLM
AG --> Tools
AG --> Skills
AG --> MCPTools
AG --> Memory
Memory --> Storage
BG --> Storage
BG --> TG
Skills --> MINI
BG --> REFL
Component Breakdown¶
FastAPI (app/main.py)¶
The web process entry point. Responsibilities:
- Mounts the WhatsApp webhook router, dashboard router, and health router (/health/ingest, /health/backfill, /health/status)
- Exposes GET /health and POST /reflect
- Exposes POST /embed-backfill — bulk-embeds existing rows without embeddings; protected by X-Reflection-Secret; rate-limited to ~10 rows/sec
- Exposes POST /health/ingest — idempotent batch ingest for wearable samples from the Android bridge; protected by X-Health-Secret (Spec 010)
- Exposes POST /health/backfill — one-shot upload of a Samsung Health "Download Personal Data" CSV/JSON archive to seed historical samples; protected by X-Health-Secret
- Exposes GET /health/status — per-metric ingest freshness summary for monitoring the bridge from outside the device. Returns {now, latest_ingest, lag_minutes, metrics: [...]} with one entry per known metric type (zeros when never ingested) plus per-metric ingest_lag_minutes. Protected by X-Health-Secret so polling it doesn't expose data shape (Spec 010)
- Exposes POST /message — external message endpoint for Android HTTP Shortcuts and other clients. Accepts text and/or images (multipart/form-data or JSON with base64). Protected by the X-API-Token header (API_TOKEN env var). Processes input through the full agent pipeline (intent routing, read-later injection, message history). Delivers the response to Telegram (BRIEFING_CHAT_ID) and returns it in the JSON body. Conversation history is unified with Telegram — both interfaces use BRIEFING_CHAT_ID as the user ID so context is shared across surfaces.
- Manages the FastAPI lifespan context — starts and stops the Telegram bot and all background asyncio tasks
- Exports API keys to os.environ so LLM SDKs can find them
Agent (app/agent.py)¶
The brain. Built with Pydantic AI. Responsibilities:
- Defines AgentDeps — the dependency container passed to every tool and the system prompt
- build_deps() — single factory function used by all interfaces to create AgentDeps
- build_system_prompt() — async function decorated with @agent.system_prompt; assembles the full prompt from 8 static instruction sections + 3 dynamic storage-fetched sections (permanent facts, behavioral learnings, narrative profile). See System Prompt Architecture below.
- build_briefing_system_prompt() — extends build_system_prompt by appending all briefing structural templates (_MORNING_BRIEFING, _EVENING_RECAP, _WEEKLY_RECAP, _WEEKLY_PREP, _MEETING_PREP) at the system level. Used exclusively by briefing_agent.
- Registers all native tools via @agent.tool
- Calls load_skills(agent) at startup to auto-register skills from app/skills/
Multi-Model Architecture¶
Three model roles are configured independently via env vars. Each is optimised for a different cost/capability trade-off:
| Role | Env var | Default | Used for |
|---|---|---|---|
| Primary | MODEL_NAME | google-gla:gemini-2.5-flash | Main conversational agent and all 12 domain agents — every interactive user message |
| Mini | MINI_MODEL_NAME | google-gla:gemini-3-1-flash-lite-preview | Skill synthesis: CV extraction, deep research synthesis, read-later summarisation, travel briefing, post-conversation fact extraction, session summarisation |
| Reflection | REFLECTION_MODEL_NAME | (empty — falls back to MODEL_NAME) | Nightly reflection run only — set to a larger/more capable model for deeper analysis without affecting interactive latency |
settings.effective_reflection_model resolves the fallback: if REFLECTION_MODEL_NAME is empty, it returns MODEL_NAME.
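A minimal sketch of this fallback — the property name and behaviour come from the text, but the settings class shape here is an assumption:

```python
from dataclasses import dataclass

# Hypothetical minimal Settings; field names mirror the env vars above.
@dataclass
class Settings:
    MODEL_NAME: str = "google-gla:gemini-2.5-flash"
    REFLECTION_MODEL_NAME: str = ""  # empty means "fall back to MODEL_NAME"

    @property
    def effective_reflection_model(self) -> str:
        # An empty string is falsy, so `or` expresses the fallback directly.
        return self.REFLECTION_MODEL_NAME or self.MODEL_NAME
```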
Why separate models? The mini model handles high-frequency, low-stakes synthesis tasks (run after every message) where cost and latency matter most. The primary model handles user-facing responses where quality matters. The reflection model is a deliberate override slot — if you want a frontier model for nightly analysis without paying for it on every chat message, set only REFLECTION_MODEL_NAME.
Telegram Interface (app/interfaces/telegram/bot.py)¶
The primary user-facing interface. Handles:
- Text messages — streamed response with live edit every 1.5s (Telegram rate limit)
- Voice/audio — downloads from Telegram, transcribes via Gemini STT, runs agent, replies with a voice note (TTS) for responses ≤500 words
- Photos — downloads highest resolution, runs Gemini Vision, runs agent
- Documents — PDFs via Gemini Vision, text files decoded directly
- Slash commands — /start, /help, /notes, /tasks, /reminders (bypass the agent, hit storage directly)
- Allowlist — ALLOWED_TELEGRAM_USER_IDS checked on every handler; blocks all if unset
- Audit logging — _write_audit_entries() writes sanitised tool call records to audit_log after each interaction
- Inline approval flow — after streaming, scans tool call outputs for sentinel tokens ([APPROVAL_PENDING:<uuid>]); edits the sent message to add Confirm/Cancel/Edit inline keyboard buttons. CallbackQueryHandler handles button taps — Confirm executes via ACTION_REGISTRY, Cancel marks cancelled, Edit prompts for a correction and re-runs the agent. Edit state is tracked in context.user_data.
- Post-conversation background tasks — after audit logging, fires two asyncio.create_task() calls: (1) extract_facts_from_exchange() immediately extracts explicitly stated user facts; (2) _reschedule_session_close() schedules (or reschedules) a 30-minute delayed summarise_session() call. A module-level _session_tasks: dict[str, asyncio.Task] tracks one pending task per chat. See Memory & Reflection.
Browser Tools (app/tools/browser.py)¶
Playwright-based headless Chromium. Features:
- Chrome user agent spoofing to reduce bot detection
- Asset blocking (images, fonts, media) for faster loads
- Semantic content extraction — targets <main>, <article>, [role="main"] before falling back to <body>
- Headline extraction from h1/h2/h3 in the content zone
- URL security: blocks file://, localhost, and private IPs; optional BROWSER_ALLOWED_DOMAINS allowlist
- Graceful degradation if Playwright is not installed
GitHub Tools (app/tools/github.py)¶
PyGitHub-based integration. All API calls are synchronous and wrapped with asyncio.to_thread(). Gracefully returns a clear message if GITHUB_TOKEN is not configured or PyGitHub is not installed.
Embedding (app/tools/embedding.py)¶
embed_text(text, api_key) → list[float] | None — calls the Gemini gemini-embedding-001 REST API directly via httpx (3072 dimensions). Called internally by both storage adapters after each write to notes, interactions, and read_later. Returns None when GOOGLE_API_KEY is not set or the API call fails — never raises. Falls back to gemini-embedding-2-preview if the primary model returns 404.
Slack Tools (app/tools/slack.py)¶
slack-sdk WebClient integration. All API calls are synchronous and wrapped with asyncio.to_thread(). Username and channel name resolution are cached within each tool call. Gracefully returns a clear message if SLACK_BOT_TOKEN is not configured or slack-sdk is not installed. Requires the bot to be invited to any channel it reads or posts in.
Intent Router (app/tools/router.py + app/routing/agents.py)¶
Keyword-based intent classification and domain agent dispatch. Runs on every text message (Telegram and WhatsApp) before the agent is invoked.
Taxonomy (TOOL_CATEGORIES) — maps twelve domain names to their canonical tool lists: utility, memory, email, calendar, github, news, slack, jira, drive, meetings, diagnostics, health. TOOL_TO_CATEGORY is the reverse lookup used in tests.
classify_intent(message, context_hint=None) — lowercases the message and checks it against keyword lists for each domain (e.g. "email", "inbox", "draft" → email). Returns a set[str] of matched categories, always including "utility" if any other domain is matched. Returns an empty set for conversational messages. Zero LLM cost — microsecond latency.
When a message produces no keyword match, an optional context_hint: set[str] (the domain from the previous turn) can be passed in. If the hint is a single non-utility domain, it is inherited — so short follow-ups like "anything else?" continue in the same domain agent instead of falling back to the full 84-tool agent. Multi-domain or utility-only hints are not inherited (too ambiguous). Both handle_message (Telegram) and _process_message (WhatsApp) derive the hint from the most recent stored interaction before calling classify_intent.
select_agent(categories) — dispatches to the right agent:
- Empty set → full_agent (conversational fallback)
- {"utility"} only → utility_agent
- One non-utility domain (e.g. {"email", "utility"}) → email_agent
- Multiple non-utility domains → _get_composed_agent(frozenset(categories)) — a dynamically built agent whose tool set is the union of all matched domains' tools, cached with lru_cache(maxsize=32). The full agent is only used for empty/ambiguous sets.
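The dispatch rules can be sketched with strings standing in for the real Agent instances; the lru_cache size follows the text, the rest is an assumption:

```python
from functools import lru_cache

# Strings stand in for the real Agent instances built in app/routing/agents.py.
DOMAIN_AGENTS = {"email": "email_agent", "calendar": "calendar_agent"}

@lru_cache(maxsize=32)
def _get_composed_agent(categories: frozenset) -> str:
    # The real version builds an Agent whose tool set is the union of the
    # matched domains' tools; here we just name it.
    return "composed:" + "+".join(sorted(categories - {"utility"}))

def select_agent(categories: set[str]) -> str:
    non_utility = categories - {"utility"}
    if not categories:
        return "full_agent"      # conversational fallback
    if not non_utility:
        return "utility_agent"   # {"utility"} only
    if len(non_utility) == 1:
        return DOMAIN_AGENTS.get(next(iter(non_utility)), "full_agent")
    return _get_composed_agent(frozenset(categories))
```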
Domain agents — 12 interactive Agent instances + 1 briefing_agent, all built once at import time in app/routing/agents.py. Each carries only its relevant tool subset, reducing the tool list the LLM sees. They share build_system_prompt and the active_categories focus hint injected into AgentDeps per request.
The jira_agent handles Jira queries (tickets, sprints, projects) via the six jira_* tools. Requires JIRA_BASE_URL, JIRA_EMAIL, and JIRA_API_TOKEN.
The drive_agent handles Google Drive queries (search, read, list recent, transcripts) via the five Drive MCP tools. Supports personal (GOOGLE_DRIVE_REFRESH_TOKEN) and work (GOOGLE_DRIVE_WORK_REFRESH_TOKEN) accounts.
The meetings_agent handles meeting intelligence queries via get_meeting_insights and list_recent_meetings skills (from app/skills/meeting_notes.py), plus Drive MCP tools and calendar MCP tools for context. Intent keywords include "recap the meeting", "notes from the", "what came out of", and "standup notes".
The diagnostics_agent handles self-diagnosis queries (errors, performance, loop health) via the three Logfire MCP tools. Its intent keywords use specific multi-word phrases (e.g. "your errors", "did the briefing run") to avoid false positives on common words like "error".
The health_agent (Spec 010) handles wearable / Samsung Watch queries via four read-only tools (get_recent_health, get_sleep_summary, get_hrv_trend, get_health_snapshot). Routed by intent keywords like "how did i sleep", "my hrv", "resting heart rate", "step count". Data is ingested separately by the Android bridge — the agent only reads.
Planning Layer (app/planning/) — Spec 008¶
A multi-step decomposition pass that runs before intent routing on Telegram messages, so requests like "check my email, then post a summary in #standup, then add a follow-up task" execute as a sequenced plan instead of one ambiguous tool dance.
- planner.py — classify_complexity(text) is a regex pre-filter (≥30 chars + connectives like "and then", "after that", "also send") that gates the LLM call. generate_plan(text, deps) runs a tool-less _planner_agent: Agent[None, ExecutionPlan]. Returns None when needs_planning=False or len(steps) < 2 — the message then falls through to the normal single-step flow.
- executor.py — format_plan_preview(plan) renders the plan as a Telegram approval message. execute_plan(plan, deps, send_progress) runs steps sequentially; each step is independently routed via classify_intent + select_agent, with truncated outputs of prior steps threaded forward as a scratchpad so later steps can build on earlier results. A live ▶/✓ progress checklist is edited into the same message after each step.
- Resume on failure — when a step throws, execute_plan saves a PendingAction (action_type="plan_resume") with a PlanResumePayload (plan, remaining_steps, scratchpad, failed_step, failure_reason) and stops. Confirm restarts from remaining_steps; Cancel aborts cleanly.
The plan itself is always behind the standard inline approval keyboard, even when none of the constituent tools would normally be gated. Full sequence: Multi-step plan flow.
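The pre-filter gate can be sketched as follows — the threshold and connective examples come from the text; the exact pattern is an assumption:

```python
import re

# Connectives follow the examples in the text; the real list is likely longer.
_CONNECTIVES = re.compile(r"\band then\b|\bafter that\b|\balso send\b", re.IGNORECASE)

def classify_complexity(text: str) -> bool:
    """True when the message looks multi-step enough to justify an LLM planning call."""
    return len(text) >= 30 and bool(_CONNECTIVES.search(text))
```

Because it is just a length check plus one regex, this gate costs microseconds, so running it on every Telegram message is free compared to a planner LLM call.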
Observability — Logfire + Langfuse (app/observability.py) — Spec 009¶
Two complementary observability stacks share a single OpenTelemetry tracer provider and run side-by-side without duplicating instrumentation:
graph LR
PA[Pydantic AI agent.run / run_stream] --> OTEL[OTEL TracerProvider]
FAST[FastAPI requests] --> OTEL
OTEL --> LF[Logfire processor\ninfra · FastAPI routes · loop spans]
OTEL --> LFC[Langfuse processor\nLLM generations · sessions · scores]
LF --> LFCloud[(Logfire Cloud)]
LFC --> LFCloud2[(Langfuse Cloud)]
LF -. self-diagnosis MCP .-> AG[Agent — diagnostics_agent]
init_observability(settings, app) is called once from create_app() before any agent runs. It configures Logfire (logfire.configure() + instrument_fastapi) when LOGFIRE_TOKEN is set, and calls Agent.instrument_all() from Pydantic AI when Langfuse is enabled so spans land on the global OTEL provider where both processors observe them. (When only Logfire is enabled, logfire.instrument_pydantic_ai() is used instead — it routes through Logfire's internal tracer, which Langfuse would not see.)
Per-turn trace grouping. langfuse_root_span(name, session_id, user_id, …) is an async context manager wrapping the whole turn so every agent.run() inside it nests under one Langfuse trace instead of producing one trace per call.
Stage-level spans inside a turn. Pydantic AI's Agent.instrument_all() covers LLM generations + tool calls automatically, but the preprocessing and routing stages would otherwise be invisible. The following functions are decorated with Langfuse @observe so they appear as nested children of telegram.turn:
- preprocess.stt — app.tools.voice.transcribe_audio (audio bytes → transcript)
- preprocess.vision — app.tools.vision.analyze_image (image bytes → description)
- embed_text — app.tools.embedding.embed_text (sanitized: only chars/dims/model captured, never the vector or api_key)
- routing.classify_intent — app.tools.router.classify_intent (output: {categories, method} where method is keyword | semantic | context_hint | none)
- context_injection — app.utils.message_utils.inject_context (output: per-layer counts + 1500-char preview of what was prepended to the user message)
- message_history_retrieval — app.utils.message_utils.fetch_message_history (metadata: {mode: chronological|semantic, recent_count, semantic_count, semantic_enabled}; mode=error indicates the semantic path failed and chronological was used)
update_current_span(input=, output=, metadata=) in observability.py is the helper they use to attach sanitized payloads; it degrades to a no-op when Langfuse is disabled.
Asynchronous trace scoring. When a Confirm/Cancel/Edit happens minutes after the original turn closed, score_trace(action.trace_id, name, value) attaches a quality score to the original trace. PendingAction.trace_id is captured at gate time via OTEL context (_current_otel_trace_id() in app/approval.py) precisely so this delayed score can land. Three score names are emitted today: user_approval (Confirm 1.0 / Cancel 0.0), user_edit (Edit tap), and agent_error (in-flight exception, written via score_current_trace).
Langfuse-managed Prompts (app/prompts.py + prompts.lock.json) — Spec 009¶
Nine system prompts can be tuned in the Langfuse UI without redeploying — get_prompt(name, fallback) returns the production-labeled Langfuse version when reachable, otherwise the code constant. prompts.lock.json (repo root) pins each constant's sha256, and check_drift() runs at startup to warn when code has been edited without being pushed. The sync workflow lives in scripts/sync_prompts.py; full prompt list and CLI usage in Tools → Langfuse-managed Prompts.
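The drift check can be sketched as below — this assumes prompts.lock.json maps each prompt name directly to a sha256 hex digest; the real lock format may carry more fields:

```python
import hashlib
import json

def check_drift(constants: dict[str, str], lock_path: str) -> list[str]:
    """Return names of prompt constants whose sha256 no longer matches the lock file."""
    with open(lock_path) as f:
        pinned: dict[str, str] = json.load(f)
    return [
        name
        for name, text in constants.items()
        if pinned.get(name) != hashlib.sha256(text.encode()).hexdigest()
    ]
```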
Briefing Agent (app/routing/agents.py → briefing_agent)¶
A dedicated agent used exclusively by the scheduled briefing loops (morning, evening, weekly). It is never invoked for interactive user messages.
Tool set: memory (tasks, notes, reminders) + utility (weather, datetime, search, URL, maps) + all email MCP tools + all calendar MCP tools + Microsoft To Do (todo_*) MCP tools.
Why separate? The interactive agents are designed around one domain. The morning briefing legitimately needs email + calendar + tasks + weather simultaneously — a cross-domain request that would normally fall back to the full 45-tool full_agent. briefing_agent gives it a pre-composed, focused tool set without the overhead of GitHub, news, or browser tools that are irrelevant for briefings.
Skill Registry (app/skills/)¶
File-drop plugin architecture. Any .py file placed in app/skills/ with a function decorated @skill is auto-discovered and registered as an agent tool on startup via load_skills(agent). Modules starting with _ are ignored. Loading is idempotent — calling load_skills() twice registers nothing new.
Built-in skills:
- read_later.py — Content Curator (save_to_read_later, list_read_later, delete_read_later, get_read_later_digest)
- travel_briefing.py — get_travel_briefing — on-demand travel summary (weather + maps/transit + LLM synthesis)
- cv.py — store_cv, get_cv — parse and persist CV as structured user facts for job application workflows
- research.py — deep_research — multi-step web research: generates sub-questions, fetches and synthesises sources in parallel, saves result as a "Research: <topic>" note
- meeting_notes.py — get_meeting_insights, list_recent_meetings — extract decisions/actions/key points from Drive transcripts via mini-model; save as notes. Source-agnostic: _SOURCE_REGISTRY maps source name to fetcher, allowing Teams or Notion to be wired in later.
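The marker-plus-idempotent-loader mechanism can be sketched as follows — the @skill and load_skills names come from the text; the internals (attribute marker, registry set) are assumptions:

```python
_REGISTERED: set[str] = set()

def skill(fn):
    """Mark a function as a skill; the loader discovers it by this attribute."""
    fn._is_skill = True
    return fn

def load_skills(agent, module) -> int:
    """Register every @skill function found in `module`; calling twice is a no-op."""
    added = 0
    for name in dir(module):
        obj = getattr(module, name)
        if callable(obj) and getattr(obj, "_is_skill", False) and name not in _REGISTERED:
            agent.tool(obj)  # stand-in for Pydantic AI's @agent.tool registration
            _REGISTERED.add(name)
            added += 1
    return added
```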
System Prompt Architecture¶
build_system_prompt() assembles the agent's full context from three categories of sections, always in this order:
graph TD
subgraph Static["Static Instruction Sections (identical across requests — cache-friendly)"]
direction LR
A1[_PERSONA\nVoice · banned openers · cultural context]
A2[_DECISION_MAKING\nWhen to act vs. confirm · tool priority]
A3[_CONTEXT_USAGE\nHow to apply the profile silently]
A4[_MEMORY_INSTRUCTIONS\nFacts store · search priority order]
A5[_JOURNAL_INSTRUCTIONS\nWhen to call save_journal_entry]
A6[_TONE_CALIBRATION\nSituational energy matching]
A7[_OUTPUT_FORMAT\nMobile formatting · brevity rules]
A8[_ERROR_HANDLING\nRetry-first · plain-English failure summaries]
A9[_TOOLS_AND_ACCOUNTS\nTool guidance scoped to active domain]
A10[_TOOL_USAGE_EXAMPLES\nExample flows]
A11[_APPROVAL_INSTRUCTIONS\nSentinel handling · never show raw token]
A12[focus_section\nActive domain focus hint if scoped]
end
subgraph Dynamic["Dynamic Sections (fetched from storage per request)"]
direction LR
B1[facts_section\nuser_facts table — key/value facts\nQuick Reference Facts header]
B2[learnings_section\nagent_learnings table status=active\nBehavioral Guidelines header]
B3[context_section\ncontext table — narrative profile\ncapped at 4,800 chars ≈ 1,200 tokens]
end
Static --> Prompt[System Prompt\n~3,000–6,000 tokens]
Dynamic --> Prompt
Key design constraints:
- Datetime is NOT in the system prompt. It is prepended to the user turn (as [Thursday, April 24, 2026 — 09:15 AM Europe/Paris]) so the stable system prompt sections form a consistent prefix across requests and qualify for Gemini's implicit prompt cache. A changed datetime would bust the cache on every request.
- Profile is capped at 4,800 chars (~1,200 tokens). The nightly reflection profile grows over time — the cap prevents unbounded system prompt growth while keeping the most recent content intact.
- Tool sections are scoped. When active_categories is set (i.e. a domain agent is running), only the relevant _DOMAIN_TOOL_SECTIONS entries are included — cutting ~600–1,000 tokens of irrelevant tool guidance from each domain-routed request.
- Briefing agent uses build_briefing_system_prompt(), which extends the base prompt by appending _MORNING_BRIEFING, _EVENING_RECAP, _WEEKLY_RECAP, _WEEKLY_PREP, and _MEETING_PREP templates. These arrive at the system level so the model's structural understanding of briefing formats is cached, not re-parsed per call.
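The datetime-in-the-user-turn trick can be sketched as below — the timestamp format is inferred from the example; the helper name is hypothetical:

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # assumes tzdata is available on the host

def prefix_datetime(user_message: str, tz: str = "Europe/Paris") -> str:
    """Prepend the timestamp to the user turn so the system prompt stays cache-stable."""
    now = datetime.now(ZoneInfo(tz))
    stamp = now.strftime("[%A, %B %d, %Y — %I:%M %p ") + tz + "]"
    return f"{stamp}\n{user_message}"
```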
Personality directives (_PERSONA + _ERROR_HANDLING):
Two sections encode Kwasi's distinctive character:
- _PERSONA — Direct, warm-but-not-performative voice. Natural Ghanaian cultural familiarity. Light humor where it lands. Banned openers list: "I'm sorry", "Unfortunately", "I apologise", "I wasn't able to", "I'm afraid" — the model must reframe as what it CAN do.
- _ERROR_HANDLING — Retry-first directive: when a tool fails, attempt at least one alternative (different tool, rephrased query, decomposed sub-task) before reporting failure. When all paths are exhausted: a structured ≤3-bullet plain-English summary of what was tried + one concrete next step. No raw error codes or exception names in any user-facing response.
Dashboard (app/routers/dashboard.py)¶
FastAPI router mounted at /dashboard/. Four views (interactions, audit log, scheduled tasks, memory profile). Protected by DASHBOARD_SECRET — disabled entirely if the env var is not set. Auth via X-Dashboard-Secret header or ?token= query param.
MCP Tools (app/interfaces/mcp/client.py)¶
Gmail, Outlook email, Outlook calendar, and Microsoft To Do tools are synchronous (Google/MS APIs). They are wrapped with asyncio.to_thread() to avoid blocking the event loop. get_mcp_tools() returns all wrappers; agents.py filters them into _EMAIL_MCP, _CALENDAR_MCP, and _TODO_MCP sets for domain-specific routing.
gmail_read_email_wrapper — fetches the complete body of a Gmail message (up to 20,000 chars). Walks the MIME part tree recursively, prefers text/plain, strips HTML from text/html fallback. The search_emails_wrapper and summary tools only return a short snippet (~100–200 chars) — always follow up with gmail_read_email_wrapper when the full text is needed.
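The prefer-text/plain MIME walk can be illustrated with the stdlib email API — the Gmail API returns a different payload shape, so this shows only the strategy, and the helper name is hypothetical:

```python
from email.message import EmailMessage

def extract_body(msg: EmailMessage, limit: int = 20_000) -> str:
    """Prefer text/plain; fall back to text/html; cap length at `limit` chars."""
    # get_body walks the MIME tree and honours the preference order;
    # the real wrapper additionally strips tags from an HTML fallback.
    part = msg.get_body(preferencelist=("plain", "html"))
    text = part.get_content() if part is not None else ""
    return text[:limit]
```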
outlook_read_wrapper — fetches the complete body of an Outlook message (up to 20,000 chars). Strips HTML (Graph API returns HTML by default), includes From/Subject/Date fields.
Google Calendar shared calendars — google_get_todays_schedule_wrapper and google_get_calendar_events_wrapper query all calendars the user has access to (primary, shared, subscribed) by calling calendarList().list() first. Each event is prefixed with [Calendar Name] in the output.
Microsoft To Do tools (todo_*) use the same OutlookClient._make_request pattern as Outlook email and calendar. They require Tasks.ReadWrite scope on the Outlook refresh token.
Memory (app/memory/)¶
- ports.py — StoragePort protocol + all data models (Interaction, Note, Task, Reminder, UserContext, UserFact, ScheduledTask, AuditEntry, ReadLaterItem, NewsTopic, SeenStory, AlertRule, PendingAction, PendingIntention, AgentLearning, JournalEntry, SemanticSearchResult). The HealthSample Pydantic model lives in app/health/models.py.
- adapters/sqlite.py — SQLite implementation (local dev, :memory: for tests). Tables: interactions, notes, tasks, reminders, context, user_facts, scheduled_tasks, audit_log, read_later, news_topics, seen_news, alert_rules, pending_actions, pending_intentions, agent_learnings, journal_entries, health_samples.
- adapters/postgres.py — asyncpg connection pool (Railway production). Same tables as SQLite. health_samples.value is JSONB in Postgres and JSON-encoded TEXT in SQLite.
- Both adapters embed new records on write via embed_text() (Gemini gemini-embedding-001, fire-and-forget) and implement semantic_search() — Python cosine in SQLite, pgvector <=> in Postgres.
- logger.py — log_interaction() helper used by all handlers
- post_conversation.py — extract_facts_from_exchange() (immediate post-message fact extraction) and summarise_session() (30-min session-close summarisation); both fire-and-forget via asyncio.create_task()
- reflection.py — ReflectionService — nightly LLM pass producing four outputs: updated UserContext narrative profile, auto-extracted UserFact records, new PendingIntention records, and AgentLearning corrections
Background Loops¶
All background loops run as asyncio tasks within the FastAPI lifespan.
| Loop | Trigger condition | Schedule |
|---|---|---|
| Reflection | REFLECTION_SECRET set | 2 AM UTC daily |
| Reminders | Telegram token configured | Every 5 minutes |
| Daily Briefing | BRIEFING_CHAT_ID set | BRIEFING_TIME UTC daily (default 08:00) |
| Evening Recap | BRIEFING_CHAT_ID set | EVENING_RECAP_TIME local (default 21:00) |
| Weekly Recap | BRIEFING_CHAT_ID set | WEEKLY_RECAP_DAY/WEEKLY_RECAP_TIME local (default Friday 18:00) |
| Weekly Prep | BRIEFING_CHAT_ID set | WEEKLY_PREP_DAY/WEEKLY_PREP_TIME local (default Sunday 18:00) |
| Read-Later Digest | BRIEFING_CHAT_ID set | READ_LATER_DIGEST_DAY/READ_LATER_DIGEST_TIME local (default Saturday 09:00) |
| Journal Digest | BRIEFING_CHAT_ID set | JOURNAL_DIGEST_DAY/JOURNAL_DIGEST_TIME local (default Sunday 19:00) |
| Email Intelligence | BRIEFING_CHAT_ID + email creds | EMAIL_INTEL_TIME local (default 09:30); set "" to disable |
| User Scheduled Tasks | BRIEFING_CHAT_ID set | Every 60 seconds (croniter-based evaluation) |
| Alert Rules + Intentions | BRIEFING_CHAT_ID set | Every 5 minutes |
| Meeting Prep | BRIEFING_CHAT_ID set | Every 5 minutes (fires ≈MEETING_PREP_LEAD_MINUTES before each meeting) |
| Approval Expiry | Telegram bot active | Every 5 minutes |
Key Design Decisions¶
Why Pydantic AI?¶
Pydantic AI handles the agent loop (tool calls → LLM → tool calls → ...), type-safe tool definitions, streaming, and model switching. The Agent class abstracts over different LLM providers — switching from Gemini to Claude is a single env var change.
Why polling instead of webhook for Telegram?¶
Long-polling (updater.start_polling) is simpler to deploy on Railway — no public URL needed for the bot itself, no TLS certificate management. The WhatsApp webhook still uses a proper HTTP endpoint since the Meta platform requires it.
Why asyncio tasks for background loops?¶
Avoids the complexity of a separate worker process, Celery, or a job scheduler. Since the app is single-user and the loops are low-frequency (5 min / daily / 60 sec), asyncio tasks inside the FastAPI lifespan are sufficient and operationally simple.
Why MCP for email/calendar?¶
MCP (Model Context Protocol via fastmcp) provides a clean abstraction for external tool servers. Gmail and Outlook have separate auth flows (OAuth2 with Google vs. MSAL with Microsoft) — MCP isolates this complexity from the agent.
Why native PyGitHub instead of the official GitHub MCP server?¶
The existing "MCP" tools in this project are local Python wrappers, not external server processes. PyGitHub follows the same pattern as other tools (weather, search, news) — in-process, async-wrapped, consistent output formatting. The official GitHub MCP server is a Go binary designed for IDE integrations, not embedded service use.
Why a file-drop skill registry?¶
Allows extending the agent's capabilities without touching agent.py. New skills can be developed, tested, and dropped in independently. The registry is idempotent and fail-safe — a broken skill file is skipped with a logged error.
Why keyword-based intent routing instead of LLM-based routing?¶
The routing step runs on every message, so it must have near-zero latency and zero cost. Keyword matching is deterministic, testable, and trivially debuggable — if it misroutes a message, you can see exactly why by reading the keyword list. An LLM-based router would add a full extra inference step per message, significantly increasing latency and cost for a single-user assistant. The keyword approach covers ~95% of real messages correctly; the remaining 5% fall through to the full agent, which is always the safe fallback.
Why a sentinel token instead of pausing the agent mid-run for approval?¶
The agent runs to completion — it always produces a text response. Pausing mid-run would require a resumable coroutine, persistent agent state, and a way to re-inject the approval result back into an in-progress LLM call. The sentinel pattern avoids all of this: the agent sees "[APPROVAL_PENDING:uuid]" as a normal tool result, writes a response like "your action is pending approval", and exits. handle_message post-processes the result synchronously. On Confirm, a simple direct executor call (no LLM re-run) performs the action. This keeps the agent loop stateless and the approval flow outside the LLM's critical path.
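The post-processing step can be sketched as follows — the sentinel format comes from the text; the helper name is hypothetical:

```python
import re

# Matches [APPROVAL_PENDING:<uuid>] anywhere in the agent's final text.
_SENTINEL = re.compile(r"\[APPROVAL_PENDING:([0-9a-f-]{36})\]")

def extract_pending_approvals(response: str) -> tuple[str, list[str]]:
    """Strip sentinel tokens from the response; return (clean_text, action_ids)."""
    ids = _SENTINEL.findall(response)
    cleaned = _SENTINEL.sub("", response).strip()
    return cleaned, ids
```

The returned ids are what the handler uses to attach the Confirm/Cancel/Edit keyboard; the cleaned text is what the user actually sees, so the raw token never reaches the chat.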
Why a separate app/approval.py module?¶
approval.py imports only from app/memory/ports — it has no knowledge of agent.py, bot.py, or client.py. This breaks the potential circular import chain: agent.py imports approval.py, bot.py imports approval.py, and client.py imports approval.py — but none of these files import each other. If the gate lived in agent.py or bot.py, it would create a cycle.
Why domain agents instead of a single agent with all tools?¶
Reducing the tool list the LLM sees for each request reduces token usage per call and improves model focus — a model with 8 tools performs more reliably than one with 50. Domain agents are built once at import time (zero per-request cost), share the same system prompt function, and fall back transparently to the full agent for anything ambiguous. The full agent is always the safety net.
Why both Logfire and Langfuse?¶
Different jobs. Logfire owns infrastructure and request-shape tracing — FastAPI routes, background-loop ticks, exceptions, latency by file path. The diagnostics_agent uses it via Logfire's MCP read tool to answer "did the briefing run?" or "which tool is slowest?". Langfuse owns the LLM layer — per-generation token usage and cost, prompt versions, session/user grouping, trace-level quality scores tied to user actions (Confirm/Cancel/Edit). Both processors hang off the same OTEL tracer provider in app/observability.py, so Pydantic AI is instrumented once and spans flow to both backends without duplication. Either side can be disabled (skip the env vars) and the other keeps working.
Why a push model for health data instead of a server-side polling loop?¶
Neither Samsung Health nor Google Health Connect exposes a cloud / server-side REST API — both store data on-device, encrypted, behind explicit user consent. There is no OAuth Kwasi could call from FastAPI to pull samples. Two options remained: (a) integrate a paid third-party aggregator (Terra/Rook/Thryve, ~$400/mo minimum) which itself still requires an Android piece, or (b) write a tiny sideloaded Android bridge (bridge-android/) that reads Health Connect locally and POSTs to /health/ingest. The bridge is one Kotlin Gradle project, ~600 LOC, and never goes through the Play Store — so we sidestep Google's sensitive-permissions review entirely. Per-metric watermarks in DataStore prevent re-sends and gaps across reboots; the server is idempotent on (metric_type, start_time, source_device) so retries are safe. Result: zero ongoing cost, full data ownership, and the rest of Kwasi's read path is identical to any other agent tool.
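The server-side idempotency can be sketched with a unique index on the key named above — table and column names here are assumptions; only the (metric_type, start_time, source_device) key comes from the text:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE health_samples (
        metric_type   TEXT NOT NULL,
        start_time    TEXT NOT NULL,
        source_device TEXT NOT NULL,
        value         TEXT,
        UNIQUE (metric_type, start_time, source_device)
    )""")

def ingest(samples: list[dict]) -> int:
    """Insert a batch; rows already seen (bridge retries) are silently ignored."""
    cur = conn.executemany(
        "INSERT OR IGNORE INTO health_samples "
        "VALUES (:metric_type, :start_time, :source_device, :value)",
        samples,
    )
    conn.commit()
    return cur.rowcount  # rows actually inserted
```

With this constraint the Android bridge can retry any failed POST without the server double-counting samples.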
Why semantic search instead of upgrading keyword search?¶
Keyword (LIKE / ILIKE) search is fast and good enough for exact recall. Semantic search complements it — it finds content by meaning when the user's words don't match the stored text (e.g. "anything about burnout" finding a note titled "work-life balance thoughts"). Rather than replacing keyword search, the agent uses both: keyword first (zero latency, no API call), semantic as a fallback or in parallel via find_everything. Embeddings are stored at write time so queries are instant. The embed_text call at write time costs one Gemini API call per saved record — negligible for a personal assistant.