Voisnap Docs
Guides

Handle Webhooks

Receive and process Voisnap webhook events reliably — signature verification, idempotency, async processing, and framework examples.

Handle Webhooks

Webhooks are the primary way to receive real-time notifications from Voisnap. This guide covers building a production-grade webhook handler.


The golden rules

  1. Respond with 2xx within 5 seconds — processing happens async
  2. Verify the signature on every request — reject anything that fails
  3. Use the delivery ID (the X-Webhook-Delivery-Id header) as an idempotency key — retries resend the same event with the same ID
  4. Store events async — write to a queue or database, don't do heavy work inline

Express.js handler

import express from 'express';
import crypto from 'crypto';
 
// Express application that hosts the webhook endpoint.
const app = express();
// Shared secret used as the HMAC key when verifying signatures; read from the environment.
const WEBHOOK_SECRET = process.env.VOISNAP_WEBHOOK_SECRET;
 
// Critical: use raw body parser for signature verification
// express.raw() hands the handler req.body as a Buffer for application/json
// requests on this path, so the signature is checked against the payload as
// it was received rather than a parsed and re-serialized object.
app.use('/webhooks/voisnap', express.raw({ type: 'application/json' }));
 
// POST /webhooks/voisnap
// Verifies the timestamp and HMAC signature of an incoming delivery, answers
// 200 as soon as the request is known to be authentic, then hands the parsed
// event to processEventAsync without blocking the response.
//
// Responses:
//   400 - body was not captured as a raw Buffer, or the timestamp is invalid/stale
//   401 - signature header missing or not matching the computed HMAC
//   200 - delivery accepted
app.post('/webhooks/voisnap', async (req, res) => {
  const signature = req.headers['x-webhook-signature'];
  const timestamp = req.headers['x-webhook-timestamp'];
  const deliveryId = req.headers['x-webhook-delivery-id'];
  const body = req.body; // Buffer when express.raw() parsed the request

  // Without the raw bytes the signature cannot be checked at all.
  if (!Buffer.isBuffer(body)) {
    return res.status(400).json({ error: 'Expected a raw application/json body' });
  }
  const rawBody = body.toString();

  // 1. Verify timestamp freshness
  // A missing or non-numeric header parses to NaN, and every comparison with
  // NaN is false, so it has to be rejected explicitly.
  const sentAt = Number.parseInt(timestamp, 10);
  if (!Number.isFinite(sentAt)) {
    return res.status(400).json({ error: 'Invalid timestamp' });
  }
  if (Math.abs(Date.now() / 1000 - sentAt) > 300) {
    return res.status(400).json({ error: 'Timestamp too old' });
  }

  // 2. Verify HMAC signature
  const signedContent = `${timestamp}.${rawBody}`;
  const expected = 'sha256=' + crypto
    .createHmac('sha256', WEBHOOK_SECRET)
    .update(signedContent)
    .digest('hex');

  // timingSafeEqual throws when the two buffers differ in length, and
  // Buffer.from(undefined) throws when the header is absent, so normalise the
  // input and compare lengths before the constant-time comparison.
  const expectedBuf = Buffer.from(expected);
  const receivedBuf = Buffer.from(typeof signature === 'string' ? signature : '');
  const signatureOk =
    expectedBuf.length === receivedBuf.length &&
    crypto.timingSafeEqual(expectedBuf, receivedBuf);
  if (!signatureOk) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  // 3. Acknowledge immediately
  res.status(200).json({ received: true });

  // 4. Process async (non-blocking)
  // The response has already been sent, so a parse failure can only be
  // logged; letting it throw here would surface as an unhandled rejection.
  let event;
  try {
    event = JSON.parse(rawBody);
  } catch (err) {
    console.error(`Discarding delivery ${deliveryId}: body is not valid JSON`, err);
    return;
  }
  processEventAsync(deliveryId, event).catch(console.error);
});
 
/**
 * Route one verified webhook event to its handler and record the delivery.
 *
 * A delivery whose ID is already recorded is skipped. Unknown event types are
 * logged and still recorded. Any failure while handling or recording is
 * logged and swallowed, leaving the delivery unrecorded.
 *
 * @param {string} deliveryId - Value of the delivery-id header.
 * @param {{type: string, data: object}} event - Parsed webhook payload.
 */
