Guides
Handle Webhooks
Receive and process Voisnap webhook events reliably — signature verification, idempotency, async processing, and framework examples.
Handle Webhooks
Webhooks are the primary way to receive real-time notifications from Voisnap. This guide covers building a production-grade webhook handler.
The golden rules
- Respond with 2xx within 5 seconds — processing happens async
- Verify the signature on every request — reject anything that fails
- Use the delivery ID (the `X-Webhook-Delivery-Id` header) as an idempotency key — retries will resend the same event with the same ID
- Store events async — write to a queue or database, don't do heavy work inline
Express.js handler
import express from 'express';
import crypto from 'crypto';
const app = express();

// Fail fast at startup: without the secret every signature check would throw
// inside createHmac() at request time and surface as a 500 to the sender.
const WEBHOOK_SECRET = process.env.VOISNAP_WEBHOOK_SECRET;
if (!WEBHOOK_SECRET) {
  throw new Error('VOISNAP_WEBHOOK_SECRET environment variable is not set');
}

// Critical: use raw body parser for signature verification.
// The HMAC must be computed over the exact bytes that were sent, so the
// webhook route must receive the body as an unparsed Buffer.
app.use('/webhooks/voisnap', express.raw({ type: 'application/json' }));
// Webhook endpoint: validates the request, acknowledges it, then hands the
// event to processEventAsync() without waiting for it to finish.
//
// Responses:
//   400 - body was not captured as a Buffer, or the timestamp header is
//         missing / non-numeric / more than 300 seconds away from now
//   401 - signature header missing or not matching the computed HMAC
//   200 - request accepted
app.post('/webhooks/voisnap', async (req, res) => {
  const signature = req.headers['x-webhook-signature'];
  const timestamp = req.headers['x-webhook-timestamp'];
  const deliveryId = req.headers['x-webhook-delivery-id'];
  const body = req.body; // Buffer when express.raw() handled the request

  // 0. Reject malformed requests up front so they produce a clean 4xx
  //    instead of an exception further down.
  if (!Buffer.isBuffer(body) || typeof timestamp !== 'string' || !/^\d+$/.test(timestamp)) {
    return res.status(400).json({ error: 'Malformed webhook request' });
  }
  if (typeof signature !== 'string') {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  // 1. Verify timestamp freshness (5 minute window in either direction)
  if (Math.abs(Date.now() / 1000 - Number(timestamp)) > 300) {
    return res.status(400).json({ error: 'Timestamp too old' });
  }

  // 2. Verify HMAC signature over "<timestamp>.<raw body bytes>".
  //    Feeding the Buffer directly avoids a lossy bytes -> string -> bytes
  //    round trip when the body is not valid UTF-8.
  const expected = 'sha256=' + crypto
    .createHmac('sha256', WEBHOOK_SECRET)
    .update(`${timestamp}.`)
    .update(body)
    .digest('hex');
  const expectedBuf = Buffer.from(expected);
  const receivedBuf = Buffer.from(signature);
  // timingSafeEqual() throws when the lengths differ, so compare lengths first.
  if (
    expectedBuf.length !== receivedBuf.length ||
    !crypto.timingSafeEqual(expectedBuf, receivedBuf)
  ) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  // 3. Acknowledge immediately
  res.status(200).json({ received: true });

  // 4. Process async (non-blocking). The response has already been sent, so
  //    a parse failure can only be logged; it must not reject this handler.
  let event;
  try {
    event = JSON.parse(body.toString());
  } catch (err) {
    console.error(`Discarding delivery ${deliveryId}: body is not valid JSON`, err);
    return;
  }
  processEventAsync(deliveryId, event).catch(console.error);
});
/**
 * Route one webhook event to its handler, at most once per delivery ID.
 *
 * Looks the delivery ID up in db.webhookEvents first and returns early when
 * it is already recorded. Otherwise the matching handler is awaited and the
 * delivery is recorded afterwards. Unrecognised event types are logged and
 * still recorded. Any error raised while handling or recording is logged
 * and not re-thrown.
 *
 * @param {string} deliveryId - value of the delivery-id header
 * @param {{type: string, data: object}} event - parsed webhook payload
 */
async function processEventAsync(deliveryId, event) {
  // Idempotency guard
  if (await db.webhookEvents.exists({ deliveryId })) {
    console.log(`Skipping duplicate delivery: ${deliveryId}`);
    return;
  }

  // Event type -> handler. The arrow wrappers defer the lookup of each
  // handler function until an event of that type actually arrives.
  const routes = new Map([
    ['SessionStarted', (payload) => handleSessionStarted(payload)],
    ['SessionEnded', (payload) => handleSessionEnded(payload)],
    ['AnalysisCompleted', (payload) => handleAnalysisCompleted(payload)],
    ['TransferInitiated', (payload) => handleTransfer(payload)],
    ['Error', (payload) => handleAgentError(payload)],
  ]);

  try {
    const kind = event.type;
    const route = routes.get(kind);
    if (route === undefined) {
      console.log(`Unknown event type: ${kind}`);
    } else {
      await route(event.data);
    }

    // Record the delivery only after the handler finished without throwing
    await db.webhookEvents.insert({ deliveryId, processedAt: new Date() });
  } catch (failure) {
    console.error(`Failed to process ${deliveryId}:`, failure);
    // Log to your error tracker (Sentry, Datadog, etc.)
  }
}
/**
 * Handle a SessionEnded event: log it and upsert the conversation row with
 * status 'completed' plus the duration, cost and end reason from the event.
 *
 * @param {object} data - the event's `data` object
 */
async function handleSessionEnded(data) {
  const { conversationId, durationSeconds, cost, endReason } = data;
  console.log(`Session ended: ${conversationId}, duration: ${durationSeconds}s`);

  // Update your database, trigger follow-up workflows, etc.
  const record = {
    id: conversationId,
    status: 'completed',
    durationSeconds,
    cost,
    endReason,
  };
  await db.conversations.upsert(record);
}
/**
 * Handle an AnalysisCompleted event: copy sentiment, intent and summary onto
 * the conversation, and store customFields.resolution_status (undefined when
 * customFields is absent) as the outcome.
 *
 * @param {object} data - the event's `data` object
 */
async function handleAnalysisCompleted(data) {
  const { conversationId, sentiment, intent, summary, customFields } = data;
  const changes = {
    sentiment,
    intent,
    summary,
    outcome: customFields?.resolution_status,
  };
  await db.conversations.update(conversationId, changes);
  // Trigger follow-up email, CRM update, etc.
}
// Start the HTTP server on port 3000 (the same port the local-testing
// tunnel command in this guide forwards to).
app.listen(3000);
FastAPI handler
from contextlib import asynccontextmanager
import hmac, hashlib, time, json
import os

from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
app = FastAPI()

# Read the signing secret from the environment (same variable name as the
# Express example) instead of hard-coding it in source. Indexing os.environ
# raises KeyError at startup when the variable is missing, so a misconfigured
# deployment fails immediately rather than rejecting every webhook.
WEBHOOK_SECRET = os.environ["VOISNAP_WEBHOOK_SECRET"]

# Processed delivery IDs (use Redis or DB in production)
processed_deliveries: set[str] = set()
@app.post("/webhooks/voisnap")
async def voisnap_webhook(request: Request, background_tasks: BackgroundTasks):
    """Validate a Voisnap webhook request and schedule it for processing.

    Args:
        request: Incoming request; the raw body bytes are used for the HMAC.
        background_tasks: Receives the ``process_event`` task for this event.

    Returns:
        ``{"status": "received"}`` once the event has been scheduled.

    Raises:
        HTTPException: 400 when the timestamp header is not an integer or is
            more than 300 seconds from the current time, or when the body is
            not valid JSON; 401 when the signature does not match.
    """
    signature = request.headers.get("X-Webhook-Signature", "")
    timestamp = request.headers.get("X-Webhook-Timestamp", "0")
    delivery_id = request.headers.get("X-Webhook-Delivery-Id", "")
    body = await request.body()

    # Verify timestamp. A non-numeric header is a client error, not a crash.
    try:
        sent_at = int(timestamp)
    except ValueError:
        raise HTTPException(
            status_code=400, detail="Invalid webhook timestamp"
        ) from None
    if abs(time.time() - sent_at) > 300:
        raise HTTPException(status_code=400, detail="Webhook timestamp too old")

    # Verify signature over "<timestamp>.<raw body>". Working on bytes avoids
    # a UnicodeDecodeError when the body is not valid UTF-8.
    signed = timestamp.encode() + b"." + body
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(), signed, hashlib.sha256
    ).hexdigest()
    # compare_digest() raises TypeError for non-ASCII str arguments, so compare
    # the encoded bytes instead of the header string itself.
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        event = json.loads(body)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from None

    # Acknowledge immediately; the task runs after the response is sent
    background_tasks.add_task(process_event, delivery_id, event)
    return {"status": "received"}
