
Streaming & Real-Time

Deeployd streams agent responses, data pipeline events, and monitoring events in real time. Event streams use Server-Sent Events (SSE), which browsers support without extra libraries; text-to-speech audio is streamed in the HTTP response body.

Conversation Streaming

Stream agent responses token-by-token for real-time chat experiences.

Basic Streaming

// Stream a conversation message.
// Note: the browser's native EventSource always issues a GET and accepts only
// `withCredentials` as an option, so the `headers` option below requires an
// EventSource implementation that supports custom headers (for example, a polyfill).
const eventSource = new EventSource(
  `/api/conversations/${conversationId}/messages?stream=true`,
  { headers: { Authorization: `Bearer ${token}` } }
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'token':
      // Append token to response
      appendToResponse(data.token);
      break;
    case 'tool_call':
      // Agent is using a tool
      showToolCall(data.name, data.arguments);
      break;
    case 'done':
      // Response complete
      console.log('Tokens:', data.usage);
      console.log('Cost:', data.cost);
      eventSource.close();
      break;
    case 'error':
      console.error('Error:', data.message);
      eventSource.close();
      break;
  }
};
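
Because the browser's native EventSource cannot attach an Authorization header, one alternative is to read the stream with fetch and parse the SSE frames by hand. The sketch below covers only the parsing step. `parseSseChunk` and `SseParseResult` are hypothetical names, not part of the Deeployd SDK, and the parser handles only `data:` fields.

```typescript
// Hypothetical helper (not part of the Deeployd SDK): splits buffered SSE text
// into complete events and returns any trailing partial frame for the next read.
interface SseParseResult {
  events: unknown[]; // JSON-decoded `data:` payloads of complete frames
  rest: string;      // incomplete tail to prepend to the next chunk
}

function parseSseChunk(buffer: string): SseParseResult {
  // Normalize CRLF so frames can be split on a blank line.
  const frames = buffer.replace(/\r\n/g, "\n").split("\n\n");
  // The last piece is either empty or an unfinished frame.
  const rest = frames.pop() ?? "";
  const events: unknown[] = [];

  for (const frame of frames) {
    // Join multi-line data fields with newlines; ignore comments and other fields.
    const data = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^ /, ""))
      .join("\n");
    if (data) events.push(JSON.parse(data));
  }
  return { events, rest };
}
```

In a fetch read loop, each chunk would be decoded with `TextDecoder`, appended to the previous `rest`, and passed to `parseSseChunk`; the returned events can then go through the same `switch` as in the EventSource example.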

SDK Streaming

// Using the TypeScript SDK
const stream = await client.conversations.sendMessageStream({
  conversationId: 'conv-123',
  content: 'Explain quantum computing'
});

for await (const event of stream) {
  if (event.type === 'token') {
    process.stdout.write(event.token);
  } else if (event.type === 'done') {
    console.log('\n\nTokens used:', event.usage.total);
  }
}

Stream Event Types

| Event | Description | Payload |
| --- | --- | --- |
| token | Generated text token | { token: string } |
| tool_call | Tool execution started | { name: string, arguments: object } |
| done | Response complete | { usage: TokenUsage, cost: number } |
| error | Error occurred | { message: string, code?: string } |
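
The event types above map naturally onto a TypeScript discriminated union. The sketch below is one way to model them and fold a stream into a single response state. The `TokenUsage` shape is an assumption (only `total` appears in the SDK example), and `ResponseState` and `applyEvent` are hypothetical names.

```typescript
// Assumed shape: only `total` is shown in the SDK example.
interface TokenUsage {
  total: number;
}

type StreamEvent =
  | { type: "token"; token: string }
  | { type: "tool_call"; name: string; arguments: object }
  | { type: "done"; usage: TokenUsage; cost: number }
  | { type: "error"; message: string; code?: string };

interface ResponseState {
  text: string;
  toolCalls: string[];
  finished: boolean;
  error?: string;
}

const initialState: ResponseState = { text: "", toolCalls: [], finished: false };

// Hypothetical reducer: folds one event into the accumulated response state.
function applyEvent(state: ResponseState, event: StreamEvent): ResponseState {
  switch (event.type) {
    case "token":
      return { ...state, text: state.text + event.token };
    case "tool_call":
      return { ...state, toolCalls: [...state.toolCalls, event.name] };
    case "done":
      return { ...state, finished: true };
    case "error":
      return { ...state, finished: true, error: event.message };
  }
}
```

Narrowing on `event.type` lets the compiler check that each branch reads only the fields its payload actually carries.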

Data Streaming Pipelines

Build real-time data processing pipelines for continuous data flows.

Creating a Pipeline

const pipeline = await client.streaming.createPipeline({
  name: 'agent-events-to-analytics',
  source: {
    type: 'agent_execution',
    agentIds: ['agent-123', 'agent-456']
  },
  transforms: [
    {
      type: 'filter',
      config: {
        field: 'status',
        operator: 'eq',
        value: 'completed'
      }
    },
    {
      type: 'map',
      config: {
        expression: {
          event_type: '"agent_completion"',
          agent_id: 'agentId',
          tokens: 'tokenUsage.total',
          cost: 'cost',
          timestamp: 'completedAt'
        }
      }
    }
  ],
  destination: {
    type: 'analytics',
    config: {
      provider: 'mixpanel',
      projectToken: '${secrets.MIXPANEL_TOKEN}'
    }
  }
});

// Start the pipeline
await client.streaming.startPipeline(pipeline.id);

Pipeline Source Types

| Source | Description | Use Case |
| --- | --- | --- |
| agent_execution | Agent run events | Analytics, monitoring |
| webhook | Incoming webhooks | External integrations |
| database_change | DB change data capture | Sync, replication |
| api_event | API activity events | Audit, logging |
| schedule | Scheduled triggers | Batch processing |

Transform Operations

Filter

{
  type: 'filter',
  config: {
    field: 'status',
    operator: 'eq', // eq, neq, gt, gte, lt, lte, contains, in, regex
    value: 'completed'
  }
}
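
To make the operator list concrete, here is a minimal local evaluator for a filter config. The semantics of each operator are an assumption inferred from the names; the service's exact behavior (type coercion, nested fields) may differ. `matchesFilter` is a hypothetical helper.

```typescript
type FilterOperator =
  | "eq" | "neq" | "gt" | "gte" | "lt" | "lte" | "contains" | "in" | "regex";

interface FilterConfig {
  field: string;
  operator: FilterOperator;
  value: unknown;
}

// Assumed semantics for each operator; top-level fields only.
function matchesFilter(item: Record<string, unknown>, cfg: FilterConfig): boolean {
  const actual = item[cfg.field];
  const expected = cfg.value;
  switch (cfg.operator) {
    case "eq":
      return actual === expected;
    case "neq":
      return actual !== expected;
    case "gt":
      return (actual as number) > (expected as number);
    case "gte":
      return (actual as number) >= (expected as number);
    case "lt":
      return (actual as number) < (expected as number);
    case "lte":
      return (actual as number) <= (expected as number);
    case "contains":
      if (typeof actual === "string") return actual.includes(String(expected));
      if (Array.isArray(actual)) return actual.includes(expected);
      return false;
    case "in":
      return Array.isArray(expected) && expected.includes(actual);
    case "regex":
      return typeof actual === "string" && new RegExp(String(expected)).test(actual);
  }
}
```

Running sample events through a function like this before deploying a pipeline is a cheap way to check that a filter keeps the items you expect.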

Map

{
  type: 'map',
  config: {
    expression: {
      // Direct field mapping
      userId: 'user.id',
      // String functions
      userName: 'upper(user.name)',
      // Concatenation
      fullName: 'concat(user.firstName, " ", user.lastName)'
    }
  }
}
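
The sketch below illustrates how direct field mappings and quoted literals (such as `'"agent_completion"'` in the pipeline example) could be resolved against an input item. It is an illustration of the idea only: `getPath` and `applyMap` are hypothetical, and function calls such as `upper(...)` or `concat(...)` are deliberately out of scope.

```typescript
// Resolve a dotted path such as "user.id"; returns undefined when a segment is missing.
function getPath(source: unknown, path: string): unknown {
  let current: unknown = source;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Hypothetical evaluator covering only quoted literals and field paths.
function applyMap(
  item: unknown,
  expression: Record<string, string>
): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const [target, expr] of Object.entries(expression)) {
    // A value wrapped in double quotes is treated as a string literal.
    const literal = /^"(.*)"$/.exec(expr);
    out[target] = literal ? literal[1] : getPath(item, expr);
  }
  return out;
}
```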

Aggregate

{
  type: 'aggregate',
  config: {
    groupBy: ['agentId'],
    aggregations: [
      { field: 'cost', function: 'sum', alias: 'totalCost' },
      { field: 'tokens', function: 'avg', alias: 'avgTokens' },
      { field: '*', function: 'count', alias: 'runCount' }
    ]
  }
}
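
As an illustration of what this config computes, the following sketch applies the same `groupBy` and `aggregations` to an in-memory array. The grouping semantics are assumed, and only `sum`, `avg`, and `count` are covered; `aggregate` is a hypothetical function, not an SDK call.

```typescript
type AggFn = "sum" | "avg" | "count";

interface Aggregation {
  field: string;
  function: AggFn;
  alias: string;
}

type Row = Record<string, number | string>;

// Sketch of assumed group-by semantics, computed in memory over a finished batch.
function aggregate(rows: Row[], groupBy: string[], aggs: Aggregation[]): Row[] {
  const groups = new Map<string, Row[]>();
  for (const row of rows) {
    // Rows with identical group-by values share a key.
    const key = JSON.stringify(groupBy.map((g) => row[g]));
    const bucket = groups.get(key);
    if (bucket) bucket.push(row);
    else groups.set(key, [row]);
  }

  const result: Row[] = [];
  groups.forEach((bucket) => {
    const out: Row = {};
    for (const g of groupBy) out[g] = bucket[0][g];
    for (const a of aggs) {
      const sum = bucket.reduce((acc, r) => acc + Number(r[a.field] ?? 0), 0);
      out[a.alias] =
        a.function === "count" ? bucket.length :
        a.function === "sum" ? sum :
        sum / bucket.length;
    }
    result.push(out);
  });
  return result;
}
```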

Window

{
  type: 'window',
  config: {
    type: 'tumbling', // tumbling, sliding, session
    size: 60000, // 1 minute in ms
    aggregations: [
      { field: 'cost', function: 'sum' }
    ]
  }
}
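
A tumbling window splits time into fixed, non-overlapping intervals, so each event belongs to exactly one window. The sketch below shows the arithmetic for a `size` of 60000 ms. It assumes windows are aligned to the Unix epoch and that each event carries a `timestamp` in milliseconds; both are illustrative assumptions, as are the function names.

```typescript
interface CostEvent {
  timestamp: number; // ms since epoch (assumed field name)
  cost: number;
}

// Start of the tumbling window that contains `timestampMs`.
function windowStart(timestampMs: number, sizeMs: number): number {
  return Math.floor(timestampMs / sizeMs) * sizeMs;
}

// Sum `cost` per window; returns a map from window start (ms) to total cost.
function sumByTumblingWindow(events: CostEvent[], sizeMs: number): Map<number, number> {
  const totals = new Map<number, number>();
  for (const e of events) {
    const start = windowStart(e.timestamp, sizeMs);
    totals.set(start, (totals.get(start) ?? 0) + e.cost);
  }
  return totals;
}
```

With a 60000 ms window, events at 0 ms and 59999 ms land in the window starting at 0, while an event at exactly 60000 ms opens the next one.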

Batch

{
  type: 'batch',
  config: {
    size: 100, // Items per batch
    timeout: 5000 // Max wait time ms
  }
}
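
The two options interact: a batch is emitted when it reaches `size` items or when `timeout` milliseconds have passed since its first item, whichever comes first. That reading of the options is an assumption, and the `Batcher` class below is a hypothetical in-memory sketch of it rather than the service's implementation.

```typescript
// Hypothetical in-memory batcher mirroring the `size` and `timeout` options.
class Batcher<T> {
  private items: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;
  private readonly size: number;
  private readonly timeoutMs: number;
  private readonly onFlush: (batch: T[]) => void;

  constructor(size: number, timeoutMs: number, onFlush: (batch: T[]) => void) {
    this.size = size;
    this.timeoutMs = timeoutMs;
    this.onFlush = onFlush;
  }

  add(item: T): void {
    this.items.push(item);
    if (this.items.length >= this.size) {
      this.flush(); // size limit reached
    } else if (this.timer === undefined) {
      // The first item of a new batch starts the timeout clock.
      this.timer = setTimeout(() => this.flush(), this.timeoutMs);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.items.length === 0) return;
    const batch = this.items;
    this.items = [];
    this.onFlush(batch);
  }
}
```

Calling `flush()` on shutdown emits whatever is still buffered and cancels the pending timer.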

Destination Types

Webhook

{
  type: 'webhook',
  config: {
    url: 'https://api.example.com/events',
    method: 'POST',
    headers: {
      'Authorization': 'Bearer ${secrets.API_KEY}'
    },
    retryPolicy: {
      maxRetries: 3,
      backoffMs: 1000
    }
  }
}

Database

{
  type: 'database',
  config: {
    table: 'agent_metrics',
    operation: 'upsert', // insert, upsert, update
    conflictFields: ['agent_id', 'date']
  }
}

Analytics

{
  type: 'analytics',
  config: {
    provider: 'segment', // mixpanel, amplitude, segment, custom
    writeKey: '${secrets.SEGMENT_KEY}'
  }
}

Notification

{
  type: 'notification',
  config: {
    channel: 'slack',
    webhook: '${secrets.SLACK_WEBHOOK}',
    template: 'Agent {{agentName}} completed with {{tokens}} tokens'
  }
}
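
To preview what a template will produce, the `{{name}}` placeholders can be substituted locally. The sketch below assumes simple name-for-value replacement with no nesting or helpers; `renderTemplate` is a hypothetical function, and leaving unknown placeholders untouched is a design choice made here, not documented behavior.

```typescript
// Hypothetical renderer for {{name}} placeholders; unknown names are left untouched.
function renderTemplate(
  template: string,
  values: Record<string, string | number>
): string {
  return template.replace(/\{\{\s*(\w+)\s*\}\}/g, (match: string, name: string) =>
    Object.prototype.hasOwnProperty.call(values, name) ? String(values[name]) : match
  );
}

const preview = renderTemplate(
  "Agent {{agentName}} completed with {{tokens}} tokens",
  { agentName: "Support Bot", tokens: 1280 }
);
// preview is "Agent Support Bot completed with 1280 tokens"
```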

Pipeline Management

// List pipelines
const pipelines = await client.streaming.listPipelines();

// Get pipeline details
const pipeline = await client.streaming.getPipeline('pipeline-123');

// Control pipeline
await client.streaming.startPipeline('pipeline-123');
await client.streaming.pausePipeline('pipeline-123');
await client.streaming.resumePipeline('pipeline-123');
await client.streaming.stopPipeline('pipeline-123');

// Get metrics
const metrics = await client.streaming.getPipelineMetrics('pipeline-123');
console.log({
  itemsProcessed: metrics.itemsProcessed,
  itemsFailed: metrics.itemsFailed,
  throughputPerSecond: metrics.throughput,
  avgLatencyMs: metrics.latency.avg
});

// Create checkpoint (for recovery)
await client.streaming.createCheckpoint('pipeline-123');

// Health check
const health = await client.streaming.getPipelineHealth('pipeline-123');

Command Center Streaming

Real-time monitoring of agent executions across your organization.

Subscribing to Events

const eventSource = new EventSource(
  '/api/command-center/stream',
  { headers: { Authorization: `Bearer ${token}` } }
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'run:started':
      addRunToLiveView(data.run);
      break;
    case 'run:progress':
      updateRunProgress(data.runId, data.tokens, data.cost);
      break;
    case 'run:tool_started':
      showToolExecution(data.runId, data.tool);
      break;
    case 'run:completed':
      markRunComplete(data.runId, data.result);
      break;
    case 'alert:triggered':
      showAlert(data.alert);
      break;
  }
};

Event Types

| Event | Description |
| --- | --- |
| run:started | Agent run initiated |
| run:status_changed | Run status transition |
| run:progress | Token/cost progress update |
| run:tool_started | Tool execution begins |
| run:tool_completed | Tool execution ends |
| run:llm_call | LLM API call metrics |
| run:memory_op | Memory operation |
| run:error | Execution error |
| alert:triggered | Alert condition met |
| alert:resolved | Alert condition cleared |

Filtering Events

// Filter by specific agents and event types
const params = new URLSearchParams({
  agentIds: 'agent-123,agent-456',
  eventTypes: 'run:started,run:completed,run:error'
});

const eventSource = new EventSource(
  `/api/command-center/stream?${params}`,
  { headers: { Authorization: `Bearer ${token}` } }
);
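
When the filters come from application state rather than literals, a small builder keeps the query string consistent. The sketch below follows the comma-separated format from the example above; `StreamFilter` and `buildStreamUrl` are hypothetical names. Note that `URLSearchParams` percent-encodes commas and colons, so the resulting URL contains `%2C` and `%3A`.

```typescript
interface StreamFilter {
  agentIds?: string[];
  eventTypes?: string[];
}

// Builds the Command Center stream URL; omits parameters for empty filters.
function buildStreamUrl(filter: StreamFilter): string {
  const params = new URLSearchParams();
  if (filter.agentIds?.length) params.set("agentIds", filter.agentIds.join(","));
  if (filter.eventTypes?.length) params.set("eventTypes", filter.eventTypes.join(","));
  const query = params.toString();
  return query
    ? `/api/command-center/stream?${query}`
    : "/api/command-center/stream";
}
```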

A2A Task Streaming

Stream task updates for agent-to-agent communication.

Task Event Stream

const eventSource = new EventSource(
  `/api/a2a/tasks/${taskId}/stream`,
  { headers: { Authorization: `Bearer ${token}` } }
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'task':
      // Initial task state
      initializeTaskView(data.task);
      break;
    case 'message':
      // Text message from agent
      appendMessage(data.message);
      break;
    case 'statusUpdate':
      // Status change
      updateStatus(data.status, data.message);
      break;
    case 'artifactUpdate':
      // Artifact being produced
      if (data.lastChunk) {
        finalizeArtifact(data.artifact);
      } else {
        appendToArtifact(data.chunk);
      }
      break;
  }
};

Task Status Flow

submitted ─▶ working ─▶ completed
                │
                ├─▶ input-required (HITL)
                │         │
                │         ▼
                │      working
                │
                └─▶ failed
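
The flow above can be encoded as a transition table, which is handy for rejecting out-of-order `statusUpdate` events on the client. The table below is read off the diagram and assumes that `input-required` and `failed` both branch from `working`; `canTransition` and `isTerminal` are hypothetical helpers.

```typescript
type TaskStatus =
  | "submitted"
  | "working"
  | "input-required"
  | "completed"
  | "failed";

// Allowed transitions, as read from the status flow diagram.
const transitions: Record<TaskStatus, TaskStatus[]> = {
  submitted: ["working"],
  working: ["completed", "input-required", "failed"],
  "input-required": ["working"],
  completed: [],
  failed: [],
};

function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return transitions[from].includes(to);
}

// A status with no outgoing transitions ends the task.
function isTerminal(status: TaskStatus): boolean {
  return transitions[status].length === 0;
}
```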

Voice Streaming

Stream text-to-speech audio in real-time.

Audio Streaming

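// Stream TTS audio
const response = await fetch('/api/voice/tts/stream', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${token}`
  },
  body: JSON.stringify({
    text: 'Hello, how can I help you today?',
    voice: 'alloy',
    format: 'mp3'
  })
});

// Play streaming audio.
// decodeAudioData() needs a complete file, so partial MP3 chunks are fed to a
// MediaSource instead (requires browser support for 'audio/mpeg' in MSE).
const mediaSource = new MediaSource();
const audio = new Audio(URL.createObjectURL(mediaSource));

mediaSource.addEventListener('sourceopen', async () => {
  const sourceBuffer = mediaSource.addSourceBuffer('audio/mpeg');
  const reader = response.body.getReader();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Wait for each append to finish before sending the next chunk
    sourceBuffer.appendBuffer(value);
    await new Promise((resolve) =>
      sourceBuffer.addEventListener('updateend', resolve, { once: true })
    );
  }
  mediaSource.endOfStream();
});

audio.play();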

Best Practices

1. Handle Reconnection

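function createReconnectingEventSource(
  url: string,
  onMessage: (event: MessageEvent) => void
) {
  let eventSource: EventSource;
  let reconnectAttempts = 0;
  let reconnectTimer: ReturnType<typeof setTimeout> | undefined;
  const maxReconnectAttempts = 5;

  function connect() {
    eventSource = new EventSource(url);
    // Re-attach the handler on every new connection
    eventSource.onmessage = onMessage;

    eventSource.onopen = () => {
      reconnectAttempts = 0;
    };

    eventSource.onerror = () => {
      eventSource.close();

      if (reconnectAttempts < maxReconnectAttempts) {
        reconnectAttempts++;
        const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);
        reconnectTimer = setTimeout(connect, delay);
      }
    };
  }

  connect();

  // Return a handle that closes whichever connection is current
  return {
    close: () => {
      clearTimeout(reconnectTimer);
      eventSource.close();
    }
  };
}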
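
As a worked example of the backoff formula used above, `Math.min(1000 * 2^attempt, 30000)`, the snippet below computes the delay for each of the five attempts. `backoffDelay` is a hypothetical helper that isolates the arithmetic.

```typescript
// Delay before the given reconnect attempt (1-based), capped at `capMs`.
function backoffDelay(attempt: number, baseMs = 1000, capMs = 30000): number {
  return Math.min(baseMs * Math.pow(2, attempt), capMs);
}

const schedule = [1, 2, 3, 4, 5].map((n) => backoffDelay(n));
// schedule is [2000, 4000, 8000, 16000, 30000]
```

The fifth attempt would be 32000 ms uncapped, so the 30000 ms ceiling applies. The five delays add up to 60 seconds of waiting before the client gives up.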

2. Clean Up Connections

import { useEffect } from 'react';

// Inside a React component: always close the EventSource when done
useEffect(() => {
  const eventSource = new EventSource(url);

  // ... setup handlers

  return () => {
    eventSource.close();
  };
}, [url]);

3. Buffer for UI Updates

// Batch UI updates for performance
let tokenBuffer = '';
let updateScheduled = false;

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  if (data.type === 'token') {
    tokenBuffer += data.token;

    if (!updateScheduled) {
      updateScheduled = true;
      requestAnimationFrame(() => {
        updateUI(tokenBuffer);
        updateScheduled = false;
      });
    }
  }
};

4. Handle Backpressure

// For pipelines, monitor metrics
const metrics = await client.streaming.getPipelineMetrics(pipelineId);

if (metrics.backpressure > 0.8) {
  // Pipeline is struggling, consider:
  // - Increasing batch size
  // - Adding filter transforms to reduce data volume
  // - Scaling destination capacity
  console.warn('Pipeline backpressure high:', metrics.backpressure);
}

API Reference

Streaming Endpoints

# Conversations
POST /api/conversations/:id/messages        # stream=true for SSE

# Pipelines
POST /api/streaming/pipelines               # Create pipeline
GET  /api/streaming/pipelines               # List pipelines
GET  /api/streaming/pipelines/:id           # Get pipeline
POST /api/streaming/pipelines/:id/start     # Start pipeline
POST /api/streaming/pipelines/:id/stop      # Stop pipeline
POST /api/streaming/pipelines/:id/pause     # Pause pipeline
POST /api/streaming/pipelines/:id/resume    # Resume pipeline
POST /api/streaming/pipelines/:id/process   # Push single item
POST /api/streaming/pipelines/:id/batch     # Push batch
GET  /api/streaming/pipelines/:id/metrics   # Get metrics
GET  /api/streaming/pipelines/:id/health    # Health check
GET  /api/streaming/stats                   # Global stats

# Command Center
GET  /api/command-center/stream             # SSE event stream

# A2A Tasks
GET  /api/a2a/tasks/:id/stream              # Task event stream

# Voice
POST /api/voice/tts/stream                  # Stream TTS audio

Next: Learn about Avatars for video responses from agents.