async function processEventAsync(deliveryId, event) {
  // Idempotency guard: a delivery that is already recorded is not handled again.
  if (await db.webhookEvents.exists({ deliveryId })) {
    console.log(`Skipping duplicate delivery: ${deliveryId}`);
    return;
  }

  // Handlers are wrapped in arrows so each one is only looked up when its
  // event type actually arrives.
  const routes = new Map([
    ['SessionStarted', (payload) => handleSessionStarted(payload)],
    ['SessionEnded', (payload) => handleSessionEnded(payload)],
    ['AnalysisCompleted', (payload) => handleAnalysisCompleted(payload)],
    ['TransferInitiated', (payload) => handleTransfer(payload)],
    ['Error', (payload) => handleAgentError(payload)],
  ]);

  try {
    const route = routes.get(event.type);
    if (route) {
      await route(event.data);
    } else {
      console.log(`Unknown event type: ${event.type}`);
    }

    // Mark as processed
    const receipt = { deliveryId, processedAt: new Date() };
    await db.webhookEvents.insert(receipt);
  } catch (failure) {
    console.error(`Failed to process ${deliveryId}:`, failure);
    // Log to your error tracker (Sentry, Datadog, etc.)
  }
}
 
/**
 * Log a finished session and upsert its final state into db.conversations.
 *
 * @param {object} data - SessionEnded payload; reads conversationId,
 *   durationSeconds, cost and endReason.
 */
async function handleSessionEnded(data) {
  const { conversationId, durationSeconds, cost, endReason } = data;
  console.log(`Session ended: ${conversationId}, duration: ${durationSeconds}s`);

  // Update your database, trigger follow-up workflows, etc.
  const record = {
    id: conversationId,
    status: 'completed',
    durationSeconds,
    cost,
    endReason,
  };
  await db.conversations.upsert(record);
}
 
/**
 * Copy the analysis results of a conversation onto its stored record.
 *
 * @param {object} data - AnalysisCompleted payload; reads conversationId,
 *   sentiment, intent, summary and the optional customFields.resolution_status.
 */
async function handleAnalysisCompleted(data) {
  const { conversationId, sentiment, intent, summary, customFields } = data;

  // outcome stays undefined when the payload carries no customFields.
  const analysis = {
    sentiment,
    intent,
    summary,
    outcome: customFields?.resolution_status,
  };
  await db.conversations.update(conversationId, analysis);
  // Trigger follow-up email, CRM update, etc.
}
 
// Start the HTTP server on port 3000.
app.listen(3000);

FastAPI handler

import hashlib
import hmac
import json
import os
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
 
app = FastAPI()

# Signing secret shared with Voisnap. Read it from the environment so the real
# value never has to live in source control; the placeholder is only a
# fallback for local experiments and must not be relied on in production.
WEBHOOK_SECRET = os.environ.get("VOISNAP_WEBHOOK_SECRET", "whsec_your_secret")

# Processed delivery IDs (use Redis or DB in production)
processed_deliveries: set[str] = set()
 
@app.post("/webhooks/voisnap")
async def voisnap_webhook(request: Request, background_tasks: BackgroundTasks):
    """Verify an incoming webhook delivery and schedule it for processing.

    Args:
        request: Incoming request; the signature, timestamp and delivery-id
            headers and the raw body are read from it.
        background_tasks: Used to run ``process_event`` after the response.

    Returns:
        ``{"status": "received"}`` once the delivery has been accepted.

    Raises:
        HTTPException: 400 for a non-numeric or stale timestamp or a body
            that is not valid JSON; 401 for a signature mismatch.
    """
    signature = request.headers.get("X-Webhook-Signature", "")
    timestamp = request.headers.get("X-Webhook-Timestamp", "0")
    delivery_id = request.headers.get("X-Webhook-Delivery-Id", "")
    body = await request.body()

    # Verify timestamp. A non-numeric header is a client error, not a crash.
    try:
        sent_at = int(timestamp)
    except ValueError:
        raise HTTPException(
            status_code=400, detail="Invalid webhook timestamp"
        ) from None
    if abs(time.time() - sent_at) > 300:
        raise HTTPException(status_code=400, detail="Webhook timestamp too old")

    # Verify signature over the raw bytes. For a UTF-8 body this is the same
    # input as "<timestamp>.<body text>", but it cannot fail on bytes that do
    # not decode.
    signed = timestamp.encode() + b"." + body
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(), signed, hashlib.sha256
    ).hexdigest()
    # compare_digest rejects non-ASCII str arguments with TypeError, so
    # compare the encoded bytes instead.
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        event = json.loads(body)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None

    # Acknowledge immediately; the event is handled after the response.
    background_tasks.add_task(process_event, delivery_id, event)
    return {"status": "received"}
 
 
