Message Lifecycle¶
How a message travels from Telegram to a response and back. These flows are Telegram-specific — WhatsApp follows the same logical path (intent routing → agent → log) but via a webhook handler (app/interfaces/whatsapp/webhook.py) without streaming or TTS. Each Telegram message type has a slightly different path.
Text Message (Primary Flow)¶
Text messages use streaming — you see the response being typed in real time.
sequenceDiagram
participant You
participant TG as Telegram
participant H as handle_message
participant R as Intent Router
participant S as Storage
participant A as Agent (domain or full)
participant LLM as LLM
participant T as Tools
participant GW as approval_gate
You->>TG: Send text message
TG->>H: Update event (long-polling)
H->>H: Check allowlist (ALLOWED_TELEGRAM_USER_IDS)
H->>H: Check edit state (user_data["pending_edit_action_id"])
H->>R: classify_intent(text)
R-->>H: categories e.g. {"email", "utility"}
H->>R: select_agent(categories)
R-->>H: email_agent (or full_agent for ambiguous)
H->>H: request_deps = _deps_replace(deps, active_categories, chat_id, pending_action_user_message=text)
H->>S: fetch_message_history(user_id, message, settings)
S-->>H: chronological 10 OR (3 recent + 3 semantic) if ENABLE_SEMANTIC_HISTORY → build_message_history (drops oldest if >6,000 est. tokens)
H->>TG: send_chat_action("typing")
Note over H,S: Context injection (shared 1,000-token budget)
H->>S: find_relevant_notes (≥0.6 similarity + recency boost)
H->>S: find_relevant_summaries (≥0.6 similarity + recency boost, if budget remains)
H->>S: find_relevant_read_later (tag overlap, if budget remains)
H->>H: Wrap blocks in XML tags + prepend datetime → enriched_text
H->>TG: Send "…" placeholder message
H->>A: agent.run_stream(enriched_text, deps, message_history)
A->>S: get_context("global")
S-->>A: UserContext (reflection profile or empty)
A->>A: build_system_prompt() — injects active_categories focus hint
A->>LLM: system_prompt + message_history + user_message
loop 0 or more tool calls
LLM->>T: tool_name(args)
alt Gated tool (create_task, send_email, etc.)
T->>GW: approval_gate(tool_name, payload, deps)
GW->>S: save_pending_action(PendingAction)
GW-->>T: "[APPROVAL_PENDING:<uuid>]" sentinel
else Read-only tool
T-->>LLM: result
end
T-->>LLM: result (sentinel or direct)
end
LLM-->>A: response stream ("action is pending your approval")
A-->>H: text chunks (delta=True)
loop Every 1.5 seconds
H->>TG: Edit placeholder with current buffer
end
H->>TG: Final edit with complete response
Note over H,TG: Splits at newline if >4096 chars
H->>H: Scan tool calls for sentinels
opt Sentinel(s) found
H->>S: get_pending_action(uuid)
H->>TG: Edit sent_msg with preview + [✅ Confirm] [❌ Cancel] [✏️ Edit]
H->>S: update_pending_action(message_id=sent_msg.id)
end
H->>S: log_interaction(user_msg, response, channel="telegram")
H->>S: _write_audit_entries() — sanitised tool call records
Notes on the streaming flow and approval gate¶
- **Intent routing runs first** — `classify_intent()` keyword-matches the message in microseconds, then `select_agent()` picks the appropriate domain agent (email, calendar, memory, github, news, utility, meetings, etc.) or falls back to the full agent for ambiguous/conversational messages. `active_categories` is set on `AgentDeps` before `run_stream` — `build_system_prompt` injects a focus hint telling the model which tool categories to prefer and only includes tool guidance for those domains.
- **Context injection** — before starting the stream, the handler runs three retrieval layers sharing a `CONTEXT_TOKEN_BUDGET = 1000` token budget: (1) `find_relevant_notes` (≥0.6 similarity), (2) `find_relevant_summaries` (≥0.6), (3) `find_relevant_read_later` (tag overlap). Each layer's output is wrapped in XML tags (`<context type="notes">`, `<context type="summaries">`, `<context type="read_later">`) and prepended to the user message so the model can distinguish retrieved memory from instructions.
- **Datetime in user turn** — `[Thursday, April 24, 2026 — 09:15 AM Europe/Paris]` is prepended to the enriched message (not the system prompt) to keep the stable system prompt prefix identical across requests for Gemini implicit cache hits.
- The placeholder "…" message is edited in place rather than sending a new message — this prevents the chat from jumping around while you wait.
- Edits are throttled to every 1.5 seconds to stay within Telegram's rate limits.
- If the final response exceeds 4096 characters, it's split at the last newline before the limit and sent as sequential messages.
- **Approval gate** — when the agent calls a consequential tool (create task, send email, etc.), `approval_gate()` saves a `PendingAction` to the DB and returns a sentinel token instead of executing. After the stream ends, `handle_message` scans for sentinels and edits the sent message to add the inline keyboard. The agent is instructed via `_APPROVAL_INSTRUCTIONS` to tell the user "action pending approval" when it sees a sentinel — never to show the raw token.
- **Edit state** — at the very start of `handle_message`, before intent routing, the handler checks `context.user_data` for `pending_edit_action_id`. If set, the message is treated as a correction and routed to `_handle_edit_correction()` instead of the normal flow.
- **Token logging** — each `telegram/message` span records `est_tokens_history`, `est_tokens_context`, `est_tokens_user_turn`, `context_layers`, and `history_interactions` as Logfire span attributes for per-request token observability.
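The keyword-based router can be sketched as follows. The category vocabulary and the tie-break priority order here are illustrative assumptions — only the matching strategy (regex word-boundary keyword lookup, no LLM call) comes from the description above:

```python
import re

# Hypothetical category vocabulary — the real router's keyword tables and
# full category set live in the codebase; only the strategy is sketched.
_CATEGORY_KEYWORDS: dict[str, list[str]] = {
    "email": ["email", "inbox", "unread", "reply"],
    "calendar": ["calendar", "meeting", "schedule"],
    "utility": ["weather", "convert", "timer"],
}
_PRIORITY = ["email", "calendar", "utility"]  # assumed tie-break order


def classify_intent(text: str) -> set[str]:
    """Pure keyword matching — no LLM call, runs in microseconds."""
    lowered = text.lower()
    return {
        category
        for category, keywords in _CATEGORY_KEYWORDS.items()
        if any(re.search(rf"\b{re.escape(kw)}\b", lowered) for kw in keywords)
    }


def select_agent(categories: set[str]) -> str:
    """Route to the highest-priority matched domain agent; no keyword hit
    at all means an ambiguous/conversational message -> full agent."""
    for category in _PRIORITY:
        if category in categories:
            return f"{category}_agent"
    return "full_agent"
```

With this sketch, `classify_intent("any unread email today?")` yields `{"email"}` and routes to `email_agent`, while small talk with no keyword hits falls back to `full_agent`.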
Multi-step Plan Flow (Spec 008)¶
Before a Telegram text message hits the agent, a planning gate decides whether it should be split into ordered steps. Most messages are simple ("what's on my calendar?") and skip planning entirely. Complex messages ("check email, then post a summary in #standup, then add a follow-up task") get decomposed, previewed for approval, and executed step-by-step with live progress.
sequenceDiagram
participant You
participant TG as Telegram
participant H as handle_message
participant CC as classify_complexity
participant GP as generate_plan
participant S as Storage
participant EX as execute_plan
participant R as Intent Router (per step)
participant A as Domain Agent
You->>TG: "Check email, then post a summary in #standup, then add a task"
TG->>H: Update event
Note over H: Standard pre-checks (allowlist, edit-state, history, context injection)
H->>CC: classify_complexity(text)
CC-->>H: True (regex matched "then" connectives, len ≥ 30)
H->>GP: generate_plan(text, deps)
GP->>GP: _planner_agent.run(text)\n→ ExecutionPlan(goal, steps[], needs_planning)
alt needs_planning=False or <2 steps
GP-->>H: None (fall through to normal flow)
Note over H,A: Continue to standard text-message flow
else 2+ steps
GP-->>H: ExecutionPlan
H->>H: format_plan_preview(plan) → MarkdownV1 message
H->>S: save PendingAction(action_type="plan", payload=plan.json, trace_id)
H->>TG: Send preview + [✅ Confirm] [❌ Cancel] [✏️ Edit]
Note over You,TG: Wait for user
You->>TG: Tap ✅ Confirm
TG->>H: CallbackQuery (handle_callback_query loads plan)
H->>EX: execute_plan(plan, deps, send_progress)
loop For each step in plan.steps
EX->>TG: Edit progress message\n(✓ done · ▶ current · _ pending)
EX->>R: classify_intent(step_message + scratchpad)
R-->>EX: domain agent
EX->>A: agent.run(step_message_with_scratchpad)
alt Step succeeds
A-->>EX: output text
EX->>EX: scratchpad.append(step output, truncated 300 chars)
else Step throws
A-->>EX: Exception
EX->>S: save PendingAction(action_type="plan_resume",\npayload=PlanResumePayload(plan, remaining_steps, scratchpad,\nfailed_step, failure_reason))
EX->>TG: Send "Step N failed: {reason}\nContinue or abort?"\n+ inline keyboard
EX-->>H: partial summary (return)
end
end
EX->>TG: Final message — per-step outputs concatenated under "*Done!* {goal}"
H->>S: log_interaction
end
Notes on the plan flow¶
- **Pre-filter is cheap.** `classify_complexity` is a single regex match on connectives (`and then`, `also`, `after that`) with a 30-char minimum — single-action messages cost zero LLM tokens for planning. The planner agent itself is tool-less; per-step execution uses the existing router + domain agents.
- **Scratchpad threading.** Step N+1 receives a prepended block `"Context from previous steps:\n- Step N (description): output[:300]"` so later steps build on earlier results without re-fetching.
- **The plan is always gated**, even when none of its constituent tool calls would normally be gated. `action_type="plan"` and `tool_name="__plan__"` distinguish plan actions from regular tool approvals; plan-resume actions use `"plan_resume"`/`"__plan_resume__"`.
- **Resume preserves work.** Confirm restarts `execute_plan` with `steps_override=remaining_steps` and `initial_scratchpad=payload.scratchpad` — completed steps are not re-run. Trace scoring carries through to plan and plan_resume actions via `PendingAction.trace_id` (Spec 009).
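The pre-filter is small enough to sketch in full — the exact connective list and pattern are assumptions reconstructed from the examples given:

```python
import re

# Assumed connective list, mirroring the examples above; the real regex may
# cover more patterns. A 30-char floor rejects short messages early.
_CONNECTIVES_RE = re.compile(r"\b(and then|then|also|after that)\b", re.IGNORECASE)
_MIN_PLANNING_LEN = 30


def classify_complexity(text: str) -> bool:
    """True -> hand the message to the planner agent; False -> normal flow."""
    return len(text) >= _MIN_PLANNING_LEN and _CONNECTIVES_RE.search(text) is not None
```

Only messages that pass this check incur the planner-agent LLM call; everything else skips straight to intent routing.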
Inline Approval (Button Callbacks)¶
When the user taps a button under a pending action preview, Telegram fires a CallbackQuery event handled by handle_callback_query.
sequenceDiagram
participant You
participant TG as Telegram
participant CB as handle_callback_query
participant S as Storage
participant EX as executor (ACTION_REGISTRY)
You->>TG: Tap ✅ Confirm / ❌ Cancel / ✏️ Edit
TG->>CB: CallbackQuery (data="approve:{verb}:{uuid}")
CB->>CB: query.answer() — dismiss Telegram loading spinner
CB->>S: get_pending_action(uuid)
CB->>CB: Guard: status == "pending" and now < expires_at
alt Confirm
CB->>EX: execute_approved_action(tool_name, payload, deps)
EX-->>CB: result text
CB->>S: update_pending_action(status="confirmed")
CB->>TG: edit_message_text("✅ Done!\n{result}")
CB->>S: log_audit_entry
else Cancel
CB->>S: update_pending_action(status="cancelled")
CB->>TG: edit_message_text("❌ Cancelled.")
else Edit
CB->>S: user_data["pending_edit_action_id"] = uuid
CB->>TG: edit_message_text("What would you like to change?")
Note over You,TG: Next message from user → _handle_edit_correction()
else Already resolved or expired
CB->>TG: edit_message_text("This action was already {status}.")
end
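The callback payload follows the `approve:{verb}:{uuid}` shape shown in the diagram; a minimal parser might look like this (the error handling is an assumption):

```python
def parse_callback_data(data: str) -> tuple[str, str]:
    """Split 'approve:{verb}:{uuid}' into (verb, action_uuid).
    maxsplit=2 keeps any ':' inside the trailing segment intact."""
    prefix, verb, action_id = data.split(":", 2)
    if prefix != "approve":
        raise ValueError(f"unexpected callback prefix: {prefix!r}")
    return verb, action_id
```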
`_handle_edit_correction()` — called when the user sends a correction after tapping Edit:

- Loads the original `PendingAction`, verifies it's still pending and unexpired
- Marks the old action `cancelled`, increments `edit_count`
- Re-runs the agent with a `[REVISION]` prompt: `"Original: {preview_text}\nCorrection: {user_text}\nPlease redo with this change."`
- The agent produces a new sentinel → new preview with a fresh keyboard
- Old `message_id` is no longer relevant (the previous message was edited to show "Cancelled")
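The `[REVISION]` prompt can be sketched directly from the template above — the exact wrapper text in the codebase may differ slightly:

```python
def build_revision_prompt(preview_text: str, user_text: str) -> str:
    """Combine the cancelled action's preview with the user's correction,
    following the template quoted above."""
    return (
        "[REVISION]\n"
        f"Original: {preview_text}\n"
        f"Correction: {user_text}\n"
        "Please redo with this change."
    )
```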
Non-Telegram interfaces (CLI, WhatsApp): approval_gate() detects interface != "telegram" and executes the action immediately without saving a PendingAction. The approval UI is Telegram-only.
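That interface branch is the whole trick. A sketch, with `save_pending_action` and `execute_tool` standing in for the real storage and executor calls (their signatures here are assumptions):

```python
import uuid


def approval_gate(tool_name, payload, deps, *, save_pending_action, execute_tool):
    """Defer execution behind a PendingAction on Telegram; run immediately
    everywhere else (cli, whatsapp, api)."""
    if deps.interface != "telegram":
        return execute_tool(tool_name, payload)
    action_id = str(uuid.uuid4())
    save_pending_action(action_id, tool_name, payload)
    # Sentinel token — never executed; handle_message scans for it after
    # the stream ends and attaches the confirm/cancel/edit keyboard.
    return f"[APPROVAL_PENDING:{action_id}]"
```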
Voice Message¶
Voice goes through an extra STT step before hitting the agent. No streaming — the full transcript is needed first.
sequenceDiagram
participant You
participant TG as Telegram
participant H as handle_voice_message
participant G as Gemini STT
participant S as Storage
participant A as Agent
You->>TG: Send voice note
TG->>H: Update event (voice/audio)
H->>H: Check allowlist
H->>TG: Download audio bytes
TG-->>H: audio_bytes
H->>G: transcribe_audio(audio_bytes, model_id)
G-->>H: transcript text
H->>S: get_interactions_by_user(limit=5)
S-->>H: message_history
H->>A: agent.run(transcript, deps, message_history)
A-->>H: response_text
alt response ≤ 500 words
H->>H: synthesize_speech(response_text) via edge-tts
H->>TG: send_voice(audio_bytes)
else response > 500 words
H->>TG: send_message(response_text)
end
H->>S: log_interaction(transcript, response)
The model ID for Gemini STT is derived from MODEL_NAME by stripping the provider prefix (e.g. google-gla:gemini-2.5-flash → gemini-2.5-flash). TTS uses edge-tts with the voice configured via TTS_VOICE (default en-GB-RyanNeural). WhatsApp voice messages receive a text reply only — no TTS.
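The prefix strip is a one-liner — this sketch assumes the separator is the first `:`, which matches the example above:

```python
def stt_model_id(model_name: str) -> str:
    """Strip the provider prefix from MODEL_NAME; names without a prefix
    pass through unchanged."""
    return model_name.split(":", 1)[-1]
```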
Photo Message¶
Photos are analysed by Gemini Vision. The analysis result becomes the user message fed to the agent.
sequenceDiagram
participant You
participant TG as Telegram
participant H as handle_photo_message
participant GV as Gemini Vision
participant A as Agent
participant S as Storage
You->>TG: Send photo (+ optional caption)
TG->>H: Update event (photo)
H->>H: Check allowlist
H->>TG: Download highest-resolution photo
TG-->>H: image_bytes
H->>GV: analyze_image(image_bytes, "image/jpeg", caption)
GV-->>H: image analysis text
H->>H: Build user_message:\n"[Image shared]\nQuestion: {caption}\nAnalysis: {analysis}"
H->>S: get_interactions_by_user(limit=5)
S-->>H: message_history
H->>A: agent.run(user_message, deps, message_history)
A-->>H: response_text
H->>TG: Reply with response
H->>S: log_interaction(user_message, response)
Document Message (PDF / Text)¶
PDFs go through Gemini Vision. Text files are decoded directly and passed to the agent as-is.
sequenceDiagram
participant You
participant TG as Telegram
participant H as handle_document_message
participant GV as Gemini Vision
participant A as Agent
participant S as Storage
You->>TG: Send document (+ optional caption)
TG->>H: Update event (document)
H->>H: Check allowlist
H->>H: Check mime_type (PDF / text / other)
alt Unsupported type
H->>TG: "I can analyze PDFs and text files only"
else Text file (text/*)
H->>TG: Download file bytes
TG-->>H: file_bytes
H->>H: Decode UTF-8, truncate at 50k chars
H->>A: agent.run("[Document: name]\nContents:\n{text}", ...)
else PDF (application/pdf)
H->>TG: Download file bytes
TG-->>H: file_bytes
H->>GV: analyze_image(file_bytes, "application/pdf", caption)
GV-->>H: document analysis
H->>A: agent.run("[PDF: name]\nAnalysis:\n{analysis}", ...)
end
A-->>H: response_text
H->>TG: Reply with response
H->>S: log_interaction(user_message, response)
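The text-file branch is simple enough to sketch — the 50k-character cap comes from the diagram, while the `errors="replace"` decode policy is an assumption:

```python
MAX_DOC_CHARS = 50_000


def prepare_text_document(name: str, file_bytes: bytes) -> str:
    """Decode a text upload and truncate before handing it to the agent."""
    text = file_bytes.decode("utf-8", errors="replace")[:MAX_DOC_CHARS]
    return f"[Document: {name}]\nContents:\n{text}"
```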
Slash Commands¶
Slash commands (/notes, /tasks, /reminders) bypass the agent entirely and query storage directly. They're fast, deterministic, and don't consume LLM tokens.
sequenceDiagram
participant You
participant TG as Telegram
participant H as Command Handler
participant S as Storage
You->>TG: /tasks
TG->>H: CommandHandler("tasks")
H->>H: Check allowlist
H->>S: list_tasks(status="todo")
S-->>H: list[Task]
H->>TG: Formatted task list (Markdown)
| Command | Storage call | Returns |
|---|---|---|
| `/notes` | `get_notes()` | 5 most recent notes |
| `/tasks` | `list_tasks(status="todo")` | All pending tasks |
| `/reminders` | `list_reminders()` | All pending reminders |
| `/readlater` | delegates to `handle_message` | Runs `list_read_later` via agent |
| `/briefing` | delegates to `handle_message` | Runs the morning briefing prompt on demand |
| `/start`, `/help` | none | Static help text |
Voice Reply Trigger (Text Messages)¶
Text messages that contain specific phrases cause Kwasi to reply with a voice note instead of text, provided the response is ≤500 words. This works even though the input is typed text, not a voice message.
Trigger phrases (case-insensitive, word-boundary matched):
| Phrase pattern | Examples |
|---|---|
| `tell me` | "tell me the weather today" |
| `read me` / `read that` / `read this` / `read it (out)` | "read me that article", "read it out" |
| `say that` / `say it` | "say that again" |
| `speak (to me)` | "speak to me", "speak" |
flowchart TD
A[Text message arrives] --> B{_VOICE_TRIGGER_RE\nmatches phrase?}
B -- No --> C[Normal text reply\n_to_telegram_md + edit flow]
B -- Yes --> D{Response ≤ 500 words\nAND TTS_VOICE set?}
D -- No --> C
D -- Yes --> E[synthesize_speech via edge-tts]
E --> F{Audio generated?}
F -- No, exception --> C
F -- Yes --> G[Delete placeholder message]
G --> H[reply_voice with audio_bytes]
If TTS synthesis fails for any reason, it falls back silently to the normal text response. The voice reply trigger only applies to Telegram text messages — voice input, WhatsApp, and CLI are unaffected.
External API Message (POST /message)¶
For Android HTTP Shortcuts and other external clients. Requires API_TOKEN and BRIEFING_CHAT_ID.
flowchart TD
A[POST /message] --> B{X-API-Token\nvalid?}
B -- No --> B1[401 Unauthorized]
B -- Yes --> C{Body type?}
C -- multipart/form-data --> D[Read text field\nRead file field if present]
C -- application/json --> E[Parse text\nDecode image_b64 if present]
D --> F{Image provided?}
E --> F
F -- Yes --> G[analyze_image via Gemini Vision\nResult prepended to user_message]
F -- No --> H[user_message = text]
G --> H
H --> I[Load last 10 interactions\nusing briefing_chat_id as user_id]
I --> J[classify_intent → select_agent]
J --> K[Inject read-later context]
K --> L[agent.run with message_history]
L --> M[log_interaction\nuser_id = briefing_chat_id]
M --> N[Send response to Telegram\nsplit at 4096 chars]
N --> O[Return JSON\n{response, status: ok}]
Key design decisions:
- user_id is set to BRIEFING_CHAT_ID — the same ID used by the Telegram bot. This means Android shares and Telegram messages share the same conversation history and context window.
- Images are analysed via Gemini Vision before being passed to the agent — the agent sees the analysis text, not raw bytes.
- The response is always delivered to Telegram. The JSON response body also contains the full response text for use in HTTP Shortcuts toasts or Tasker variables.
- The approval gate is bypassed for consequential actions — interface is set to "api", which is not "telegram", so approval-gated tools execute immediately (same as CLI and briefing contexts).