
Architecture

System Overview

---
config:
    layout: elk
---
graph LR
    subgraph Interfaces
        TG[Telegram\npython-telegram-bot polling]
        WA[WhatsApp\nwebhook]
        CLI[CLI\nlocal REPL]
        DASH[Dashboard\n/dashboard/]
    end

    subgraph Planning["Planning Layer (app/planning/) — Spec 008"]
        CC[classify_complexity\nregex pre-filter]
        GP[generate_plan\nplanner agent]
        EP[execute_plan\nstep loop · scratchpad threading]
    end

    subgraph Routing["Intent Routing (app/tools/router.py + app/routing/agents.py)"]
        CI[classify_intent\nkeyword matcher]
        SA[select_agent\ndispatcher]
        DA[Domain Agents\nemail · calendar · memory · github · news\nslack · jira · drive · meetings\ndiagnostics · health · utility]
        FA2[Full Agent\nfallback]
    end

    subgraph Core["Core (FastAPI + asyncio)"]
        FAST[FastAPI\napp/main.py]
        AG[Pydantic AI Agent\napp/agent.py]
        SP[System Prompt\nbuild_system_prompt]
    end

    subgraph Tools["Native Tools"]
        T1[search_web\nTavily]
        T2[get_weather\nWeatherAPI]
        T3[get_datetime]
        T4[summarize_url]
        T5[transcribe_audio\nGemini STT]
        T6[analyze_image\nGemini Vision]
        T7[browse_web\nbrowser_submit_form\nPlaywright]
        T8[github_*\nPyGitHub]
    end

    subgraph Skills["Skills (file-drop plugins)\napp/skills/"]
        S1[content_curator\nread_later.py]
        S2[travel_briefing]
        S3[cv]
        S4[deep_research\nresearch.py]
        S5[meeting_notes\nget_meeting_insights · list_recent_meetings]
    end

    subgraph MCPTools["MCP Tools"]
        M1[Gmail personal + work\ngmail_* tools]
        M2[Outlook Email\noutlook_* tools]
        M3[Google Calendar\ngoogle_* tools]
        M4[Outlook Calendar\noutlook_calendar_* tools]
        M5[Microsoft To Do\ntodo_* tools]
        M6[Google Drive\ndrive_* tools]
        M7[Logfire\nlogfire_* tools]
    end

    subgraph Memory["Memory Tools (agent tools over storage)"]
        N[Notes\nsave · get · search · edit · delete · semantic]
        TK[Tasks\ncreate · list · search · complete · edit · delete]
        RM[Reminders\nset · list · cancel]
        SC[Scheduled Tasks\ncreate · list · update · delete]
        HX[History\nsearch_history · semantic_search]
        UF[User Facts\nremember_fact · recall_facts · forget_fact]
    end

    subgraph Storage["Storage (StoragePort)"]
        SQ[(SQLite\nlocal dev)]
        PG[(PostgreSQL\nRailway prod)]
    end

    subgraph HealthIngest["Health Data Ingest (Spec 010 — push model, no server-side loop)"]
        WATCH[Samsung Watch / wearable]
        HC[Google Health Connect\non-device]
        BR["bridge-android/\nKotlin · WorkManager 15 min"]
        IN["POST /health/ingest\nX-Health-Secret · idempotent"]
        WATCH --> HC
        HC --> BR
        BR --> IN
        IN --> Storage
    end

    subgraph BG["Background Loops"]
        RL[Reminder Loop\nevery 5 min]
        BL[Briefing Loop\ndaily at BRIEFING_TIME]
        ST[Scheduled Tasks Loop\nevery 60 sec]
        RF[Reflection Loop\ndaily at 2 AM UTC]
        ER[Evening Recap Loop\ndaily local evening]
        WR[Weekly Recap Loop\nFriday local]
        WP[Weekly Prep Loop\nSunday local]
        RLD[Read-Later Digest\nSaturday morning]
        JD[Journal Digest\nSunday evening]
        EI[Email Intelligence\ndaily at EMAIL_INTEL_TIME]
        AL[Alert + Intentions Loop\nevery 5 min]
        MP[Meeting Prep Loop\nevery 5 min]
        AE[Approval Expiry Loop\nevery 5 min]
    end

    subgraph Models["LLM Models"]
        LLM[Primary LLM\nMODEL_NAME\nGemini / Claude / GPT-4]
        MINI[Mini LLM\nMINI_MODEL_NAME\nskill synthesis]
        REFL[Reflection LLM\nREFLECTION_MODEL_NAME\nnightly reflection]
    end

    TG --> CC
    CC -- complex --> GP
    GP -- 2+ steps --> EP
    CC -- simple --> CI
    GP -- 0–1 steps --> CI
    EP -- per step --> CI
    WA --> FAST
    CLI --> AG
    CI --> SA
    SA --> DA
    SA --> FA2
    DA --> LLM
    FA2 --> LLM
    FAST --> AG
    DASH --> Storage
    AG --> SP
    SP --> Storage
    AG --> LLM
    AG --> Tools
    AG --> Skills
    AG --> MCPTools
    AG --> Memory
    Memory --> Storage
    BG --> Storage
    BG --> TG
    Skills --> MINI
    BG --> REFL

Component Breakdown

FastAPI (app/main.py)

The web process entry point. Responsibilities:

  • Mounts the WhatsApp webhook router, dashboard router, and health router (/health/ingest, /health/backfill, /health/status)
  • Exposes GET /health and POST /reflect
  • Exposes POST /embed-backfill — bulk-embeds existing rows without embeddings; protected by X-Reflection-Secret; rate-limited to ~10 rows/sec
  • Exposes POST /health/ingest — idempotent batch ingest for wearable samples from the Android bridge; protected by X-Health-Secret (Spec 010)
  • Exposes POST /health/backfill — one-shot upload of a Samsung Health "Download Personal Data" CSV/JSON archive to seed historical samples; protected by X-Health-Secret
  • Exposes GET /health/status — per-metric ingest freshness summary for monitoring the bridge from outside the device. Returns {now, latest_ingest, lag_minutes, metrics: [...]} with one entry per known metric type (zeros when never ingested) plus per-metric ingest_lag_minutes. Protected by X-Health-Secret so polling it doesn't expose data shape (Spec 010)
  • Exposes POST /message — external message endpoint for Android HTTP Shortcuts and other clients. Accepts text and/or images (multipart/form-data or JSON with base64). Protected by X-API-Token header (API_TOKEN env var). Processes input through the full agent pipeline (intent routing, read-later injection, message history). Delivers response to Telegram (BRIEFING_CHAT_ID) and returns it in the JSON body. Conversation history is unified with Telegram — both interfaces use BRIEFING_CHAT_ID as the user ID so context is shared across surfaces.
  • Manages the FastAPI lifespan context — starts and stops the Telegram bot and all background asyncio tasks
  • Exports API keys to os.environ so LLM SDKs can find them
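The lifespan pattern the entry point relies on can be sketched with plain asyncio — a minimal illustration, not the real app/main.py; the loop coroutine here is a placeholder:

```python
import asyncio
from contextlib import asynccontextmanager

started: list[asyncio.Task] = []  # for illustration: record launched loop tasks

# Hypothetical stand-in for a real loop such as the 5-minute reminder loop.
async def reminder_loop() -> None:
    while True:
        await asyncio.sleep(300)

@asynccontextmanager
async def lifespan():
    # Startup: launch each background loop as a plain asyncio task.
    tasks = [asyncio.create_task(reminder_loop())]
    started.extend(tasks)
    try:
        yield
    finally:
        # Shutdown: cancel every loop and wait for them to unwind.
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

async def main() -> None:
    async with lifespan():
        await asyncio.sleep(0)  # the web app would serve requests here

asyncio.run(main())
```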

Agent (app/agent.py)

The brain. Built with Pydantic AI. Responsibilities:

  • Defines AgentDeps — the dependency container passed to every tool and the system prompt
  • build_deps() — single factory function used by all interfaces to create AgentDeps
  • build_system_prompt() — async function decorated with @agent.system_prompt; assembles the full prompt from the static instruction sections plus 3 dynamic storage-fetched sections (permanent facts, behavioral learnings, narrative profile). See System Prompt Architecture below.
  • build_briefing_system_prompt() — extends build_system_prompt by appending all briefing structural templates (_MORNING_BRIEFING, _EVENING_RECAP, _WEEKLY_RECAP, _WEEKLY_PREP, _MEETING_PREP) at the system level. Used exclusively by briefing_agent.
  • Registers all native tools via @agent.tool
  • Calls load_skills(agent) at startup to auto-register skills from app/skills/

Multi-Model Architecture

Three model roles are configured independently via env vars. Each is optimised for a different cost/capability trade-off:

Role | Env var | Default | Used for
--- | --- | --- | ---
Primary | MODEL_NAME | google-gla:gemini-2.5-flash | Main conversational agent and all 12 domain agents — every interactive user message
Mini | MINI_MODEL_NAME | google-gla:gemini-3-1-flash-lite-preview | Skill synthesis: CV extraction, deep research synthesis, read-later summarisation, travel briefing, post-conversation fact extraction, session summarisation
Reflection | REFLECTION_MODEL_NAME | (empty — falls back to MODEL_NAME) | Nightly reflection run only — set to a larger/more capable model for deeper analysis without affecting interactive latency

settings.effective_reflection_model resolves the fallback: if REFLECTION_MODEL_NAME is empty, it returns MODEL_NAME.
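A minimal sketch of that fallback resolution (field names mirror the env vars above, but this is not the real settings class):

```python
from dataclasses import dataclass

@dataclass
class Settings:
    # Defaults taken from the table above.
    MODEL_NAME: str = "google-gla:gemini-2.5-flash"
    REFLECTION_MODEL_NAME: str = ""

    @property
    def effective_reflection_model(self) -> str:
        # An empty REFLECTION_MODEL_NAME falls back to the primary model.
        return self.REFLECTION_MODEL_NAME or self.MODEL_NAME
```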

Why separate models? The mini model handles high-frequency, low-stakes synthesis tasks (run after every message) where cost and latency matter most. The primary model handles user-facing responses where quality matters. The reflection model is a deliberate override slot — if you want a frontier model for nightly analysis without paying for it on every chat message, set only REFLECTION_MODEL_NAME.

Telegram Interface (app/interfaces/telegram/bot.py)

The primary user-facing interface. Handles:

  • Text messages — streamed response with live edit every 1.5s (Telegram rate limit)
  • Voice/audio — downloads from Telegram, transcribes via Gemini STT, runs agent, replies with a voice note (TTS) for responses ≤500 words
  • Photos — downloads highest resolution, runs Gemini Vision, runs agent
  • Documents — PDFs via Gemini Vision, text files decoded directly
  • Slash commands — /start, /help, /notes, /tasks, /reminders (bypass the agent, hit storage directly)
  • Allowlist — ALLOWED_TELEGRAM_USER_IDS is checked on every handler; blocks all users if unset
  • Audit logging — _write_audit_entries() writes sanitised tool call records to audit_log after each interaction
  • Inline approval flow — after streaming, scans tool call outputs for sentinel tokens ([APPROVAL_PENDING:<uuid>]); edits the sent message to add Confirm/Cancel/Edit inline keyboard buttons. CallbackQueryHandler handles button taps — Confirm executes via ACTION_REGISTRY, Cancel marks cancelled, Edit prompts for a correction and re-runs the agent. Edit state is tracked in context.user_data.
  • Post-conversation background tasks — after audit logging, fires two asyncio.create_task() calls: (1) extract_facts_from_exchange() immediately extracts explicitly stated user facts; (2) _reschedule_session_close() schedules (or reschedules) a 30-minute delayed summarise_session() call. Module-level _session_tasks: dict[str, asyncio.Task] tracks one pending task per chat. See Memory & Reflection.

Browser Tools (app/tools/browser.py)

Playwright-based headless Chromium. Features:

  • Chrome user agent spoofing to reduce bot detection
  • Asset blocking (images, fonts, media) for faster loads
  • Semantic content extraction — targets <main>, <article>, [role="main"] before falling back to <body>
  • Headline extraction from h1/h2/h3 in the content zone
  • URL security: blocks file://, localhost, and private IPs; optional BROWSER_ALLOWED_DOMAINS allowlist
  • Graceful degradation if Playwright is not installed

GitHub Tools (app/tools/github.py)

PyGitHub-based integration. All API calls are synchronous and wrapped with asyncio.to_thread(). Gracefully returns a clear message if GITHUB_TOKEN is not configured or PyGitHub is not installed.

Embedding (app/tools/embedding.py)

embed_text(text, api_key) → list[float] | None — calls the Gemini gemini-embedding-001 REST API directly via httpx (3072 dimensions). Called internally by both storage adapters after each write to notes, interactions, and read_later. Returns None when GOOGLE_API_KEY is not set or the API call fails — never raises. Falls back to gemini-embedding-2-preview if the primary model returns 404.

Slack Tools (app/tools/slack.py)

slack-sdk WebClient integration. All API calls are synchronous and wrapped with asyncio.to_thread(). Username and channel name resolution are cached within each tool call. Gracefully returns a clear message if SLACK_BOT_TOKEN is not configured or slack-sdk is not installed. Requires the bot to be invited to any channel it reads or posts in.

Intent Router (app/tools/router.py + app/routing/agents.py)

Keyword-based intent classification and domain agent dispatch. Runs on every text message (Telegram and WhatsApp) before the agent is invoked.

Taxonomy (TOOL_CATEGORIES) — maps twelve domain names to their canonical tool lists: utility, memory, email, calendar, github, news, slack, jira, drive, meetings, diagnostics, health. TOOL_TO_CATEGORY is the reverse lookup used in tests.

classify_intent(message, context_hint=None) — lowercases the message and checks it against keyword lists for each domain (e.g. "email", "inbox", "draft" → email). Returns a set[str] of matched categories, always including "utility" if any other domain is matched. Returns an empty set for conversational messages. Zero LLM cost — microsecond latency.

When a message produces no keyword match, an optional context_hint: set[str] (the domain from the previous turn) can be passed in. If the hint is a single non-utility domain, it is inherited — so short follow-ups like "anything else?" continue in the same domain agent instead of falling back to the full 84-tool agent. Multi-domain or utility-only hints are not inherited (too ambiguous). Both handle_message (Telegram) and _process_message (WhatsApp) derive the hint from the most recent stored interaction before calling classify_intent.

select_agent(categories) — dispatches to the right agent:

  • Empty set → full_agent (conversational fallback)
  • {"utility"} only → utility_agent
  • One non-utility domain (e.g. {"email", "utility"}) → email_agent
  • Multiple non-utility domains → _get_composed_agent(frozenset(categories)) — a dynamically built agent whose tool set is the union of all matched domains' tools, cached with lru_cache(maxsize=32)

The full agent is only used for empty/ambiguous sets.
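A minimal sketch of this dispatch logic, with string placeholders standing in for the real Agent instances:

```python
from functools import lru_cache

# Illustrative placeholders; the real objects are Pydantic AI Agents.
full_agent, utility_agent, email_agent = "full", "utility_agent", "email_agent"
DOMAIN_AGENTS = {"email": email_agent}

@lru_cache(maxsize=32)
def _get_composed_agent(categories: frozenset[str]) -> str:
    # The real version builds an Agent whose tools are the union of the domains'.
    return "composed:" + "+".join(sorted(categories - {"utility"}))

def select_agent(categories: set[str]) -> str:
    if not categories:
        return full_agent            # conversational fallback
    non_utility = categories - {"utility"}
    if not non_utility:
        return utility_agent         # {"utility"} only
    if len(non_utility) == 1:
        return DOMAIN_AGENTS.get(next(iter(non_utility)), full_agent)
    return _get_composed_agent(frozenset(categories))  # multi-domain union
```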

Domain agents — 12 interactive Agent instances + 1 briefing_agent, all built once at import time in app/routing/agents.py. Each carries only its relevant tool subset, reducing the tool list the LLM sees. They share build_system_prompt and the active_categories focus hint injected into AgentDeps per request.

The jira_agent handles Jira queries (tickets, sprints, projects) via the six jira_* tools. Requires JIRA_BASE_URL, JIRA_EMAIL, and JIRA_API_TOKEN.

The drive_agent handles Google Drive queries (search, read, list recent, transcripts) via the five Drive MCP tools. Supports personal (GOOGLE_DRIVE_REFRESH_TOKEN) and work (GOOGLE_DRIVE_WORK_REFRESH_TOKEN) accounts.

The meetings_agent handles meeting intelligence queries via get_meeting_insights and list_recent_meetings skills (from app/skills/meeting_notes.py), plus Drive MCP tools and calendar MCP tools for context. Intent keywords include "recap the meeting", "notes from the", "what came out of", and "standup notes".

The diagnostics_agent handles self-diagnosis queries (errors, performance, loop health) via the three Logfire MCP tools. Its intent keywords use specific multi-word phrases (e.g. "your errors", "did the briefing run") to avoid false positives on common words like "error".

The health_agent (Spec 010) handles wearable / Samsung Watch queries via four read-only tools (get_recent_health, get_sleep_summary, get_hrv_trend, get_health_snapshot). Routed by intent keywords like "how did i sleep", "my hrv", "resting heart rate", "step count". Data is ingested separately by the Android bridge — the agent only reads.

Planning Layer (app/planning/) — Spec 008

A multi-step decomposition pass that runs before intent routing on Telegram messages, so requests like "check my email, then post a summary in #standup, then add a follow-up task" execute as a sequenced plan instead of one ambiguous tool dance.

  • planner.py — classify_complexity(text) is a regex pre-filter (≥30 chars + connectives like "and then", "after that", "also send") that gates the LLM call. generate_plan(text, deps) runs a tool-less _planner_agent: Agent[None, ExecutionPlan]. Returns None when needs_planning=False or len(steps) < 2 — the message then falls through to the normal single-step flow.
  • executor.py — format_plan_preview(plan) renders the plan as a Telegram approval message. execute_plan(plan, deps, send_progress) runs steps sequentially; each step is independently routed via classify_intent + select_agent, with truncated outputs of prior steps threaded forward as a scratchpad so later steps can build on earlier results. A live progress checklist is edited into the same message after each step.
  • Resume on failure — when a step throws, execute_plan saves a PendingAction(action_type="plan_resume") with a PlanResumePayload(plan, remaining_steps, scratchpad, failed_step, failure_reason) and stops. Confirm restarts from remaining_steps; Cancel aborts cleanly.

The plan itself is always behind the standard inline approval keyboard, even when none of the constituent tools would normally be gated. Full sequence: Multi-step plan flow.

Observability — Logfire + Langfuse (app/observability.py) — Spec 009

Two complementary observability stacks share a single OpenTelemetry tracer provider and run side-by-side without duplicating instrumentation:

graph LR
    PA[Pydantic AI agent.run / run_stream] --> OTEL[OTEL TracerProvider]
    FAST[FastAPI requests] --> OTEL
    OTEL --> LF[Logfire processor\ninfra · FastAPI routes · loop spans]
    OTEL --> LFC[Langfuse processor\nLLM generations · sessions · scores]
    LF --> LFCloud[(Logfire Cloud)]
    LFC --> LFCloud2[(Langfuse Cloud)]
    LF -. self-diagnosis MCP .-> AG[Agent — diagnostics_agent]

init_observability(settings, app) is called once from create_app() before any agent runs. It configures Logfire (logfire.configure() + instrument_fastapi) when LOGFIRE_TOKEN is set, and calls Agent.instrument_all() from Pydantic AI when Langfuse is enabled so spans land on the global OTEL provider where both processors observe them. (When only Logfire is enabled, logfire.instrument_pydantic_ai() is used instead — it routes through Logfire's internal tracer, which Langfuse would not see.)

Per-turn trace grouping. langfuse_root_span(name, session_id, user_id, …) is an async context manager wrapping the whole turn so every agent.run() inside it nests under one Langfuse trace instead of producing one trace per call.

Stage-level spans inside a turn. Pydantic AI's Agent.instrument_all() covers LLM generations + tool calls automatically, but the preprocessing and routing stages would otherwise be invisible. The following functions are decorated with Langfuse @observe so they appear as nested children of telegram.turn:

  • preprocess.stt — app.tools.voice.transcribe_audio (audio bytes → transcript)
  • preprocess.vision — app.tools.vision.analyze_image (image bytes → description)
  • embed_text — app.tools.embedding.embed_text (sanitized: only chars/dims/model captured, never the vector or api_key)
  • routing.classify_intent — app.tools.router.classify_intent (output: {categories, method} where method is keyword | semantic | context_hint | none)
  • context_injection — app.utils.message_utils.inject_context (output: per-layer counts + 1500-char preview of what was prepended to the user message)
  • message_history_retrieval — app.utils.message_utils.fetch_message_history (metadata: {mode: chronological|semantic, recent_count, semantic_count, semantic_enabled}; mode=error indicates the semantic path failed and chronological was used)

update_current_span(input=, output=, metadata=) in observability.py is the helper they use to attach sanitized payloads; it degrades to a no-op when Langfuse is disabled.

Asynchronous trace scoring. When a Confirm/Cancel/Edit happens minutes after the original turn closed, score_trace(action.trace_id, name, value) attaches a quality score to the original trace. PendingAction.trace_id is captured at gate time via OTEL context (_current_otel_trace_id() in app/approval.py) precisely so this delayed score can land. Three score names are emitted today: user_approval (Confirm 1.0 / Cancel 0.0), user_edit (Edit tap), and agent_error (in-flight exception, written via score_current_trace).

Langfuse-managed Prompts (app/prompts.py + prompts.lock.json) — Spec 009

Nine system prompts can be tuned in the Langfuse UI without redeploying — get_prompt(name, fallback) returns the production-labeled Langfuse version when reachable, otherwise the code constant. prompts.lock.json (repo root) pins each constant's sha256, and check_drift() runs at startup to warn when code has been edited without being pushed. The sync workflow lives in scripts/sync_prompts.py; full prompt list and CLI usage in Tools → Langfuse-managed Prompts.
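The drift check reduces to a sha256 comparison; a minimal sketch under the assumption that the lock file maps prompt names to hex digests:

```python
import hashlib

def check_drift(constants: dict[str, str], lock: dict[str, str]) -> list[str]:
    # Returns the names of prompt constants whose code text no longer matches
    # the sha256 pinned in prompts.lock.json (illustrative shape).
    drifted = []
    for name, text in constants.items():
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if lock.get(name) != digest:
            drifted.append(name)  # edited without running the sync script
    return drifted
```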

Briefing Agent (app/routing/agents.py — briefing_agent)

A dedicated agent used exclusively by the scheduled briefing loops (morning, evening, weekly). It is never invoked for interactive user messages.

Tool set: memory (tasks, notes, reminders) + utility (weather, datetime, search, URL, maps) + all email MCP tools + all calendar MCP tools + Microsoft To Do (todo_*) MCP tools.

Why separate? The interactive agents are designed around one domain. The morning briefing legitimately needs email + calendar + tasks + weather simultaneously — a cross-domain request that would normally fall back to full_agent and its entire tool list. briefing_agent gives it a pre-composed, focused tool set without the overhead of GitHub, news, or browser tools that are irrelevant for briefings.

Skill Registry (app/skills/)

File-drop plugin architecture. Any .py file placed in app/skills/ with a function decorated @skill is auto-discovered and registered as an agent tool on startup via load_skills(agent). Modules starting with _ are ignored. Loading is idempotent — calling load_skills() twice registers nothing new.
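The decorator-plus-idempotent-loader pattern can be sketched without the filesystem scan (the real loader imports modules from app/skills/ and registers onto the agent; the signatures here are illustrative):

```python
_REGISTERED: set[str] = set()

def skill(fn):
    fn._is_skill = True  # marker the loader looks for
    return fn

def load_skills(agent_tools: list, candidates: list) -> int:
    # Registers each marked function exactly once; a second call is a no-op.
    added = 0
    for fn in candidates:
        if getattr(fn, "_is_skill", False) and fn.__name__ not in _REGISTERED:
            agent_tools.append(fn)        # real code calls agent.tool(fn)
            _REGISTERED.add(fn.__name__)  # makes loading idempotent
            added += 1
    return added

@skill
def get_travel_briefing(destination: str) -> str:
    return f"Briefing for {destination}"
```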

Built-in skills:

  • read_later.py — Content Curator (save_to_read_later, list_read_later, delete_read_later, get_read_later_digest)
  • travel_briefing.py — get_travel_briefing — on-demand travel summary (weather + maps/transit + LLM synthesis)
  • cv.py — store_cv, get_cv — parse and persist a CV as structured user facts for job application workflows
  • research.py — deep_research — multi-step web research: generates sub-questions, fetches and synthesises sources in parallel, saves the result as a "Research: <topic>" note
  • meeting_notes.py — get_meeting_insights, list_recent_meetings — extract decisions/actions/key points from Drive transcripts via the mini model and save them as notes. Source-agnostic: _SOURCE_REGISTRY maps source name to fetcher, allowing Teams or Notion to be wired in later.

System Prompt Architecture

build_system_prompt() assembles the agent's full context from three categories of sections, always in this order:

graph TD
    subgraph Static["Static Instruction Sections (identical across requests — cache-friendly)"]
        direction LR
        A1[_PERSONA\nVoice · banned openers · cultural context]
        A2[_DECISION_MAKING\nWhen to act vs. confirm · tool priority]
        A3[_CONTEXT_USAGE\nHow to apply the profile silently]
        A4[_MEMORY_INSTRUCTIONS\nFacts store · search priority order]
        A5[_JOURNAL_INSTRUCTIONS\nWhen to call save_journal_entry]
        A6[_TONE_CALIBRATION\nSituational energy matching]
        A7[_OUTPUT_FORMAT\nMobile formatting · brevity rules]
        A8[_ERROR_HANDLING\nRetry-first · plain-English failure summaries]
        A9[_TOOLS_AND_ACCOUNTS\nTool guidance scoped to active domain]
        A10[_TOOL_USAGE_EXAMPLES\nExample flows]
        A11[_APPROVAL_INSTRUCTIONS\nSentinel handling · never show raw token]
        A12[focus_section\nActive domain focus hint if scoped]
    end

    subgraph Dynamic["Dynamic Sections (fetched from storage per request)"]
        direction LR
        B1[facts_section\nuser_facts table — key/value facts\nQuick Reference Facts header]
        B2[learnings_section\nagent_learnings table status=active\nBehavioral Guidelines header]
        B3[context_section\ncontext table — narrative profile\ncapped at 4,800 chars ≈ 1,200 tokens]
    end

    Static --> Prompt[System Prompt\n~3,000–6,000 tokens]
    Dynamic --> Prompt

Key design constraints:

  • Datetime is NOT in the system prompt. It is prepended to the user turn (as [Thursday, April 24, 2026 — 09:15 AM Europe/Paris]) so the stable system prompt sections form a consistent prefix across requests and qualify for Gemini's implicit prompt cache. A changed datetime would bust the cache on every request.
  • Profile is capped at 4,800 chars (~1,200 tokens). The nightly reflection profile grows over time — the cap prevents unbounded system prompt growth while keeping the most recent content intact.
  • Tool sections are scoped. When active_categories is set (i.e. a domain agent is running), only the relevant _DOMAIN_TOOL_SECTIONS entries are included — cutting ~600–1,000 tokens of irrelevant tool guidance from each domain-routed request.
  • Briefing agent uses build_briefing_system_prompt(), which extends the base prompt by appending _MORNING_BRIEFING, _EVENING_RECAP, _WEEKLY_RECAP, _WEEKLY_PREP, and _MEETING_PREP templates. These arrive at the system level so the model's structural understanding of briefing formats is cached, not re-parsed per call.

Personality directives (_PERSONA + _ERROR_HANDLING):

Two sections encode Kwasi's distinctive character:

  • _PERSONA — Direct, warm-but-not-performative voice. Natural Ghanaian cultural familiarity. Light humor where it lands. Banned openers list: "I'm sorry", "Unfortunately", "I apologise", "I wasn't able to", "I'm afraid" — the model must reframe as what it CAN do.
  • _ERROR_HANDLING — Retry-first directive: when a tool fails, attempt at least one alternative (different tool, rephrased query, decomposed sub-task) before reporting failure. When all paths are exhausted: structured ≤3-bullet plain-English summary of what was tried + one concrete next step. No raw error codes or exception names in any user-facing response.

Dashboard (app/routers/dashboard.py)

FastAPI router mounted at /dashboard/. Four views (interactions, audit log, scheduled tasks, memory profile). Protected by DASHBOARD_SECRET — disabled entirely if the env var is not set. Auth via X-Dashboard-Secret header or ?token= query param.

MCP Tools (app/interfaces/mcp/client.py)

Gmail, Outlook email, Outlook calendar, and Microsoft To Do tools are synchronous (Google/MS APIs). They are wrapped with asyncio.to_thread() to avoid blocking the event loop. get_mcp_tools() returns all wrappers; agents.py filters them into _EMAIL_MCP, _CALENDAR_MCP, and _TODO_MCP sets for domain-specific routing.

gmail_read_email_wrapper — fetches the complete body of a Gmail message (up to 20,000 chars). Walks the MIME part tree recursively, prefers text/plain, strips HTML from text/html fallback. The search_emails_wrapper and summary tools only return a short snippet (~100–200 chars) — always follow up with gmail_read_email_wrapper when the full text is needed.
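The walk-and-prefer-text/plain strategy can be sketched with the stdlib email package — a simplified model of the wrapper, with a deliberately crude HTML strip:

```python
import re
from email.message import EmailMessage

def extract_body(msg: EmailMessage, limit: int = 20_000) -> str:
    # Walk the MIME part tree, prefer text/plain, fall back to stripped HTML.
    plain, html = None, None
    for part in msg.walk():
        if part.is_multipart():
            continue
        content = part.get_content()
        if part.get_content_type() == "text/plain" and plain is None:
            plain = content
        elif part.get_content_type() == "text/html" and html is None:
            html = re.sub(r"<[^>]+>", " ", content)  # crude tag strip
    body = plain if plain is not None else (html or "")
    return body[:limit]  # cap at 20,000 chars like the wrapper
```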

outlook_read_wrapper — fetches the complete body of an Outlook message (up to 20,000 chars). Strips HTML (Graph API returns HTML by default), includes From/Subject/Date fields.

Google Calendar shared calendars — google_get_todays_schedule_wrapper and google_get_calendar_events_wrapper query all calendars the user has access to (primary, shared, subscribed) by calling calendarList().list() first. Each event is prefixed with [Calendar Name] in the output.

Microsoft To Do tools (todo_*) use the same OutlookClient._make_request pattern as Outlook email and calendar. They require Tasks.ReadWrite scope on the Outlook refresh token.

Memory (app/memory/)

  • ports.py — StoragePort protocol + all data models (Interaction, Note, Task, Reminder, UserContext, UserFact, ScheduledTask, AuditEntry, ReadLaterItem, NewsTopic, SeenStory, AlertRule, PendingAction, PendingIntention, AgentLearning, JournalEntry, SemanticSearchResult). The HealthSample Pydantic model lives in app/health/models.py.
  • adapters/sqlite.py — SQLite implementation (local dev, :memory: for tests). Tables: interactions, notes, tasks, reminders, context, user_facts, scheduled_tasks, audit_log, read_later, news_topics, seen_news, alert_rules, pending_actions, pending_intentions, agent_learnings, journal_entries, health_samples.
  • adapters/postgres.py — asyncpg connection pool (Railway production). Same tables as SQLite. health_samples.value is JSONB in Postgres and JSON-encoded TEXT in SQLite.
  • Both adapters embed new records on write via embed_text() (Gemini gemini-embedding-001, fire-and-forget) and implement semantic_search() — Python cosine in SQLite, pgvector <=> in Postgres.
  • logger.py — log_interaction() helper used by all handlers
  • post_conversation.py — extract_facts_from_exchange() (immediate post-message fact extraction) and summarise_session() (30-min session-close summarisation); both fire-and-forget via asyncio.create_task()
  • reflection.py — ReflectionService — nightly LLM pass producing four outputs: an updated UserContext narrative profile, auto-extracted UserFact records, new PendingIntention records, and AgentLearning corrections
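The SQLite adapter's cosine ranking reduces to pure Python; a sketch of the ranking step only (Postgres uses pgvector's <=> operator instead):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def semantic_search(query_vec, rows, top_k=5):
    # rows: (id, embedding) pairs; rank by similarity, best first.
    scored = [(rid, cosine(query_vec, emb)) for rid, emb in rows if emb]
    return sorted(scored, key=lambda t: t[1], reverse=True)[:top_k]
```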

Background Loops

All background loops run as asyncio tasks within the FastAPI lifespan.

Loop | Trigger condition | Schedule
--- | --- | ---
Reflection | REFLECTION_SECRET set | 2 AM UTC daily
Reminders | Telegram token configured | Every 5 minutes
Daily Briefing | BRIEFING_CHAT_ID set | BRIEFING_TIME UTC daily (default 08:00)
Evening Recap | BRIEFING_CHAT_ID set | EVENING_RECAP_TIME local (default 21:00)
Weekly Recap | BRIEFING_CHAT_ID set | WEEKLY_RECAP_DAY / WEEKLY_RECAP_TIME local (default Friday 18:00)
Weekly Prep | BRIEFING_CHAT_ID set | WEEKLY_PREP_DAY / WEEKLY_PREP_TIME local (default Sunday 18:00)
Read-Later Digest | BRIEFING_CHAT_ID set | READ_LATER_DIGEST_DAY / READ_LATER_DIGEST_TIME local (default Saturday 09:00)
Journal Digest | BRIEFING_CHAT_ID set | JOURNAL_DIGEST_DAY / JOURNAL_DIGEST_TIME local (default Sunday 19:00)
Email Intelligence | BRIEFING_CHAT_ID + email creds | EMAIL_INTEL_TIME local (default 09:30); set "" to disable
User Scheduled Tasks | BRIEFING_CHAT_ID set | Every 60 seconds (croniter-based evaluation)
Alert Rules + Intentions | BRIEFING_CHAT_ID set | Every 5 minutes
Meeting Prep | BRIEFING_CHAT_ID set | Every 5 minutes (fires ≈MEETING_PREP_LEAD_MINUTES before each meeting)
Approval Expiry | Telegram bot active | Every 5 minutes
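A daily loop's sleep calculation can be sketched as follows (simplified; the real loops also evaluate cron expressions via croniter for user scheduled tasks):

```python
from datetime import datetime, timedelta

def seconds_until(run_time: str, now: datetime) -> float:
    # run_time is an "HH:MM" setting such as BRIEFING_TIME.
    hour, minute = map(int, run_time.split(":"))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)  # already passed today: fire tomorrow
    return (target - now).total_seconds()
```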

Key Design Decisions

Why Pydantic AI?

Pydantic AI handles the agent loop (tool calls → LLM → tool calls → ...), type-safe tool definitions, streaming, and model switching. The Agent class abstracts over different LLM providers — switching from Gemini to Claude is a single env var change.

Why polling instead of webhook for Telegram?

Long-polling (updater.start_polling) is simpler to deploy on Railway — no public URL needed for the bot itself, no TLS certificate management. The WhatsApp webhook still uses a proper HTTP endpoint since the Meta platform requires it.

Why asyncio tasks for background loops?

Avoids the complexity of a separate worker process, Celery, or a job scheduler. Since the app is single-user and the loops are low-frequency (5 min / daily / 60 sec), asyncio tasks inside the FastAPI lifespan are sufficient and operationally simple.

Why MCP for email/calendar?

MCP (Model Context Protocol via fastmcp) provides a clean abstraction for external tool servers. Gmail and Outlook have separate auth flows (OAuth2 with Google vs. MSAL with Microsoft) — MCP isolates this complexity from the agent.

Why native PyGitHub instead of the official GitHub MCP server?

The existing "MCP" tools in this project are local Python wrappers, not external server processes. PyGitHub follows the same pattern as other tools (weather, search, news) — in-process, async-wrapped, consistent output formatting. The official GitHub MCP server is a Go binary designed for IDE integrations, not embedded service use.

Why a file-drop skill registry?

Allows extending the agent's capabilities without touching agent.py. New skills can be developed, tested, and dropped in independently. The registry is idempotent and fail-safe — a broken skill file is skipped with a logged error.

Why keyword-based intent routing instead of LLM-based routing?

The routing step runs on every message, so it must have near-zero latency and zero cost. Keyword matching is deterministic, testable, and trivially debuggable — if it misroutes a message, you can see exactly why by reading the keyword list. An LLM-based router would add a full extra inference step per message, significantly increasing latency and cost for a single-user assistant. The keyword approach covers ~95% of real messages correctly; the remaining 5% fall through to the full agent, which is always the safe fallback.

Why a sentinel token instead of pausing the agent mid-run for approval?

The agent runs to completion — it always produces a text response. Pausing mid-run would require a resumable coroutine, persistent agent state, and a way to re-inject the approval result back into an in-progress LLM call. The sentinel pattern avoids all of this: the agent sees "[APPROVAL_PENDING:uuid]" as a normal tool result, writes a response like "your action is pending approval", and exits. handle_message post-processes the result synchronously. On Confirm, a simple direct executor call (no LLM re-run) performs the action. This keeps the agent loop stateless and the approval flow outside the LLM's critical path.
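The post-processing scan can be sketched like this — the token format is taken from the description above; the helper names are illustrative:

```python
import re

_SENTINEL = re.compile(r"\[APPROVAL_PENDING:([0-9a-f-]{36})\]")

def find_pending_approvals(tool_outputs: list[str]) -> list[str]:
    # Returns the pending-action UUIDs that need Confirm/Cancel/Edit buttons.
    ids: list[str] = []
    for out in tool_outputs:
        ids.extend(_SENTINEL.findall(out))
    return ids

def strip_sentinels(text: str) -> str:
    # The raw token must never reach the user (the prompt tells the model the same).
    return _SENTINEL.sub("", text).strip()
```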

Why a separate app/approval.py module?

approval.py imports only from app/memory/ports — it has no knowledge of agent.py, bot.py, or client.py. This breaks the potential circular import chain: agent.py imports approval.py, bot.py imports approval.py, and client.py imports approval.py — but none of these files import each other. If the gate lived in agent.py or bot.py, it would create a cycle.

Why domain agents instead of a single agent with all tools?

Reducing the tool list the LLM sees for each request reduces token usage per call and improves model focus — a model with 8 tools performs more reliably than one with 50. Domain agents are built once at import time (zero per-request cost), share the same system prompt function, and fall back transparently to the full agent for anything ambiguous. The full agent is always the safety net.

Why both Logfire and Langfuse?

Different jobs. Logfire owns infrastructure and request-shape tracing — FastAPI routes, background-loop ticks, exceptions, latency by file path. The diagnostics_agent uses it via Logfire's MCP read tool to answer "did the briefing run?" or "which tool is slowest?". Langfuse owns the LLM layer — per-generation token usage and cost, prompt versions, session/user grouping, trace-level quality scores tied to user actions (Confirm/Cancel/Edit). Both processors hang off the same OTEL tracer provider in app/observability.py, so Pydantic AI is instrumented once and spans flow to both backends without duplication. Either side can be disabled (skip the env vars) and the other keeps working.

Why a push model for health data instead of a server-side polling loop?

Neither Samsung Health nor Google Health Connect exposes a cloud / server-side REST API — both store data on-device, encrypted, behind explicit user consent. There is no OAuth Kwasi could call from FastAPI to pull samples. Two options remained: (a) integrate a paid third-party aggregator (Terra/Rook/Thryve, ~$400/mo minimum) which itself still requires an Android piece, or (b) write a tiny sideloaded Android bridge (bridge-android/) that reads Health Connect locally and POSTs to /health/ingest. The bridge is one Kotlin Gradle project, ~600 LOC, and never goes through the Play Store — so we sidestep Google's sensitive-permissions review entirely. Per-metric watermarks in DataStore prevent re-sends and gaps across reboots; the server is idempotent on (metric_type, start_time, source_device) so retries are safe. Result: zero ongoing cost, full data ownership, and the rest of Kwasi's read path is identical to any other agent tool.
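The idempotency guarantee reduces to a uniqueness constraint; a sketch with an illustrative SQLite schema (the real health_samples table has more columns):

```python
import json
import sqlite3

def ingest(conn: sqlite3.Connection, samples: list[dict]) -> int:
    # Uniqueness on (metric_type, start_time, source_device) makes bridge
    # retries safe: a re-sent sample is silently ignored.
    conn.execute(
        """CREATE TABLE IF NOT EXISTS health_samples (
               metric_type TEXT, start_time TEXT, source_device TEXT,
               value TEXT,
               PRIMARY KEY (metric_type, start_time, source_device))"""
    )
    inserted = 0
    for s in samples:
        cur = conn.execute(
            "INSERT OR IGNORE INTO health_samples VALUES (?, ?, ?, ?)",
            (s["metric_type"], s["start_time"], s["source_device"], json.dumps(s["value"])),
        )
        inserted += cur.rowcount  # 0 when the sample was already ingested
    return inserted
```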

Why both keyword and semantic search?

Keyword (LIKE / ILIKE) search is fast and good enough for exact recall. Semantic search complements it — it finds content by meaning when the user's words don't match the stored text (e.g. "anything about burnout" finding a note titled "work-life balance thoughts"). Rather than replacing keyword search, the agent uses both: keyword first (zero latency, no API call), semantic as a fallback or in parallel via find_everything. Embeddings are stored at write time so queries are instant. The embed_text call at write time costs one Gemini API call per saved record — negligible for a personal assistant.