
Message Lifecycle

How a message travels from Telegram to a response and back. These flows are Telegram-specific — WhatsApp follows the same logical path (intent routing → agent → log) but via a webhook handler (app/interfaces/whatsapp/webhook.py) without streaming or TTS. Each Telegram message type has a slightly different path.


Text Message (Primary Flow)

Text messages use streaming — you see the response being typed in real time.

sequenceDiagram
    participant You
    participant TG as Telegram
    participant H as handle_message
    participant R as Intent Router
    participant S as Storage
    participant A as Agent (domain or full)
    participant LLM as LLM
    participant T as Tools
    participant GW as approval_gate

    You->>TG: Send text message
    TG->>H: Update event (long-polling)
    H->>H: Check allowlist (ALLOWED_TELEGRAM_USER_IDS)
    H->>H: Check edit state (user_data["pending_edit_action_id"])
    H->>R: classify_intent(text)
    R-->>H: categories e.g. {"email", "utility"}
    H->>R: select_agent(categories)
    R-->>H: email_agent (or full_agent for ambiguous)
    H->>H: request_deps = _deps_replace(deps, active_categories, chat_id, pending_action_user_message=text)
    H->>S: fetch_message_history(user_id, message, settings)
    S-->>H: chronological 10 OR (3 recent + 3 semantic) if ENABLE_SEMANTIC_HISTORY → build_message_history (drops oldest if >6,000 est. tokens)
    H->>TG: send_chat_action("typing")

    Note over H,S: Context injection (shared 1,000-token budget)
    H->>S: find_relevant_notes (≥0.6 similarity + recency boost)
    H->>S: find_relevant_summaries (≥0.6 similarity + recency boost, if budget remains)
    H->>S: find_relevant_read_later (tag overlap, if budget remains)
    H->>H: Wrap blocks in XML tags + prepend datetime → enriched_text
    H->>TG: Send "…" placeholder message

    H->>A: agent.run_stream(enriched_text, deps, message_history)
    A->>S: get_context("global")
    S-->>A: UserContext (reflection profile or empty)
    A->>A: build_system_prompt() — injects active_categories focus hint
    A->>LLM: system_prompt + message_history + user_message

    loop 0 or more tool calls
        LLM->>T: tool_name(args)
        alt Gated tool (create_task, send_email, etc.)
            T->>GW: approval_gate(tool_name, payload, deps)
            GW->>S: save_pending_action(PendingAction)
            GW-->>T: "[APPROVAL_PENDING:<uuid>]" sentinel
            T-->>LLM: sentinel (tool is not executed)
        else Read-only tool
            T-->>LLM: result
        end
    end

    LLM-->>A: response stream ("action is pending your approval")
    A-->>H: text chunks (delta=True)

    loop Every 1.5 seconds
        H->>TG: Edit placeholder with current buffer
    end

    H->>TG: Final edit with complete response
    Note over H,TG: Splits at newline if >4096 chars

    H->>H: Scan tool calls for sentinels
    opt Sentinel(s) found
        H->>S: get_pending_action(uuid)
        H->>TG: Edit sent_msg with preview + [✅ Confirm] [❌ Cancel] [✏️ Edit]
        H->>S: update_pending_action(message_id=sent_msg.id)
    end

    H->>S: log_interaction(user_msg, response, channel="telegram")
    H->>S: _write_audit_entries() — sanitised tool call records
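
The 4096-character split applied during the final edit can be sketched as below. The helper name split_for_telegram is hypothetical. The hard-cut fallback for a chunk with no usable newline is also an assumption, because this page doesn't say what happens in that case.

```python
TELEGRAM_MAX_CHARS = 4096  # Telegram's per-message text limit


def split_for_telegram(text: str, limit: int = TELEGRAM_MAX_CHARS) -> list[str]:
    """Split text into chunks no longer than `limit` characters.

    Each chunk ends at the last newline before the limit; if a chunk has no
    usable newline, it is hard-cut at the limit (assumed fallback).
    """
    chunks: list[str] = []
    while len(text) > limit:
        # Search only text[:limit + 1] so the part before the newline fits.
        cut = text.rfind("\n", 0, limit + 1)
        if cut <= 0:
            chunks.append(text[:limit])
            text = text[limit:]
        else:
            chunks.append(text[:cut])
            text = text[cut + 1:]  # drop the newline we split on
    if text:
        chunks.append(text)
    return chunks
```

Each returned chunk would then be sent as its own sequential message.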

Notes on the streaming flow and approval gate

  • Intent routing runs first: classify_intent() keyword-matches the message in microseconds, then select_agent() picks the appropriate domain agent (email, calendar, memory, github, news, utility, meetings, etc.) or falls back to the full agent for ambiguous or conversational messages.
  • active_categories is set on AgentDeps before run_stream: build_system_prompt injects a focus hint telling the model which tool categories to prefer, and includes tool guidance only for those domains.
  • Context injection: before starting the stream, the handler runs three retrieval layers that share a single 1,000-token budget (CONTEXT_TOKEN_BUDGET = 1000): (1) find_relevant_notes (≥0.6 similarity), (2) find_relevant_summaries (≥0.6), (3) find_relevant_read_later (tag overlap). Each layer's output is wrapped in XML tags (<context type="notes">, <context type="summaries">, <context type="read_later">) and prepended to the user message so the model can distinguish retrieved memory from instructions.
  • Datetime in the user turn: a stamp such as [Thursday, April 24, 2026 — 09:15 AM Europe/Paris] is prepended to the enriched message rather than to the system prompt. This keeps the stable system prompt prefix identical across requests, which enables Gemini implicit cache hits.
  • The "…" placeholder is edited in place rather than replaced with a new message, which keeps the chat from jumping around while you wait.
  • Edits are throttled to at most one every 1.5 seconds to stay within Telegram's rate limits.
  • If the final response exceeds 4096 characters, it is split at the last newline before the limit and sent as sequential messages.
  • Approval gate — when the agent calls a consequential tool (create task, send email, etc.), approval_gate() saves a PendingAction to the DB and returns a sentinel token instead of executing. After the stream ends, handle_message scans for sentinels and edits the sent message to add the inline keyboard. The agent is instructed via _APPROVAL_INSTRUCTIONS to tell the user "action pending approval" when it sees a sentinel — never to show the raw token.
  • Edit state — at the very start of handle_message, before intent routing, the handler checks context.user_data for pending_edit_action_id. If set, the message is treated as a correction and routed to _handle_edit_correction() instead of the normal flow.
  • Token logging — each telegram/message span records est_tokens_history, est_tokens_context, est_tokens_user_turn, context_layers, and history_interactions as Logfire span attributes for per-request token observability.
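
Below is a minimal sketch of the shared-budget context injection. It assumes a rough four-characters-per-token estimate and uses hypothetical helper names (estimate_tokens, build_context_blocks, enrich). The real retrieval calls (find_relevant_notes and the others) are replaced by pre-fetched lists of strings, and the datetime prefix is omitted.

```python
from collections.abc import Sequence

CONTEXT_TOKEN_BUDGET = 1000  # shared by all three retrieval layers


def estimate_tokens(text: str) -> int:
    # Assumption: roughly 4 characters per token; the real estimator may differ.
    return max(1, len(text) // 4)


def build_context_blocks(
    layers: Sequence[tuple[str, Sequence[str]]],
    budget: int = CONTEXT_TOKEN_BUDGET,
) -> str:
    """Fill layers in priority order (notes, summaries, read_later) until the budget runs out."""
    blocks: list[str] = []
    remaining = budget
    for layer_type, items in layers:
        kept: list[str] = []
        for item in items:
            cost = estimate_tokens(item)
            if cost > remaining:
                break  # stop this layer; later layers may still fit smaller items
            kept.append(item)
            remaining -= cost
        if kept:
            body = "\n".join(kept)
            blocks.append(f'<context type="{layer_type}">\n{body}\n</context>')
    return "\n\n".join(blocks)


def enrich(user_text: str, layers: Sequence[tuple[str, Sequence[str]]]) -> str:
    """Prepend the wrapped context blocks to the user's message."""
    context = build_context_blocks(layers)
    return f"{context}\n\n{user_text}" if context else user_text
```

Because the layers run in order against one budget, a large summaries hit can be skipped while a small read-later item still fits.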

Multi-step Plan Flow (Spec 008)

Before a Telegram text message hits the agent, a planning gate decides whether it should be split into ordered steps. Most messages are simple ("what's on my calendar?") and skip planning entirely. Complex messages ("check email, then post a summary in #standup, then add a follow-up task") get decomposed, previewed for approval, and executed step-by-step with live progress.

sequenceDiagram
    participant You
    participant TG as Telegram
    participant H as handle_message
    participant CC as classify_complexity
    participant GP as generate_plan
    participant S as Storage
    participant EX as execute_plan
    participant R as Intent Router (per step)
    participant A as Domain Agent

    You->>TG: "Check email, then post a summary in #standup, then add a task"
    TG->>H: Update event
    Note over H: Standard pre-checks (allowlist, edit-state, history, context injection)
    H->>CC: classify_complexity(text)
    CC-->>H: True (regex matched "then" connectives, len ≥ 30)

    H->>GP: generate_plan(text, deps)
    GP->>GP: _planner_agent.run(text)\n→ ExecutionPlan(goal, steps[], needs_planning)
    alt needs_planning=False or <2 steps
        GP-->>H: None (fall through to normal flow)
        Note over H,A: Continue to standard text-message flow
    else 2+ steps
        GP-->>H: ExecutionPlan
        H->>H: format_plan_preview(plan) → MarkdownV1 message
        H->>S: save PendingAction(action_type="plan", payload=plan.json, trace_id)
        H->>TG: Send preview + [✅ Confirm] [❌ Cancel] [✏️ Edit]
        Note over You,TG: Wait for user

        You->>TG: Tap ✅ Confirm
        TG->>H: CallbackQuery (handle_callback_query loads plan)
        H->>EX: execute_plan(plan, deps, send_progress)

        loop For each step in plan.steps
            EX->>TG: Edit progress message\n(✓ done · ▶ current · _ pending)
            EX->>R: classify_intent(step_message + scratchpad)
            R-->>EX: domain agent
            EX->>A: agent.run(step_message_with_scratchpad)
            alt Step succeeds
                A-->>EX: output text
                EX->>EX: scratchpad.append(step output, truncated 300 chars)
            else Step throws
                A-->>EX: Exception
                EX->>S: save PendingAction(action_type="plan_resume",\npayload=PlanResumePayload(plan, remaining_steps, scratchpad,\nfailed_step, failure_reason))
                EX->>TG: Send "Step N failed: {reason}\nContinue or abort?"\n+ inline keyboard
                EX-->>H: partial summary (return)
            end
        end

        EX->>TG: Final message — per-step outputs concatenated under "*Done!* {goal}"
        H->>S: log_interaction
    end
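
The classify_complexity pre-filter can be approximated with one regex and a length check. This is a sketch only. The pattern is reconstructed from the connectives this section mentions (then, and then, also, after that) and is not the project's actual regex.

```python
import re

# Reconstructed pattern (assumption): "then" also covers "and then".
_CONNECTIVE_RE = re.compile(r"\b(?:then|also|after that)\b", re.IGNORECASE)
MIN_PLANNING_CHARS = 30  # shorter messages never reach the planner


def classify_complexity(text: str) -> bool:
    """Return True if the message looks like a multi-step request."""
    if len(text) < MIN_PLANNING_CHARS:
        return False
    return _CONNECTIVE_RE.search(text) is not None
```

A false positive only costs one planner call, because generate_plan can still return None and fall through to the normal flow.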

Notes on the plan flow

  • Pre-filter is cheap. classify_complexity is a single regex match on connectives (and then, also, after that) with a 30-char minimum — single-action messages cost zero LLM tokens for planning. The planner agent itself is tool-less; per-step execution uses the existing router + domain agents.
  • Scratchpad threading. Step N+1 receives a prepended block "Context from previous steps:\n- Step N (description): output[:300]" so later steps build on earlier results without re-fetching.
  • The plan is always gated, even when none of its constituent tool calls would normally be gated. action_type="plan" and tool_name="__plan__" distinguish plan actions from regular tool approvals; plan-resume actions use "plan_resume" / "__plan_resume__".
  • Resume preserves work. Confirm restarts execute_plan with steps_override=remaining_steps and initial_scratchpad=payload.scratchpad — completed steps are not re-run. Trace scoring carries through to plan and plan_resume actions via PendingAction.trace_id (Spec 009).
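
Scratchpad threading and resume can be sketched as follows. run_steps, StepResult, and the run_step callable are hypothetical stand-ins for execute_plan and the per-step router and agent call. The step description doubles as the step message for brevity. The sketch also assumes that a resumed plan retries the failed step, which this page does not confirm.

```python
from dataclasses import dataclass
from typing import Callable, Optional

SCRATCHPAD_CHARS = 300  # per-step output truncation


@dataclass
class StepResult:
    number: int
    description: str
    output: str


def with_scratchpad(step_message: str, scratchpad: list[StepResult]) -> str:
    """Prepend earlier step outputs so the next step can build on them."""
    if not scratchpad:
        return step_message
    lines = ["Context from previous steps:"]
    for r in scratchpad:
        lines.append(f"- Step {r.number} ({r.description}): {r.output[:SCRATCHPAD_CHARS]}")
    return "\n".join(lines) + "\n\n" + step_message


def run_steps(
    steps: list[tuple[int, str]],
    run_step: Callable[[str], str],
    initial_scratchpad: Optional[list[StepResult]] = None,
) -> tuple[list[StepResult], list[tuple[int, str]], Optional[str]]:
    """Run steps in order; on failure return (scratchpad, remaining_steps, reason)."""
    scratchpad = list(initial_scratchpad or [])
    for i, (number, description) in enumerate(steps):
        try:
            output = run_step(with_scratchpad(description, scratchpad))
        except Exception as exc:  # failed step stays in remaining (assumed retry)
            return scratchpad, steps[i:], str(exc)
        scratchpad.append(StepResult(number, description, output))
    return scratchpad, [], None
```

Resuming means calling run_steps again with the returned remaining steps and scratchpad, so completed steps are never re-run.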

Inline Approval (Button Callbacks)

When the user taps a button under a pending action preview, Telegram fires a CallbackQuery event handled by handle_callback_query.

sequenceDiagram
    participant You
    participant TG as Telegram
    participant CB as handle_callback_query
    participant S as Storage
    participant EX as executor (ACTION_REGISTRY)

    You->>TG: Tap ✅ Confirm / ❌ Cancel / ✏️ Edit
    TG->>CB: CallbackQuery (data="approve:{verb}:{uuid}")
    CB->>CB: query.answer() — dismiss Telegram loading spinner
    CB->>S: get_pending_action(uuid)
    CB->>CB: Guard: status == "pending" and now < expires_at

    alt Confirm
        CB->>EX: execute_approved_action(tool_name, payload, deps)
        EX-->>CB: result text
        CB->>S: update_pending_action(status="confirmed")
        CB->>TG: edit_message_text("✅ Done!\n{result}")
        CB->>S: log_audit_entry
    else Cancel
        CB->>S: update_pending_action(status="cancelled")
        CB->>TG: edit_message_text("❌ Cancelled.")
    else Edit
        CB->>CB: context.user_data["pending_edit_action_id"] = uuid
        CB->>TG: edit_message_text("What would you like to change?")
        Note over You,TG: Next message from user → _handle_edit_correction()
    else Already resolved or expired
        CB->>TG: edit_message_text("This action was already {status}.")
    end
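
Here is a sketch of the callback-data parsing and the pending/unexpired guard. The verb spellings (confirm, cancel, edit) and this minimal PendingAction dataclass are assumptions. Only the approve:{verb}:{uuid} shape and the status/expiry check come from the diagram.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

VERBS = {"confirm", "cancel", "edit"}  # assumed verb spellings


@dataclass
class PendingAction:
    id: str
    status: str  # "pending", "confirmed", "cancelled", ...
    expires_at: datetime


def parse_callback_data(data: str) -> tuple[str, str]:
    """Split 'approve:{verb}:{uuid}' into (verb, uuid)."""
    parts = data.split(":", 2)
    if len(parts) != 3 or parts[0] != "approve" or parts[1] not in VERBS:
        raise ValueError(f"unrecognised callback data: {data!r}")
    return parts[1], parts[2]


def is_actionable(action: PendingAction, now: Optional[datetime] = None) -> bool:
    """Only pending, unexpired actions may be confirmed, cancelled, or edited."""
    now = now or datetime.now(timezone.utc)
    return action.status == "pending" and now < action.expires_at
```

When the guard fails, the handler takes the "Already resolved or expired" branch instead of executing anything.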

_handle_edit_correction() — called when the user sends a correction after tapping Edit:

  1. Loads the original PendingAction, verifies it's still pending and unexpired
  2. Marks the old action cancelled, increments edit_count
  3. Re-runs the agent with a [REVISION] prompt: "Original: {preview_text}\nCorrection: {user_text}\nPlease redo with this change."
  4. The agent produces a new sentinel → new preview with a fresh keyboard
  5. Old message_id is no longer relevant (previous message was edited to show "Cancelled")
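
Steps 2 and 3 can be sketched as below. The PendingAction fields and the helper start_revision are hypothetical. Placing the [REVISION] tag on its own first line is also an assumption. The expiry check from step 1 is left out for brevity.

```python
from dataclasses import dataclass


@dataclass
class PendingAction:
    id: str
    status: str
    preview_text: str
    edit_count: int = 0


def start_revision(action: PendingAction, user_text: str) -> str:
    """Cancel the original action and return the prompt used to re-run the agent."""
    if action.status != "pending":
        raise ValueError(f"action {action.id} is already {action.status}")
    action.status = "cancelled"
    action.edit_count += 1
    return (
        "[REVISION]\n"  # assumption: the tag is sent as its own first line
        f"Original: {action.preview_text}\n"
        f"Correction: {user_text}\n"
        "Please redo with this change."
    )
```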

Non-Telegram interfaces (CLI, WhatsApp): approval_gate() detects interface != "telegram" and executes the action immediately without saving a PendingAction. The approval UI is Telegram-only.
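
Here is a sketch of the gate's branching and the sentinel round trip. It uses an in-memory dict in place of the PendingAction table and an explicit interface argument instead of deps. The real function is approval_gate(tool_name, payload, deps). The names gate_tool_call and find_pending_ids are hypothetical.

```python
import re
import uuid
from typing import Callable

# Matches the sentinel format shown in the diagram; uuid4 strings are 36 chars.
SENTINEL_RE = re.compile(r"\[APPROVAL_PENDING:([0-9a-f-]{36})\]")


def gate_tool_call(
    tool_name: str,
    payload: dict,
    interface: str,
    pending: dict[str, dict],
    execute: Callable[[str, dict], str],
) -> str:
    """Defer consequential tools on Telegram; run them immediately elsewhere."""
    if interface != "telegram":
        return execute(tool_name, payload)  # no approval UI outside Telegram
    action_id = str(uuid.uuid4())
    pending[action_id] = {"tool_name": tool_name, "payload": payload, "status": "pending"}
    return f"[APPROVAL_PENDING:{action_id}]"


def find_pending_ids(tool_results: list[str]) -> list[str]:
    """Mirror the post-stream scan: collect sentinel UUIDs from tool results."""
    return [m for result in tool_results for m in SENTINEL_RE.findall(result)]
```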


Voice Message

Voice goes through an extra STT step before hitting the agent. No streaming — the full transcript is needed first.

sequenceDiagram
    participant You
    participant TG as Telegram
    participant H as handle_voice_message
    participant G as Gemini STT
    participant S as Storage
    participant A as Agent

    You->>TG: Send voice note
    TG->>H: Update event (voice/audio)
    H->>H: Check allowlist
    H->>TG: Download audio bytes
    TG-->>H: audio_bytes
    H->>G: transcribe_audio(audio_bytes, model_id)
    G-->>H: transcript text

    H->>S: get_interactions_by_user(limit=5)
    S-->>H: message_history
    H->>A: agent.run(transcript, deps, message_history)
    A-->>H: response_text

    alt response ≤ 500 words
        H->>H: synthesize_speech(response_text) via edge-tts
        H->>TG: send_voice(audio_bytes)
    else response > 500 words
        H->>TG: send_message(response_text)
    end
    H->>S: log_interaction(transcript, response)

The model ID for Gemini STT is derived from MODEL_NAME by stripping the provider prefix (e.g. google-gla:gemini-2.5-flash becomes gemini-2.5-flash). TTS uses edge-tts with the voice configured via TTS_VOICE (default en-GB-RyanNeural). WhatsApp voice messages receive a text reply only, without TTS.
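
The model-ID derivation and the 500-word TTS cutoff are simple enough to sketch. Both helper names are hypothetical. Counting words by splitting on whitespace is an assumption about how the limit is measured. The edge-tts synthesis call itself is not shown.

```python
MAX_TTS_WORDS = 500  # longer replies are sent as text


def stt_model_id(model_name: str) -> str:
    """Strip a 'provider:' prefix, e.g. 'google-gla:gemini-2.5-flash' -> 'gemini-2.5-flash'."""
    return model_name.split(":", 1)[-1]  # names without a prefix pass through unchanged


def reply_as_voice(response_text: str) -> bool:
    # Assumption: words are counted by splitting on whitespace.
    return len(response_text.split()) <= MAX_TTS_WORDS
```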


Photo Message

Photos are analyzed by Gemini Vision. The analysis result becomes the user message fed to the agent.

sequenceDiagram
    participant You
    participant TG as Telegram
    participant H as handle_photo_message
    participant GV as Gemini Vision
    participant A as Agent
    participant S as Storage

    You->>TG: Send photo (+ optional caption)
    TG->>H: Update event (photo)
    H->>H: Check allowlist
    H->>TG: Download highest-resolution photo
    TG-->>H: image_bytes
    H->>GV: analyze_image(image_bytes, "image/jpeg", caption)
    GV-->>H: image analysis text

    H->>H: Build user_message:\n"[Image shared]\nQuestion: {caption}\nAnalysis: {analysis}"
    H->>S: get_interactions_by_user(limit=5)
    S-->>H: message_history
    H->>A: agent.run(user_message, deps, message_history)
    A-->>H: response_text
    H->>TG: Reply with response
    H->>S: log_interaction(user_message, response)

Document Message (PDF / Text)

PDFs go through Gemini Vision. Text files are decoded as UTF-8, truncated to 50k characters, and passed to the agent directly. Other file types are rejected.

sequenceDiagram
    participant You
    participant TG as Telegram
    participant H as handle_document_message
    participant GV as Gemini Vision
    participant A as Agent
    participant S as Storage

    You->>TG: Send document (+ optional caption)
    TG->>H: Update event (document)
    H->>H: Check allowlist
    H->>H: Check mime_type (PDF / text / other)

    alt Unsupported type
        H->>TG: "I can analyze PDFs and text files only"
    else Text file (text/*)
        H->>TG: Download file bytes
        TG-->>H: file_bytes
        H->>H: Decode UTF-8, truncate at 50k chars
        H->>A: agent.run("[Document: name]\nContents:\n{text}", ...)
    else PDF (application/pdf)
        H->>TG: Download file bytes
        TG-->>H: file_bytes
        H->>GV: analyze_image(file_bytes, "application/pdf", caption)
        GV-->>H: document analysis
        H->>A: agent.run("[PDF: name]\nAnalysis:\n{analysis}", ...)
    end

    A-->>H: response_text
    H->>TG: Reply with response
    H->>S: log_interaction(user_message, response)
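
The MIME dispatch and the prompt strings from the diagram can be sketched as follows. The helper names are hypothetical. Decoding with errors="replace" is an assumption about how invalid UTF-8 is handled.

```python
from typing import Optional

MAX_TEXT_CHARS = 50_000  # text documents are truncated to this length


def document_kind(mime_type: Optional[str]) -> str:
    """Classify a Telegram document by MIME type: 'pdf', 'text', or 'unsupported'."""
    if mime_type == "application/pdf":
        return "pdf"
    if mime_type and mime_type.startswith("text/"):
        return "text"
    return "unsupported"


def text_document_prompt(file_name: str, file_bytes: bytes) -> str:
    # Assumption: undecodable bytes are replaced rather than raising.
    text = file_bytes.decode("utf-8", errors="replace")[:MAX_TEXT_CHARS]
    return f"[Document: {file_name}]\nContents:\n{text}"


def pdf_document_prompt(file_name: str, analysis: str) -> str:
    return f"[PDF: {file_name}]\nAnalysis:\n{analysis}"
```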

Slash Commands

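Most slash commands (/notes, /tasks, /reminders) bypass the agent entirely and query storage directly. They're fast, deterministic, and don't consume LLM tokens. /readlater and /briefing are the exceptions: they delegate to handle_message and run through the agent.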

sequenceDiagram
    participant You
    participant TG as Telegram
    participant H as Command Handler
    participant S as Storage

    You->>TG: /tasks
    TG->>H: CommandHandler("tasks")
    H->>H: Check allowlist
    H->>S: list_tasks(status="todo")
    S-->>H: list[Task]
    H->>TG: Formatted task list (Markdown)

| Command | Storage call | Returns |
|---|---|---|
| /notes | get_notes() | 5 most recent notes |
| /tasks | list_tasks(status="todo") | All pending tasks |
| /reminders | list_reminders() | All pending reminders |
| /readlater | Delegates to handle_message | Runs list_read_later via the agent |
| /briefing | Delegates to handle_message | Runs the morning briefing prompt on demand |
| /start, /help | None | Static help text |

Voice Reply Trigger (Text Messages)

Text messages that contain specific phrases cause Kwasi to reply with a voice note instead of text, provided the response is ≤500 words. This works even though the input is typed text, not a voice message.

Trigger phrases (case-insensitive, word-boundary matched):

| Phrase pattern | Examples |
|---|---|
| tell me | "tell me the weather today" |
| read me / read that / read this / read it (out) | "read me that article", "read it out" |
| say that / say it | "say that again" |
| speak (to me) | "speak to me", "speak" |

flowchart TD
    A[Text message arrives] --> B{_VOICE_TRIGGER_RE\nmatches phrase?}
    B -- No --> C[Normal text reply\n_to_telegram_md + edit flow]
    B -- Yes --> D{Response ≤ 500 words\nAND TTS_VOICE set?}
    D -- No --> C
    D -- Yes --> E[synthesize_speech via edge-tts]
    E --> F{Audio generated?}
    F -- No, exception --> C
    F -- Yes --> G[Delete placeholder message]
    G --> H[reply_voice with audio_bytes]

If TTS synthesis fails for any reason, it falls back silently to the normal text response. The voice reply trigger only applies to Telegram text messages — voice input, WhatsApp, and CLI are unaffected.
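
The trigger check can be reconstructed from the phrase table above. The pattern is an approximation and not the actual _VOICE_TRIGGER_RE. wants_voice_reply is a hypothetical helper that combines the regex with the 500-word and TTS_VOICE conditions from the flowchart.

```python
import re
from typing import Optional

# Approximation of the phrase table; the real _VOICE_TRIGGER_RE may differ.
VOICE_TRIGGER_RE = re.compile(
    r"\b(?:tell me|read (?:me|that|this|it)(?: out)?|say (?:that|it)|speak(?: to me)?)\b",
    re.IGNORECASE,
)
MAX_TTS_WORDS = 500


def wants_voice_reply(message: str, response: str, tts_voice: Optional[str]) -> bool:
    """Voice only if a trigger phrase matched, TTS is configured, and the reply is short."""
    return (
        VOICE_TRIGGER_RE.search(message) is not None
        and bool(tts_voice)
        and len(response.split()) <= MAX_TTS_WORDS
    )
```

Word boundaries keep near-misses like "speakers" or "retell" from triggering a voice reply.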


External API Message (POST /message)

For Android HTTP Shortcuts and other external clients. Requires API_TOKEN and BRIEFING_CHAT_ID.

flowchart TD
    A[POST /message] --> B{X-API-Token\nvalid?}
    B -- No --> B1[401 Unauthorized]
    B -- Yes --> C{Body type?}
    C -- multipart/form-data --> D[Read text field\nRead file field if present]
    C -- application/json --> E[Parse text\nDecode image_b64 if present]
    D --> F{Image provided?}
    E --> F
    F -- Yes --> G[analyze_image via Gemini Vision\nResult prepended to user_message]
    F -- No --> H[user_message = text]
    G --> H
    H --> I[Load last 10 interactions\nusing briefing_chat_id as user_id]
    I --> J[classify_intent → select_agent]
    J --> K[Inject read-later context]
    K --> L[agent.run with message_history]
    L --> M[log_interaction\nuser_id = briefing_chat_id]
    M --> N[Send response to Telegram\nsplit at 4096 chars]
    N --> O["Return JSON\n{response, status: ok}"]

Key design decisions:

  • user_id is set to BRIEFING_CHAT_ID, the same ID used by the Telegram bot. Content shared from Android and messages sent in Telegram therefore use the same conversation history and context window.
  • Images are analysed by Gemini Vision before being passed to the agent; the agent sees the analysis text, not the raw bytes.
  • The response is always delivered to Telegram. The JSON response body also contains the full response text, for use in HTTP Shortcuts toasts or Tasker variables.
  • The approval gate is bypassed for consequential actions: interface is set to "api", which is not "telegram", so approval-gated tools execute immediately (the same as in CLI and briefing contexts).
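
Below is a framework-agnostic sketch of the token check and JSON-body handling. The helper names are hypothetical. Using hmac.compare_digest for the token comparison is an assumption, not documented behaviour. So is prepending the image analysis with a blank-line separator.

```python
import base64
import hmac
import json
from typing import Optional


class Unauthorized(Exception):
    """Maps to a 401 response in the web layer."""


def check_token(header_value: Optional[str], api_token: str) -> None:
    # Constant-time comparison avoids leaking the token through timing.
    if not header_value or not hmac.compare_digest(header_value.encode(), api_token.encode()):
        raise Unauthorized("invalid X-API-Token")


def parse_json_body(raw: bytes) -> tuple[str, Optional[bytes]]:
    """Return (text, image_bytes) from an application/json request body."""
    body = json.loads(raw)
    image_b64 = body.get("image_b64")
    image = base64.b64decode(image_b64) if image_b64 else None
    return body.get("text", ""), image


def build_user_message(text: str, image_analysis: Optional[str]) -> str:
    # The analysis is prepended to the text; the separator is an assumption.
    return f"{image_analysis}\n\n{text}" if image_analysis else text
```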