Skip to main content

SSE Streaming

Server-Sent Events (SSE) implementation for real-time Agent responses

Overview

Viben Gateway uses Server-Sent Events (SSE) to enable real-time communication between the Agent and frontend, supporting text streaming, tool calls, plan approval, and interactive questions.

Protocol Format

SSE Message Format

data: {"type":"session","sessionId":"12345"}\n\n
data: {"type":"text","content":"Hello"}\n\n
data: {"type":"tool_use","id":"tool_1","name":"Read","input":{"file_path":"/a.txt"}}\n\n
data: {"type":"done"}\n\n

Response Headers

Content-Type: text/event-stream
Cache-Control: no-cache, no-transform
Connection: keep-alive
X-Accel-Buffering: no

Message Types

1. session - Session Created

Sent immediately when a request starts.

/**
 * First message of a stream: announces the session that was created for
 * the request.
 */
interface SessionMessage {
  /** Discriminator identifying this message as a session announcement. */
  type: "session";
  /** Identifier of the session created for this request. */
  sessionId: string;
}

2. text - Text Content

Sent when the Agent generates text responses. A single reply may arrive as multiple `text` messages; the frontend must append each `content` chunk, in order, to the current assistant message.

/**
 * A chunk of Agent-generated text. Several of these may be sent for one
 * reply, so consumers are expected to concatenate `content` values.
 */
interface TextMessage {
  /** Discriminator identifying this message as a text chunk. */
  type: "text";
  /** The text carried by this chunk. */
  content: string;
}

3. tool_use - Tool Call

Sent before the Agent invokes a tool.

/** Announces a tool invocation; sent before the Agent invokes the tool. */
interface ToolUseMessage {
  /** Discriminator identifying this message as a tool call. */
  type: "tool_use";
  /** Tool call ID */
  id: string;
  /** Tool name */
  name: string;
  /** Tool input parameters */
  input: unknown;
}

4. tool_result - Tool Result

Sent after a tool execution completes.

/** Reports the outcome of a tool call; sent after the tool execution completes. */
interface ToolResultMessage {
  /** Discriminator identifying this message as a tool result. */
  type: "tool_result";
  /** Corresponding tool_use ID */
  toolUseId: string;
  /** Tool output */
  output: string;
  /** Whether this is an error result */
  isError?: boolean;
}

5. plan - Execution Plan

Sent when the Agent generates an execution plan. The frontend should pause and wait for user approval.

/**
 * Carries an execution plan generated by the Agent. The frontend is expected
 * to pause and wait for user approval when it receives one.
 */
interface PlanMessage {
  /** Discriminator identifying this message as a plan. */
  type: "plan";
  /** The plan itself. */
  plan: {
    /** Identifier of the plan. */
    id: string;
    /** What the plan sets out to achieve. */
    goal: string;
    /** The individual steps of the plan, each with its own progress status. */
    steps: {
      id: string;
      description: string;
      status: "pending" | "in_progress" | "completed" | "failed";
    }[];
    /** Optional free-form notes attached to the plan. */
    notes?: string;
  };
}

6. question - Interactive Question

Sent when the Agent needs user input. The frontend should display a question form and wait for a response.

/**
 * Asks the user for input. The frontend is expected to display a question
 * form and wait for a response.
 */
interface QuestionMessage {
  /** Discriminator identifying this message as a question prompt. */
  type: "question";
  /** Identifier of this question prompt. */
  id: string;
  /** One entry per question to present to the user. */
  questions: {
    header: string;
    question: string;
    /** Selectable answers for this question. */
    options: {
      label: string;
      description?: string;
    }[];
    // NOTE(review): presumably true when several options may be picked at once — confirm.
    multiSelect: boolean;
  }[];
}

7. result - Task Completion

Sent when task execution completes.

/** Summary sent when task execution completes. */
interface ResultMessage {
  /** Discriminator identifying this message as a task result. */
  type: "result";
  /** API call cost */
  cost?: number;
  /** Execution duration (ms) */
  duration?: number;
  /** Outcome category of the run, when provided. */
  subtype?: "success" | "error" | "error_max_turns";
}

8. error - Error

Sent when an error occurs.

/** Reports that an error occurred. */
interface ErrorMessage {
  /** Discriminator identifying this message as an error report. */
  type: "error";
  /** Description of the error. */
  message: string;
}

9. done - End Marker

Sent as the final message, immediately before the server closes the SSE stream.

/** End marker: sent before the SSE stream ends. Carries no payload. */
interface DoneMessage {
  /** Discriminator identifying this message as the end marker. */
  type: "done";
}