async def process_event(delivery_id: str, event: dict):
    """Handle one webhook event unless its delivery ID was already seen.

    The delivery ID is added to ``processed_deliveries`` before the event is
    dispatched. Only ``SessionEnded``, ``AnalysisCompleted`` and ``Error``
    events produce output; other types are ignored.

    Args:
        delivery_id: Value of the delivery-id header for this request.
        event: Parsed payload; must contain ``"type"`` and ``"data"`` keys.
    """
    # Idempotency
    if delivery_id in processed_deliveries:
        return
    processed_deliveries.add(delivery_id)

    event_type, data = event["type"], event["data"]

    if event_type == "SessionEnded":
        print(f"Session ended: {data['conversationId']}, {data['durationSeconds']}s")
        # await db.update_conversation(data["conversationId"], ...)
        return

    if event_type == "AnalysisCompleted":
        print(f"Analysis ready: {data['conversationId']}")
        print(f"  Sentiment: {data['sentiment']} ({data['sentimentScore']})")
        print(f"  Summary: {data['summary']}")
        return

    if event_type == "Error":
        print(f"Agent error [{data['errorCode']}]: {data['errorMessage']}")
        # Alert your team if not recoverable
        recoverable = data.get("recoverable")
        if recoverable:
            return
        await notify_team(data)