async def process_event(delivery_id: str, event: dict):
    """Handle one webhook event, skipping delivery IDs already seen.

    Args:
        delivery_id: Delivery ID of the webhook request, used as the
            idempotency key in ``processed_deliveries``.
        event: Parsed payload; must contain ``"type"`` and ``"data"``.
    """
    # Idempotency: the ID is recorded before any handling, so a repeat of the
    # same delivery returns immediately.
    if delivery_id in processed_deliveries:
        return
    processed_deliveries.add(delivery_id)

    kind, payload = event["type"], event["data"]

    if kind == "SessionEnded":
        print(f"Session ended: {payload['conversationId']}, {payload['durationSeconds']}s")
        # await db.update_conversation(payload["conversationId"], ...)
        return

    if kind == "AnalysisCompleted":
        print(f"Analysis ready: {payload['conversationId']}")
        print(f"  Sentiment: {payload['sentiment']} ({payload['sentimentScore']})")
        print(f"  Summary: {payload['summary']}")
        return

    if kind == "Error":
        print(f"Agent error [{payload['errorCode']}]: {payload['errorMessage']}")
        # Alert your team if not recoverable
        is_recoverable = payload.get("recoverable")
        if not is_recoverable:
            await notify_team(payload)

Queue-based architecture

For high-volume deployments, push events to a queue and process them in workers:

# In your webhook handler — just enqueue
import boto3
import json
 
# SQS client shared by the enqueue path and the worker loop.
sqs = boto3.client('sqs')
# The enqueue call passes MessageDeduplicationId, which SQS only accepts on
# FIFO queues, and FIFO queue names must end in ".fifo".
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/voisnap-events.fifo"
 
async def process_event(delivery_id: str, event: dict):
    """Enqueue a verified webhook event on SQS for a worker to process.

    Args:
        delivery_id: Delivery ID of the webhook request; used for the local
            duplicate check and as the SQS deduplication ID.
        event: Parsed payload; ``agentId`` (when present) selects the
            message group.
    """
    import asyncio  # local import so the snippet does not depend on file-level imports

    if delivery_id in processed_deliveries:
        return

    # boto3 calls are synchronous. Running the send in a worker thread keeps
    # the event loop free to serve other requests while SQS responds.
    await asyncio.to_thread(
        sqs.send_message,
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(event),
        MessageDeduplicationId=delivery_id,  # SQS FIFO deduplication
        # `or` also covers an explicit null/empty agentId in the payload.
        MessageGroupId=event.get("agentId") or "default",
    )

    # Record the ID only after a successful send, so a failed enqueue can be
    # retried and the duplicate check above has something to match against.
    processed_deliveries.add(delivery_id)
 
# In a separate worker process
def worker_loop():
    """Poll the queue forever, handling and then deleting each message.

    A message is deleted only after ``handle_event`` returns. If decoding or
    handling fails, the error is logged and the rest of the batch is left
    undeleted, so those messages are received again later instead of the
    failure terminating the worker.
    """
    import logging  # local import so the snippet does not depend on file-level imports

    log = logging.getLogger(__name__)

    while True:
        messages = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,  # long polling
        ).get("Messages", [])

        for msg in messages:
            try:
                event = json.loads(msg["Body"])
                handle_event(event)
            except Exception:
                # Top-level worker boundary: log and keep the worker alive.
                # Stop at the first failure rather than skipping ahead, so
                # later messages are not handled before the one that failed.
                # NOTE(review): without a dead-letter queue a permanently bad
                # message will keep coming back — confirm the queue's redrive
                # policy.
                log.exception(
                    "Failed to process SQS message %s", msg.get("MessageId")
                )
                break
            sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])

Testing webhooks locally

Use ngrok or Cloudflare Tunnel to expose your local server:

# ngrok: open a public tunnel to the local server listening on port 3000
ngrok http 3000
 
# Use the HTTPS URL for your webhook: https://abc123.ngrok.io/webhooks/voisnap

Then create a webhook in Voisnap pointing to your tunnel URL.

:::tip
Use the Console's Webhooks → Delivery Log to inspect payloads, view retry history, and manually replay failed deliveries during development.
:::

On this page