Backend Implementation

Hono SSE Helper

// packages/core/src/gateway/routes/agent.ts

import { Hono } from "hono";
import { streamSSE } from "hono/streaming";

const agent = new Hono();

// POST /run — executes the Agent for the JSON request body and streams every
// message it yields to the client as one SSE `data:` event per message.
//
// Event order: `session` first, then whatever `executeAgent` yields, then
// `done`. If the Agent throws part-way through, an `error` event is emitted
// and `done` is still sent, so the client always sees the end marker.
agent.post("/run", async (c) => {
  const body = await c.req.json();

  return streamSSE(c, async (stream) => {
    // Single place where a message is serialized into an SSE event.
    const send = (message: unknown) =>
      stream.writeSSE({ data: JSON.stringify(message) });

    try {
      // Send session ID (placeholder value in this example)
      await send({ type: "session", sessionId: "12345" });

      // Execute Agent and stream responses
      for await (const message of executeAgent(body)) {
        await send(message);
      }
    } catch (error: unknown) {
      // Surface the failure through the protocol's `error` message instead of
      // letting the stream end silently.
      await send({
        type: "error",
        message: error instanceof Error ? error.message : String(error),
      });
    } finally {
      // Send end marker on both the success and the failure path
      await send({ type: "done" });
    }
  });
});

Setting Response Headers

// Hono's streamSSE automatically sets these headers.
// For custom configuration, apply them on the context yourself:

for (const [headerName, headerValue] of [
  ["Content-Type", "text/event-stream"],
  ["Cache-Control", "no-cache, no-transform"],
  ["Connection", "keep-alive"],
  ["X-Accel-Buffering", "no"], // Disable Nginx buffering
] as const) {
  c.header(headerName, headerValue);
}

Frontend Implementation

Using fetch + ReadableStream

// apps/desktop/src/hooks/use-agent.ts

/**
 * POSTs `body` as JSON to `url` and dispatches every SSE `data:` payload in
 * the response to `handleMessage`, in arrival order, as the stream is read.
 *
 * The request is bound to `abortController.signal`, so aborting that
 * controller rejects this promise with the fetch abort error.
 *
 * @param url  Endpoint that answers with a `text/event-stream` body.
 * @param body Request payload; serialized with `JSON.stringify`.
 * @returns Resolves once the server has closed the stream and every
 *          received line has been dispatched.
 * @throws Error when the response status is not 2xx or the response has no
 *         body; also rethrows `JSON.parse` failures and anything thrown by
 *         `handleMessage`.
 */
async function connectSSE(url: string, body: object): Promise<void> {
  const response = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body),
    signal: abortController.signal,
  });

  // A 4xx/5xx reply is not an event stream; without this check it would be
  // read to the end, yield no messages, and look like a successful run.
  if (!response.ok) {
    await response.body?.cancel().catch(() => undefined);
    throw new Error(`SSE request failed: HTTP ${response.status}`);
  }

  const reader = response.body?.getReader();
  if (!reader) throw new Error("No response body");

  const decoder = new TextDecoder();
  let buffer = "";

  // Parses one complete line; only `data:` fields carry messages.
  const dispatchLine = (line: string): void => {
    if (!line.startsWith("data:")) return;
    // The space after the colon is optional in the SSE format.
    const raw = line.slice(5);
    const data = raw.startsWith(" ") ? raw.slice(1) : raw;
    if (data) {
      handleMessage(JSON.parse(data));
    }
  };

  let completed = false;
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");

      // Keep the last line (may be incomplete)
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        dispatchLine(line);
      }
    }

    // Flush bytes still held by the decoder, then handle a final line that
    // was not terminated by a newline before the stream closed.
    buffer += decoder.decode();
    if (buffer) dispatchLine(buffer);
    completed = true;
  } finally {
    if (!completed) {
      // Best effort: release the connection when parsing or the handler
      // threw. The original error keeps propagating.
      await reader.cancel().catch(() => undefined);
    }
  }
}

Message Handling

/**
 * Applies one decoded SSE message to the UI state by calling the matching
 * state setter. Message types not listed below are ignored.
 *
 * NOTE(review): `SSEMessage` is not declared in this snippet — presumably it
 * is the union of the message interfaces discriminated by `type`; confirm.
 *
 * @param message A decoded message from the stream.
 */
