Source Code
The runnable TypeScript source for this lesson is in
lessons/13-client-server/
Lesson 13: Client/Server Separation¶
Overview¶
This lesson implements a client/server architecture for AI agents, separating the agent logic (server) from the user interface (client). This pattern enables:
- Multiple UIs: CLI, web, mobile, IDE plugins can all connect
- Scalability: Server handles multiple concurrent clients
- State Persistence: Sessions survive client disconnects
- Real-time Updates: Event streaming for live feedback
Why Client/Server Architecture?¶
The Problem¶
Early AI agents often tightly couple the UI with the agent logic:
// Tightly coupled - UI and agent mixed together
async function runAgent() {
const input = await readline.question('> '); // UI concern
const response = await llm.chat(input); // Agent concern
console.log(response); // UI concern
}
This makes it impossible to: - Use the same agent from different interfaces - Scale to multiple users - Maintain state across connections - Add features like streaming without rewriting everything
The Solution¶
Separate concerns with a clear API boundary:
+-------------+ +-------------------------------------+
| CLI UI |---->| |
+-------------+ | |
| Agent Server |
+-------------+ | +---------+ +----------------+ |
| Web UI |---->| | Session | | LLM/Tools | |
+-------------+ | | Manager | | Integration | |
| +---------+ +----------------+ |
+-------------+ | |
| Mobile UI |---->| |
+-------------+ +-------------------------------------+
Core Components¶
1. Types (types.ts)¶
Defines the contract between client and server:
// Session represents an ongoing conversation
interface Session {
id: string;
config: SessionConfig;
status: SessionStatus;
createdAt: Date;
lastActivityAt: Date;
messageCount: number;
tokenUsage: TokenUsage;
}
// Message represents a single exchange
interface Message {
id: string;
sessionId: string;
role: MessageRole;
content: string;
timestamp: Date;
}
// Server API defines available operations
interface AgentServerAPI {
createSession(config?: SessionConfig): Promise<Session>;
sendMessage(sessionId: string, content: string): Promise<Message>;
getMessages(sessionId: string, options?: GetMessagesOptions): Promise<Message[]>;
streamMessage(sessionId: string, content: string): AsyncIterable<StreamChunk>;
subscribe(sessionId: string): AsyncIterable<AgentEvent>;
health(): Promise<HealthStatus>;
}
2. Protocol (protocol.ts)¶
Handles message serialization and request/response correlation:
// Message format for wire protocol
interface ProtocolMessage {
version: string;
type: 'request' | 'response' | 'event' | 'ping' | 'pong';
payload: unknown;
}
// Build typed requests
const message = buildRequest('session.create', { model: 'gpt-4' });
// Parse responses
const response = extractResponse<Session>(parsed);
if (response?.success) {
console.log('Created session:', response.data);
}
// Protocol handler routes methods to implementations
const handler = createProtocolHandler();
handler.register('session.create', async (params) => {
return sessionManager.createSession(params);
});
3. Session Manager (session-manager.ts)¶
Manages multiple concurrent sessions:
class SessionManager {
// Lifecycle
createSession(config: SessionConfig): Session;
getSession(sessionId: string): Session | null;
closeSession(sessionId: string): boolean;
// Messaging
addMessage(sessionId: string, role: MessageRole, content: string): Message;
getMessages(sessionId: string, options?: GetMessagesOptions): Message[];
// Events
subscribeToSession(sessionId: string, listener: AgentEventListener): () => void;
emitToSession(sessionId: string, event: AgentEvent): void;
// Maintenance
updateStatus(sessionId: string, status: SessionStatus): void;
getStats(): SessionStats;
}
Key features: - Automatic cleanup: Expired sessions are removed - Event distribution: Subscribers receive real-time updates - Token tracking: Usage is tracked per session
4. Server (server.ts)¶
Exposes the agent via HTTP/WebSocket API:
class AgentServer {
// Lifecycle
async start(): Promise<void>;
async stop(): Promise<void>;
// Connection handling
async handleConnection(connectionId: string): Promise<void>;
async handleMessage(connectionId: string, rawMessage: string): Promise<string | null>;
async closeConnection(connectionId: string): Promise<void>;
}
// Create and start server
const server = createAgentServer({
port: 3000,
maxSessions: 100,
sessionTimeout: 30 * 60 * 1000,
rateLimit: { maxRequests: 100, windowMs: 60000 },
});
await server.start();
5. Client (client.ts)¶
Provides a clean SDK for connecting to servers:
class AgentClient {
// Connection
async connect(): Promise<void>;
async disconnect(): Promise<void>;
getState(): ConnectionState;
// Sessions
async createSession(config?: SessionConfig): Promise<Session>;
async getSession(sessionId: string): Promise<Session | null>;
async closeSession(sessionId: string): Promise<void>;
// Messaging
async sendMessage(sessionId: string, content: string): Promise<Message>;
async *streamMessage(sessionId: string, content: string): AsyncIterable<StreamChunk>;
// Events
subscribe(listener: AgentEventListener): () => void;
}
// Use the client
const client = createAgentClient({ serverUrl: 'http://localhost:3000' });
await client.connect();
const session = await client.createSession({ model: 'gpt-4' });
const response = await client.sendMessage(session.id, 'Hello!');
Key Patterns¶
Request/Response with Correlation IDs¶
Every request has a unique ID that the response references:
// Client sends request with ID
{
"type": "request",
"payload": {
"id": "req-123",
"method": "session.create",
"params": { "model": "gpt-4" }
}
}
// Server responds with same ID
{
"type": "response",
"payload": {
"id": "req-123",
"success": true,
"data": { "id": "session-456", ... }
}
}
Request Queue with Timeouts¶
The client tracks pending requests and handles timeouts:
class RequestQueue {
add<T>(requestId: string, timeout?: number): Promise<T>;
resolve(requestId: string, result: unknown): boolean;
reject(requestId: string, error: Error): boolean;
cancelAll(reason: string): void;
}
// Usage
const responsePromise = queue.add<Session>('req-123', 30000);
// Later, when response arrives
queue.resolve('req-123', sessionData);
Event Streaming¶
Real-time updates via async iterables:
// Server emits events
sessionManager.emitToSession(sessionId, {
type: 'message.delta',
messageId: 'msg-123',
delta: 'Hello',
});
// Client receives events
for await (const event of client.subscribeToSession(sessionId)) {
if (event.type === 'message.delta') {
process.stdout.write(event.delta);
}
}
Connection State Machine¶
Client tracks connection state:
disconnected --- connect() --> connecting --> connected
^ | |
| v |
+---------------------- reconnecting <--------+
(on error)
Rate Limiting¶
Server protects against abuse:
interface RateLimitConfig {
maxRequests: number;
windowMs: number;
}
// Check before processing
if (!checkRateLimit(connectionId)) {
return buildErrorResponse(requestId, 'RATE_LIMITED', 'Too many requests');
}
Streaming Responses¶
Server-Side Streaming¶
// Server streams response chunks
async *generateResponse(sessionId: string, content: string): AsyncIterable<StreamChunk> {
const chunks = await llm.streamChat(content);
for await (const chunk of chunks) {
yield { type: 'text', content: chunk };
// Also emit as event for subscribers
this.emitToSession(sessionId, {
type: 'message.delta',
delta: chunk,
});
}
yield { type: 'done', messageId: 'msg-123' };
}
Client-Side Consumption¶
// Client consumes stream
const fullResponse: string[] = [];
for await (const chunk of client.streamMessage(sessionId, 'Tell me a story')) {
switch (chunk.type) {
case 'text':
process.stdout.write(chunk.content);
fullResponse.push(chunk.content);
break;
case 'done':
console.log('\nCompleted:', chunk.messageId);
break;
case 'error':
console.error('Error:', chunk.error);
break;
}
}
Error Handling¶
Error Codes¶
type ErrorCode =
| 'INVALID_REQUEST' // Malformed request
| 'SESSION_NOT_FOUND' // Session doesn't exist
| 'SESSION_EXPIRED' // Session timed out
| 'RATE_LIMITED' // Too many requests
| 'SERVER_ERROR' // Internal error
| 'TIMEOUT' // Request timed out
| 'CANCELLED' // Operation cancelled
| 'UNAUTHORIZED'; // Auth failed
Custom Error Types¶
class SessionNotFoundError extends Error {
constructor(sessionId: string) {
super(`Session not found: ${sessionId}`);
this.name = 'SessionNotFoundError';
}
}
// Handler maps errors to codes
if (err instanceof SessionNotFoundError) {
return buildErrorResponse(requestId, 'SESSION_NOT_FOUND', err.message);
}
Client Error Handling¶
try {
const session = await client.getSession('invalid-id');
} catch (err) {
if (err.message.includes('SESSION_NOT_FOUND')) {
console.log('Session does not exist');
}
}
Running the Demo¶
Key Takeaways¶
-
Separation of Concerns: UI and agent logic can evolve independently
-
Protocol Design: Clear message formats with correlation IDs enable reliable communication
-
Session Management: Server maintains state, clients are stateless
-
Event Streaming: Async iterables provide clean streaming APIs
-
Error Handling: Typed error codes enable proper client-side handling
-
Rate Limiting: Protect server resources from abuse
-
Connection Management: Handle disconnects and reconnects gracefully
Extension Ideas¶
- Authentication: Add API key validation or OAuth
- WebSocket: Replace HTTP polling with WebSocket for real-time events
- Load Balancing: Add session affinity for multi-server deployments
- Persistence: Store sessions in Redis/database for durability
- Metrics: Add Prometheus metrics for monitoring
Advanced: Session Persistence (JSONL)¶
The production agent implements session persistence using JSONL (JSON Lines) format for crash-safe, append-only storage.
Why JSONL?¶
JSON (entire file must be valid):
{"messages":[{"role":"user",...},{"role":"assistant",...}]}
-> Crash during write = corrupted file, entire history lost
JSONL (each line is independent):
{"type":"message","data":{...}}
{"type":"message","data":{...}}
{"type":"tool_call","data":{...}}
-> Crash during write = only last line corrupted, history safe
Session Entry Types¶
type SessionEntryType =
| 'message' // User or assistant message
| 'tool_call' // Tool invocation
| 'tool_result' // Tool response
| 'checkpoint' // Manual save point
| 'compaction' // Summary of compacted messages
| 'metadata'; // Session metadata updates
interface SessionEntry {
timestamp: string;
type: SessionEntryType;
data: unknown;
}
Session Store¶
class SessionStore {
// Create new session
async createSession(name?: string): Promise<string> {
const id = `session-${Date.now().toString(36)}-${randomId()}`;
const metadata: SessionMetadata = {
id,
name,
createdAt: new Date().toISOString(),
lastActiveAt: new Date().toISOString(),
messageCount: 0,
tokenCount: 0,
};
this.index.sessions.unshift(metadata);
await this.saveIndex();
return id;
}
// Append entry (crash-safe)
async appendEntry(entry: Omit<SessionEntry, 'timestamp'>): Promise<void> {
const fullEntry = {
...entry,
timestamp: new Date().toISOString(),
};
// Append single line to file
const sessionPath = join(this.baseDir, `${sessionId}.jsonl`);
await writeFile(sessionPath, JSON.stringify(fullEntry) + '\n', { flag: 'a' });
// Update index
await this.saveIndex();
}
// Load session entries
async loadSession(sessionId: string): Promise<SessionEntry[]> {
const content = await readFile(`${sessionId}.jsonl`, 'utf-8');
const entries: SessionEntry[] = [];
for (const line of content.split('\n')) {
if (line.trim()) {
try {
entries.push(JSON.parse(line));
} catch {
// Skip corrupted lines (crash recovery)
}
}
}
return entries;
}
// Reconstruct messages for LLM
async loadSessionMessages(sessionId: string): Promise<Message[]> {
const entries = await this.loadSession(sessionId);
const messages: Message[] = [];
for (const entry of entries) {
if (entry.type === 'message') {
messages.push(entry.data as Message);
} else if (entry.type === 'compaction') {
// Insert compaction summary as context
const compaction = entry.data as { summary: string };
messages.push({
role: 'system',
content: `[Previous conversation summary]\n${compaction.summary}`,
});
}
}
return messages;
}
}
Session Index¶
Metadata stored separately for fast listing:
interface SessionIndex {
version: number;
sessions: SessionMetadata[];
}
interface SessionMetadata {
id: string;
name?: string;
createdAt: string;
lastActiveAt: string;
messageCount: number;
tokenCount: number;
summary?: string; // AI-generated summary
}
Compaction Integration¶
When context is compacted (see Lesson 9), store the summary:
// After compaction
await sessionStore.appendEntry({
type: 'compaction',
data: {
summary: 'User was working on implementing OAuth. Key decisions...',
compactedCount: 50, // Messages summarized
compactedAt: new Date().toISOString(),
},
});
Convenience Methods¶
// Quick message append
await store.appendMessage({ role: 'user', content: 'Hello' });
await store.appendMessage({ role: 'assistant', content: 'Hi there!' });
// Tool tracking
await store.appendToolCall({ id: 'call_123', name: 'read_file', args: {...} });
await store.appendToolResult('call_123', { content: '...' });
Events¶
store.on((event) => {
switch (event.type) {
case 'session.created':
console.log(`Created session: ${event.sessionId}`);
break;
case 'session.loaded':
console.log(`Loaded ${event.entryCount} entries`);
break;
case 'entry.appended':
console.log(`Appended ${event.entryType} to ${event.sessionId}`);
break;
}
});
Auto-Pruning¶
Old sessions are automatically removed:
const store = new SessionStore({
baseDir: '.agent/sessions',
maxSessions: 50, // Keep last 50 sessions
autoSave: true, // Save index on each entry
});
// Old sessions pruned when limit exceeded
await store.createSession('new-session');