Queue-based architecture
For high-volume deployments, push events to a queue and process them in workers. The example below uses an Amazon SQS FIFO queue, which deduplicates messages by delivery ID:
# In your webhook handler — just enqueue
import boto3
import json
sqs = boto3.client('sqs')
# The sender passes MessageDeduplicationId and MessageGroupId, which SQS only
# accepts on FIFO queues, and FIFO queue names must end in ".fifo".
# Replace the 12-digit account ID and queue name with your own.
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/voisnap-events.fifo"
async def process_event(delivery_id: str, event: dict):
    """Enqueue a webhook event on SQS instead of handling it in-process.

    Args:
        delivery_id: Delivery ID of the webhook request; used both for the
            local duplicate check and as the SQS deduplication ID.
        event: Parsed payload; serialised as JSON for the message body.
    """
    import asyncio  # local import keeps this snippet self-contained

    if delivery_id in processed_deliveries:
        return
    # boto3 is synchronous: run the call in a worker thread so this coroutine
    # does not block the event loop while waiting on the network.
    await asyncio.to_thread(
        sqs.send_message,
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(event),
        MessageDeduplicationId=delivery_id,  # SQS FIFO deduplication
        MessageGroupId=event.get("agentId", "default"),
    )
    # Record the ID only after a successful send so a failed enqueue can be
    # retried; without this the duplicate check above never matches.
    processed_deliveries.add(delivery_id)
# In a separate worker process
def worker_loop():
    """Poll the queue forever, handling and then deleting each message.

    Each poll asks for up to 10 messages with ``WaitTimeSeconds=20``. A
    message is deleted only after ``handle_event`` returns for it.
    """
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        for message in response.get("Messages", []):
            payload = json.loads(message["Body"])
            handle_event(payload)
            sqs.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message["ReceiptHandle"],
            )
Testing webhooks locally
Use ngrok or Cloudflare Tunnel to expose your local server:
# ngrok: tunnel to local port 3000 (the port the Express example listens on)
ngrok http 3000
# Use the HTTPS URL for your webhook: https://abc123.ngrok.io/webhooks/voisnap
Then create a webhook in Voisnap pointing to your tunnel URL.
:::tip
Use the Console's Webhooks → Delivery Log to inspect payloads, view retry history, and manually replay failed deliveries during development.
:::