function handleMessage(message: SSEMessage): void {
  switch (message.type) {
    case "session": {
      setSessionId(message.sessionId);
      return;
    }

    case "text": {
      const chunk = message.content;
      // Extend the trailing entry when it is text; otherwise start a new one.
      setMessages((history) => {
        const tail = history[history.length - 1];
        if (!tail || tail.type !== "text") {
          return [...history, { id: Date.now().toString(), type: "text", content: chunk }];
        }
        const extended = { ...tail, content: tail.content + chunk };
        return [...history.slice(0, -1), extended];
      });
      return;
    }

    case "tool_use": {
      const { id, name, input } = message;
      setMessages((history) => [...history, { id, type: "tool_use", name, input }]);
      return;
    }

    case "tool_result": {
      const { toolUseId, output, isError } = message;
      setMessages((history) => [
        ...history,
        { id: Date.now().toString(), type: "tool_result", toolUseId, output, isError },
      ]);
      return;
    }

    case "plan": {
      // Store the plan, then switch to the approval phase.
      setPlan(message.plan);
      setPhase("awaiting_approval");
      return;
    }

    case "question": {
      const { id, questions } = message;
      setPendingQuestion({ id, questions });
      return;
    }

    case "result": {
      onComplete?.(message.cost, message.duration);
      return;
    }

    case "error": {
      setError(message.message);
      return;
    }

    case "done": {
      setPhase("idle");
      return;
    }
  }
}

Error Handling

Connection Errors

try {
  await connectSSE(url, body);
} catch (error: unknown) {
  // The catch variable is `unknown` under `strict`; narrow it once before
  // reading `name` / `message`. Non-Error throwables are stringified so the
  // UI never receives `undefined` as its error text.
  const errorName = error instanceof Error ? error.name : undefined;
  const errorMessage = error instanceof Error ? error.message : String(error);

  if (errorName === "AbortError") {
    // User cancelled, no handling needed
    return;
  }

  // Check for network errors: these are the messages fetch rejects with
  // when the request cannot be made ("Load failed" in WebKit,
  // "Failed to fetch" in Chromium).
  if (errorMessage === "Load failed" || errorMessage === "Failed to fetch") {
    setError("Connection failed, please check if Gateway is running");
  } else {
    setError(errorMessage);
  }
}

Retry Strategy

/** Default number of connection attempts made by `connectWithRetry`. */
const MAX_RETRIES = 3;
/** Default base delay in milliseconds; the wait grows linearly per attempt. */
const RETRY_DELAY = 1000;

/**
 * Calls `connectSSE(url, body)` until it succeeds or the attempts run out.
 *
 * After failed attempt number `k` it waits `retryDelay * k` ms before trying
 * again. No wait follows the final attempt, so the caller gets the failure
 * immediately. An error named "AbortError" is rethrown at once, never retried.
 *
 * Each retry calls `connectSSE` again from scratch with the same arguments;
 * messages already dispatched by a failed attempt are not rolled back.
 *
 * @param url        Endpoint passed through to `connectSSE`.
 * @param body       Request payload passed through to `connectSSE`.
 * @param maxRetries Total number of attempts (defaults to `MAX_RETRIES`).
 * @param retryDelay Base delay in ms (defaults to `RETRY_DELAY`).
 * @throws The error of the last failed attempt, or the abort error.
 */
async function connectWithRetry(
  url: string,
  body: object,
  maxRetries: number = MAX_RETRIES,
  retryDelay: number = RETRY_DELAY,
): Promise<void> {
  let lastError: unknown = null;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await connectSSE(url, body);
      return;
    } catch (error: unknown) {
      // Duck-typed so abort errors are recognized whatever their class.
      const isAbort =
        typeof error === "object" &&
        error !== null &&
        (error as { name?: unknown }).name === "AbortError";
      if (isAbort) throw error;

      lastError = error;

      // Back off only when another attempt will actually follow.
      if (attempt < maxRetries) {
        await new Promise((resolve) => setTimeout(resolve, retryDelay * attempt));
      }
    }
  }

  // Reached with `lastError === null` only when no attempt was made at all.
  throw lastError ?? new Error("connectWithRetry: no connection attempt was made");
}

Testing

Mock SSE Server

// Test mock server
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";

const mockServer = new Hono();

// Scripted stand-in for the agent endpoint: replies with a fixed
// session -> text -> done sequence, pausing 100 ms between events.
mockServer.post("/api/agent/run", (c) =>
  streamSSE(c, async (stream) => {
    const emit = (payload: object) => stream.writeSSE({ data: JSON.stringify(payload) });
    const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

    await emit({ type: "session", sessionId: "test" });
    await pause(100);
    await emit({ type: "text", content: "Hello" });
    await pause(100);
    await emit({ type: "done" });
  }),
);