Session Persistence
File-system based session persistence design for real-time message saving during SSE streaming.
Overview
The session persistence system unifies the two independent session systems (in-memory AgentService sessions and file-based SessionStore sessions) into a single file-system-based persistence layer that saves messages in real-time during SSE streaming.
Design Goals
- Real-time persistence during SSE streaming - Each message is immediately appended to file
- Unified data model - Session -> Task -> Message three-layer structure
- Background task support - Tasks continue executing and persisting when user switches away
- Task recovery - Application state can be restored after restart
File Storage Structure
~/.viben/
+-- agents/
| +-- <agent-id>/
| +-- config.yaml # Agent configuration
| +-- .agent_sessions/
| +-- <session-id>/
| +-- config.yaml # Session configuration
| +-- messages.ui.jsonl # UI rendering messages (append-only)
| +-- messages.rollout.jsonl # Messages sent to Agent
| +-- messages.agent.jsonl # Agent raw responses
+-- tasks/
| +-- <task-id>.yaml # Task configuration
+-- files/
+-- <file-id>/
+-- meta.yaml # File metadata
+-- <filename> # Actual file
Data Model
Session
Storage: ~/.viben/agents/<agent-id>/.agent_sessions/<session-id>/config.yaml
id: "20260224153000_hello-world" # Format: YYYYMMDDHHmmss_slug
agent_id: "personal-assistant"
prompt: "Hello world"
task_count: 3
status: "active" # active | completed | error
workspace_path: "/Users/xxx/project"
created_at: "2026-02-24T07:30:00.000Z"
updated_at: "2026-02-24T07:35:00.000Z"
interface SessionConfig {
id: string;
agentId: string;
prompt: string;
taskCount: number;
status: 'active' | 'completed' | 'error';
workspacePath?: string;
createdAt: string;
updatedAt: string;
metadata?: Record<string, unknown>;
}
Task
Storage: ~/.viben/tasks/<task-id>.yaml
id: "task_1708771800000_abc123"
session_id: "20260224153000_hello-world"
agent_id: "personal-assistant"
task_index: 1
prompt: "Please write a Hello World"
status: "completed" # running | completed | error | stopped
cost: 0.0023
duration: 12500
favorite: false
created_at: "2026-02-24T07:30:00.000Z"
updated_at: "2026-02-24T07:30:12.500Z"
interface TaskConfig {
id: string;
sessionId: string;
agentId: string;
taskIndex: number;
prompt: string;
status: 'running' | 'completed' | 'error' | 'stopped';
cost?: number;
duration?: number;
favorite?: boolean;
createdAt: string;
updatedAt: string;
}
Message (UI)
Storage: messages.ui.jsonl (JSON Lines, append-only)
{"id":"msg_1","taskId":"task_1","type":"user","content":"Hello","timestamp":"2026-02-24T07:30:00.000Z"}
{"id":"msg_2","taskId":"task_1","type":"text","content":"Hi there!","timestamp":"2026-02-24T07:30:01.000Z"}
{"id":"msg_3","taskId":"task_1","type":"tool_use","toolName":"Read","toolUseId":"tu_1","toolInput":{"file_path":"/tmp/test.txt"},"timestamp":"2026-02-24T07:30:02.000Z"}
{"id":"msg_4","taskId":"task_1","type":"tool_result","toolUseId":"tu_1","toolOutput":"File content...","timestamp":"2026-02-24T07:30:03.000Z"}
TypeScript Types (Discriminated Union):
type UIMessage =
| UIUserMessage
| UITextMessage
| UIToolUseMessage
| UIToolResultMessage
| UIPlanMessage
| UIQuestionMessage
| UIErrorMessage
| UIResultMessage;
Each message type extends a base interface with id, taskId, and timestamp fields.
Core Interface
SessionStore
Location: packages/core/src/services/session-store.ts
export class SessionStore {
// Session Operations
async createSession(config: SessionConfig): Promise<void>;
async getSession(agentId: string, sessionId: string): Promise<SessionConfig>;
async listSessions(agentId: string): Promise<SessionConfig[]>;
async updateSession(agentId: string, sessionId: string, updates: Partial<SessionConfig>): Promise<void>;
async deleteSession(agentId: string, sessionId: string): Promise<void>;
// Message Operations (UI) - append-only, no buffering
async appendUIMessage(agentId: string, sessionId: string, message: UIMessage): Promise<void>;
async readUIMessages(agentId: string, sessionId: string): Promise<UIMessage[]>;
async readUIMessagesByTask(agentId: string, sessionId: string, taskId: string): Promise<UIMessage[]>;
// Task Operations
async createTask(config: TaskConfig): Promise<void>;
async getTask(taskId: string): Promise<TaskConfig | null>;
async listTasksBySession(sessionId: string): Promise<TaskConfig[]>;
async updateTask(taskId: string, updates: Partial<TaskConfig>): Promise<void>;
// File Operations
async createFile(file: LibraryFile, content?: Buffer): Promise<void>;
async getFile(fileId: string): Promise<LibraryFile | null>;
async listFilesByTask(taskId: string): Promise<LibraryFile[]>;
}
Key Implementation Details
Session ID Generation: Uses timestamp + slug format YYYYMMDDHHmmss_slug.
Message Append (append-only):
async appendUIMessage(agentId: string, sessionId: string, message: UIMessage): Promise<void> {
const messagesPath = this.uiMessagesPath(agentId, sessionId);
const json = JSON.stringify(message);
await appendFile(messagesPath, json + '\n');
}
Message Read (with fallback): If messages.ui.jsonl is empty, falls back to reading messages.rollout.jsonl and converting.
SSE Stream Persistence
During SSE stream processing, each message is persisted immediately:
const processStream = async (response: Response, currentTaskId: string) => {
const reader = response.body?.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
// 1. Update UI state (active task only)
if (isActiveTask()) {
setMessages(prev => [...prev, data]);
}
// 2. Immediately persist to file system (all tasks)
await sessionStore.appendUIMessage(agentId, sessionId, {
id: generateId(),
taskId: currentTaskId,
type: data.type,
content: data.content,
timestamp: new Date().toISOString(),
});
// 3. Update task status
await updateTaskFromMessage(currentTaskId, data);
}
}
}
};
API Endpoint Changes
/api/agent/run Modifications
New request fields:
interface AgentRunRequest {
prompt: string;
cwd?: string;
agentConfigPath?: string;
agentConfig?: AgentConfigPayload;
sessionId?: string; // File system session ID
taskId?: string; // File system task ID
}
Behavior changes:
agentIdis automatically derived fromagentConfigPathoragentConfig- If
sessionIdandtaskIdare provided, messages are persisted in real-time - Backward compatible: without
sessionId/taskId, behavior is unchanged
New Endpoints
GET /api/agent/:agentId/sessions/:sessionId/tasks
GET /api/agent/:agentId/sessions/:sessionId/tasks/:taskId/messages
PATCH /api/tasks/:taskId
Migration Plan
- Phase 1: Extend SessionStore with Task and File operations
- Phase 2: Modify API endpoints for real-time persistence
- Phase 3: Modify frontend hooks for background task support
- Phase 4: Remove duplicate in-memory session system
Related Documentation
- SSE Streaming - SSE communication protocol
- Background Tasks - Background task management
- Sessions API - Session management endpoints