Streaming & Real-Time
Deeployd provides real-time streaming for agent responses, data pipelines, and event monitoring. Event streams use Server-Sent Events (SSE) for efficient, browser-compatible delivery; voice audio streams over chunked HTTP responses.
Conversation Streaming
Stream agent responses token-by-token for real-time chat experiences.
Basic Streaming
// Stream a conversation message.
// Note: the native EventSource constructor does not accept custom headers,
// so this assumes an SSE client/polyfill that does. With the native API,
// pass the token as a query parameter instead.
const eventSource = new EventSource(
  `/api/conversations/${conversationId}/messages?stream=true`,
  { headers: { Authorization: `Bearer ${token}` } }
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'token':
      // Append token to response
      appendToResponse(data.token);
      break;
    case 'tool_call':
      // Agent is using a tool
      showToolCall(data.name, data.arguments);
      break;
    case 'done':
      // Response complete
      console.log('Tokens:', data.usage);
      console.log('Cost:', data.cost);
      eventSource.close();
      break;
    case 'error':
      console.error('Error:', data.message);
      eventSource.close();
      break;
  }
};
SDK Streaming
// Using the TypeScript SDK
const stream = await client.conversations.sendMessageStream({
  conversationId: 'conv-123',
  content: 'Explain quantum computing'
});

for await (const event of stream) {
  if (event.type === 'token') {
    process.stdout.write(event.token);
  } else if (event.type === 'done') {
    console.log('\n\nTokens used:', event.usage.total);
  }
}
Stream Event Types
| Event | Description | Payload |
|---|---|---|
| `token` | Generated text token | `{ token: string }` |
| `tool_call` | Tool execution started | `{ name: string, arguments: object }` |
| `done` | Response complete | `{ usage: TokenUsage, cost: number }` |
| `error` | Error occurred | `{ message: string, code?: string }` |
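The four payloads above can be modeled as a discriminated union, so a `switch` on `type` narrows each branch to its fields. A minimal sketch (the type and helper names are illustrative, not exported by the SDK):

```typescript
// Illustrative types for the stream event payloads (not part of the SDK).
type StreamEvent =
  | { type: 'token'; token: string }
  | { type: 'tool_call'; name: string; arguments: Record<string, unknown> }
  | { type: 'done'; usage: { total: number }; cost: number }
  | { type: 'error'; message: string; code?: string };

// Switching on `type` gives full payload typing in each branch.
function describe(event: StreamEvent): string {
  switch (event.type) {
    case 'token':
      return event.token;
    case 'tool_call':
      return `[tool: ${event.name}]`;
    case 'done':
      return `[done: ${event.usage.total} tokens, $${event.cost}]`;
    case 'error':
      return `[error: ${event.message}]`;
  }
}
```

Exhaustive switches like this also make the compiler flag any event type you forget to handle.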
Data Streaming Pipelines
Build real-time data processing pipelines for continuous data flows.
Creating a Pipeline
const pipeline = await client.streaming.createPipeline({
  name: 'agent-events-to-analytics',
  source: {
    type: 'agent_execution',
    agentIds: ['agent-123', 'agent-456']
  },
  transforms: [
    {
      type: 'filter',
      config: {
        field: 'status',
        operator: 'eq',
        value: 'completed'
      }
    },
    {
      type: 'map',
      config: {
        expression: {
          event_type: '"agent_completion"',
          agent_id: 'agentId',
          tokens: 'tokenUsage.total',
          cost: 'cost',
          timestamp: 'completedAt'
        }
      }
    }
  ],
  destination: {
    type: 'analytics',
    config: {
      provider: 'mixpanel',
      projectToken: '${secrets.MIXPANEL_TOKEN}'
    }
  }
});

// Start the pipeline
await client.streaming.startPipeline(pipeline.id);
Pipeline Source Types
| Source | Description | Use Case |
|---|---|---|
| `agent_execution` | Agent run events | Analytics, monitoring |
| `webhook` | Incoming webhooks | External integrations |
| `database_change` | DB change data capture | Sync, replication |
| `api_event` | API activity events | Audit, logging |
| `schedule` | Scheduled triggers | Batch processing |
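As a second example of wiring a source to a destination, a `webhook` source feeding a `database` destination might look like this. The field names follow the `createPipeline` example above; the table name, conflict fields, and filter values are placeholders:

```typescript
// A hypothetical pipeline definition: incoming webhook events, filtered,
// then upserted into a table. Names and values are illustrative.
const webhookPipeline = {
  name: 'signup-events-to-db',
  source: {
    type: 'webhook' // receives POSTs at a pipeline-generated endpoint
  },
  transforms: [
    { type: 'filter', config: { field: 'event', operator: 'eq', value: 'signup' } }
  ],
  destination: {
    type: 'database',
    config: { table: 'signups', operation: 'upsert', conflictFields: ['user_id'] }
  }
};
```

The definition would then be passed to `client.streaming.createPipeline(...)` as in the example above.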
Transform Operations
Filter
{
  type: 'filter',
  config: {
    field: 'status',
    operator: 'eq', // eq, neq, gt, gte, lt, lte, contains, in, regex
    value: 'completed'
  }
}
Map
{
  type: 'map',
  config: {
    expression: {
      // Direct field mapping
      userId: 'user.id',
      // String functions
      userName: 'upper(user.name)',
      // Concatenation
      fullName: 'concat(user.firstName, " ", user.lastName)'
    }
  }
}
Aggregate
{
  type: 'aggregate',
  config: {
    groupBy: ['agentId'],
    aggregations: [
      { field: 'cost', function: 'sum', alias: 'totalCost' },
      { field: 'tokens', function: 'avg', alias: 'avgTokens' },
      { field: '*', function: 'count', alias: 'runCount' }
    ]
  }
}
Window
{
  type: 'window',
  config: {
    type: 'tumbling', // tumbling, sliding, session
    size: 60000, // 1 minute in ms
    aggregations: [
      { field: 'cost', function: 'sum' }
    ]
  }
}
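A tumbling window assigns each item to a fixed, non-overlapping time bucket of `size` milliseconds. A local sketch of the bucketing rule (the actual aggregation runs server-side; helper names are hypothetical):

```typescript
// Bucket start for a tumbling window: floor the timestamp to the window size.
const windowStart = (timestampMs: number, sizeMs: number): number =>
  Math.floor(timestampMs / sizeMs) * sizeMs;

// Sum `cost` per window, mirroring the 1-minute config above.
function sumByWindow(
  items: { ts: number; cost: number }[],
  sizeMs: number
): Map<number, number> {
  const sums = new Map<number, number>();
  for (const { ts, cost } of items) {
    const w = windowStart(ts, sizeMs);
    sums.set(w, (sums.get(w) ?? 0) + cost);
  }
  return sums;
}
```

With `sizeMs = 60000`, items at `ts = 0` and `ts = 59999` land in the same bucket, while `ts = 60000` starts the next one.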
Batch
{
  type: 'batch',
  config: {
    size: 100, // Items per batch
    timeout: 5000 // Max wait time ms
  }
}
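Transforms run in the order listed, each consuming the previous transform's output. A local, illustrative sketch of the filter → map semantics (the real pipeline evaluates these server-side; helper names are hypothetical):

```typescript
type Item = Record<string, any>;

// eq-filter: keep items whose field equals the value.
const filterEq = (items: Item[], field: string, value: unknown): Item[] =>
  items.filter((it) => it[field] === value);

// map: build a new object from dotted-path lookups like 'user.id'.
const mapFields = (items: Item[], expr: Record<string, string>): Item[] =>
  items.map((it) =>
    Object.fromEntries(
      Object.entries(expr).map(([out, path]) => [
        out,
        path.split('.').reduce((v: any, key) => v?.[key], it)
      ])
    )
  );

const input = [
  { status: 'completed', user: { id: 'u1' }, cost: 2 },
  { status: 'failed', user: { id: 'u2' }, cost: 0 }
];
const out = mapFields(filterEq(input, 'status', 'completed'), {
  userId: 'user.id',
  cost: 'cost'
});
// out → [{ userId: 'u1', cost: 2 }]
```

Only string functions such as `upper(...)` and `concat(...)` are omitted here; the dotted-path resolution matches the map examples above.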
Destination Types
Webhook
{
  type: 'webhook',
  config: {
    url: 'https://api.example.com/events',
    method: 'POST',
    headers: {
      'Authorization': 'Bearer ${secrets.API_KEY}'
    },
    retryPolicy: {
      maxRetries: 3,
      backoffMs: 1000
    }
  }
}
Database
{
  type: 'database',
  config: {
    table: 'agent_metrics',
    operation: 'upsert', // insert, upsert, update
    conflictFields: ['agent_id', 'date']
  }
}
Analytics
{
  type: 'analytics',
  config: {
    provider: 'segment', // mixpanel, amplitude, segment, custom
    writeKey: '${secrets.SEGMENT_KEY}'
  }
}
Notification
{
  type: 'notification',
  config: {
    channel: 'slack',
    webhook: '${secrets.SLACK_WEBHOOK}',
    template: 'Agent {{agentName}} completed with {{tokens}} tokens'
  }
}
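The `{{…}}` placeholders are filled from fields on the pipeline item. A minimal sketch of that substitution (the real renderer may support richer syntax; the function name is hypothetical):

```typescript
// Replace {{field}} tokens with the corresponding item values.
const renderTemplate = (template: string, item: Record<string, unknown>): string =>
  template.replace(/\{\{(\w+)\}\}/g, (_m: string, key: string) =>
    String(item[key] ?? '')
  );

const message = renderTemplate(
  'Agent {{agentName}} completed with {{tokens}} tokens',
  { agentName: 'support-bot', tokens: 1200 }
);
// message === 'Agent support-bot completed with 1200 tokens'
```

Missing fields render as empty strings in this sketch; a production renderer might instead raise an error.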
Pipeline Management
// List pipelines
const pipelines = await client.streaming.listPipelines();

// Get pipeline details
const pipeline = await client.streaming.getPipeline('pipeline-123');

// Control pipeline
await client.streaming.startPipeline('pipeline-123');
await client.streaming.pausePipeline('pipeline-123');
await client.streaming.resumePipeline('pipeline-123');
await client.streaming.stopPipeline('pipeline-123');

// Get metrics
const metrics = await client.streaming.getPipelineMetrics('pipeline-123');
console.log({
  itemsProcessed: metrics.itemsProcessed,
  itemsFailed: metrics.itemsFailed,
  throughputPerSecond: metrics.throughput,
  avgLatencyMs: metrics.latency.avg
});

// Create checkpoint (for recovery)
await client.streaming.createCheckpoint('pipeline-123');

// Health check
const health = await client.streaming.getPipelineHealth('pipeline-123');
Command Center Streaming
Real-time monitoring of agent executions across your organization.
Subscribing to Events
const eventSource = new EventSource(
  '/api/command-center/stream',
  { headers: { Authorization: `Bearer ${token}` } }
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'run:started':
      addRunToLiveView(data.run);
      break;
    case 'run:progress':
      updateRunProgress(data.runId, data.tokens, data.cost);
      break;
    case 'run:tool_started':
      showToolExecution(data.runId, data.tool);
      break;
    case 'run:completed':
      markRunComplete(data.runId, data.result);
      break;
    case 'alert:triggered':
      showAlert(data.alert);
      break;
  }
};
Event Types
| Event | Description |
|---|---|
| `run:started` | Agent run initiated |
| `run:status_changed` | Run status transition |
| `run:progress` | Token/cost progress update |
| `run:tool_started` | Tool execution begins |
| `run:tool_completed` | Tool execution ends |
| `run:llm_call` | LLM API call metrics |
| `run:memory_op` | Memory operation |
| `run:error` | Execution error |
| `alert:triggered` | Alert condition met |
| `alert:resolved` | Alert condition cleared |
Filtering Events
// Filter by specific agents
const params = new URLSearchParams({
agentIds: 'agent-123,agent-456',
eventTypes: 'run:started,run:completed,run:error'
});
const eventSource = new EventSource(
`/api/command-center/stream?${params}`,
{ headers: { Authorization: `Bearer ${token}` } }
);
A2A Task Streaming
Stream task updates for agent-to-agent communication.
Task Event Stream
const eventSource = new EventSource(
  `/api/a2a/tasks/${taskId}/stream`,
  { headers: { Authorization: `Bearer ${token}` } }
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'task':
      // Initial task state
      initializeTaskView(data.task);
      break;
    case 'message':
      // Text message from agent
      appendMessage(data.message);
      break;
    case 'statusUpdate':
      // Status change
      updateStatus(data.status, data.message);
      break;
    case 'artifactUpdate':
      // Artifact being produced
      if (data.lastChunk) {
        finalizeArtifact(data.artifact);
      } else {
        appendToArtifact(data.chunk);
      }
      break;
  }
};
Task Status Flow
submitted ─▶ working ─▶ completed
                │
                ├─▶ input-required (HITL)
                │         │
                │         ▼
                │       working
                │
                └─▶ failed
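The diagram above can be encoded as a simple transition table; a guard like this keeps client-side state in sync with the legal flow (the status names come from the diagram, the helper is illustrative):

```typescript
type TaskStatus = 'submitted' | 'working' | 'input-required' | 'completed' | 'failed';

// Legal transitions from the status flow diagram above.
const transitions: Record<TaskStatus, TaskStatus[]> = {
  'submitted': ['working'],
  'working': ['completed', 'input-required', 'failed'],
  'input-required': ['working'], // HITL: resumes once input arrives
  'completed': [], // terminal
  'failed': [] // terminal
};

const canTransition = (from: TaskStatus, to: TaskStatus): boolean =>
  transitions[from].includes(to);
```

A `statusUpdate` event that fails this check usually indicates a missed event (e.g. after a reconnect), so refetching the task state is a reasonable fallback.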
Voice Streaming
Stream text-to-speech audio in real-time.
Audio Streaming
// Stream TTS audio
const response = await fetch('/api/voice/tts/stream', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${token}`
  },
  body: JSON.stringify({
    text: 'Hello, how can I help you today?',
    voice: 'alloy',
    format: 'mp3'
  })
});

// Collect the streamed chunks, then decode and play.
// (decodeAudioData requires a complete audio file, not arbitrary MP3
// chunks; for true progressive playback, use the Media Source Extensions
// API instead.)
const chunks: Uint8Array[] = [];
const reader = response.body.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  chunks.push(value);
}

const blob = new Blob(chunks, { type: 'audio/mpeg' });
const audioContext = new AudioContext();
const audioBuffer = await audioContext.decodeAudioData(await blob.arrayBuffer());
const source = audioContext.createBufferSource();
source.buffer = audioBuffer;
source.connect(audioContext.destination);
source.start();
Best Practices
1. Handle Reconnection
function createReconnectingEventSource(url: string) {
  let eventSource: EventSource;
  let reconnectAttempts = 0;
  const maxReconnectAttempts = 5;

  function connect() {
    eventSource = new EventSource(url);

    eventSource.onopen = () => {
      reconnectAttempts = 0;
    };

    eventSource.onerror = () => {
      eventSource.close();
      if (reconnectAttempts < maxReconnectAttempts) {
        reconnectAttempts++;
        const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);
        setTimeout(connect, delay);
      }
    };

    return eventSource;
  }

  return connect();
}
2. Clean Up Connections
// Always close EventSource when done
useEffect(() => {
  const eventSource = new EventSource(url);
  // ... setup handlers

  return () => {
    eventSource.close();
  };
}, [url]);
3. Buffer for UI Updates
// Batch UI updates for performance
let tokenBuffer = '';
let updateScheduled = false;

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'token') {
    tokenBuffer += data.token;
    if (!updateScheduled) {
      updateScheduled = true;
      requestAnimationFrame(() => {
        updateUI(tokenBuffer);
        updateScheduled = false;
      });
    }
  }
};
4. Handle Backpressure
// For pipelines, monitor metrics
const metrics = await client.streaming.getPipelineMetrics(pipelineId);

if (metrics.backpressure > 0.8) {
  // Pipeline is struggling; consider:
  // - Increasing batch size
  // - Adding more transforms to reduce data
  // - Scaling destination capacity
  console.warn('Pipeline backpressure high:', metrics.backpressure);
}
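If you act on the backpressure reading automatically (e.g. pausing and resuming the pipeline), use two thresholds rather than one so the controller does not flap around a single cutoff. A sketch of that decision logic (threshold values and the function name are illustrative):

```typescript
// Hysteresis thresholds: pause above the high mark, resume only once the
// reading falls well below it. Values are illustrative, not defaults.
const PAUSE_ABOVE = 0.8;
const RESUME_BELOW = 0.5;

// Decide the next control action from the current backpressure reading.
function backpressureAction(
  backpressure: number,
  paused: boolean
): 'pause' | 'resume' | 'none' {
  if (!paused && backpressure > PAUSE_ABOVE) return 'pause';
  if (paused && backpressure < RESUME_BELOW) return 'resume';
  return 'none';
}
```

The returned action would map onto `pausePipeline` / `resumePipeline` calls from the Pipeline Management section.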
API Reference
Streaming Endpoints
# Conversations
POST /api/conversations/:id/messages # stream=true query param for SSE
# Pipelines
POST /api/streaming/pipelines # Create pipeline
GET /api/streaming/pipelines # List pipelines
GET /api/streaming/pipelines/:id # Get pipeline
POST /api/streaming/pipelines/:id/start # Start pipeline
POST /api/streaming/pipelines/:id/stop # Stop pipeline
POST /api/streaming/pipelines/:id/pause # Pause pipeline
POST /api/streaming/pipelines/:id/resume # Resume pipeline
POST /api/streaming/pipelines/:id/process # Push single item
POST /api/streaming/pipelines/:id/batch # Push batch
GET /api/streaming/pipelines/:id/metrics # Get metrics
GET /api/streaming/pipelines/:id/health # Health check
GET /api/streaming/stats # Global stats
# Command Center
GET /api/command-center/stream # SSE event stream
# A2A Tasks
GET /api/a2a/tasks/:id/stream # Task event stream
# Voice
POST /api/voice/tts/stream # Stream TTS audio
Next: Learn about Avatars for video responses from agents.