Webhook Log System
A unified, auditable webhook processing system for handling callbacks from external services.
Overview
The WebhookLog system provides:
- Audit trail for all incoming webhooks
- State machine for reliable processing
- Retry logic with exponential backoff
- Duplicate detection to prevent reprocessing
- CRM interface for monitoring and manual intervention
Supported Providers
| Provider | Category | Use Case |
|---|---|---|
| assemblyai | transcription_complete | Audio/video transcription callbacks |
| oxylabs | price_check | Web scraping result callbacks |
| sendgrid | delivery, bounce, engagement, suppression | Email event webhooks |
State Machine
┌─────────┐     callback     ┌───────┐     process     ┌────────────┐     success     ┌───────────┐
│ pending │ ───────────────► │ ready │ ──────────────► │ processing │ ──────────────► │ processed │
└─────────┘                  └───────┘                 └────────────┘                 └───────────┘
     │                           ▲                           │
     │ timeout                   │ reprocess                 │ failure
     ▼                           │                           ▼
┌───────────┐                    │                       ┌───────┐
│ exception │ ◄──────────────────┴───────────────────────│ retry │ (max 5 attempts)
└───────────┘                                            └───────┘
States
- pending: Job submitted, awaiting webhook callback
- ready: Webhook received, queued for processing
- processing: Currently being processed by worker
- processed: Successfully completed
- retry: Failed, scheduled for retry (exponential backoff)
- exception: Failed after max retries, requires manual intervention
- archived: Manually archived
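The diagram and state list can be read as a transition table. The following is a minimal sketch, not the model's actual implementation: the event names mirror the diagram labels, while `give_up`, `archive`, and the states from which `reprocess` and `archive` are allowed are assumptions made for illustration.

```ruby
# Allowed transitions, read off the state diagram and state list.
# Assumptions: the :give_up and :archive event names, and which states
# accept :reprocess and :archive, are hypothetical.
TRANSITIONS = {
  pending:    { callback: :ready, timeout: :exception },
  ready:      { process: :processing },
  processing: { success: :processed, failure: :retry },
  retry:      { reprocess: :ready, give_up: :exception },
  exception:  { reprocess: :ready, archive: :archived },
  processed:  { archive: :archived },
  archived:   {}
}.freeze

# Returns the next state, or raises ArgumentError if the event is not
# valid in the given state.
def next_state(state, event)
  TRANSITIONS.fetch(state).fetch(event) do
    raise ArgumentError, "cannot #{event} from #{state}"
  end
end
```

Keeping the table in one place makes it easy to reject invalid moves, such as processing an entry that is still pending.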
Usage
Creating a Pending Entry
When submitting a job that expects a webhook callback:
# When submitting transcription to AssemblyAI
transcript_id = assemblyai_client.submit_transcription(audio_url, webhook_url: callback_url)
WebhookLog.create_pending!(
  provider: 'assemblyai',
  category: 'transcription_complete',
  resource_type: 'CallRecord',
  resource_id: call_record.id,
  external_id: transcript_id,
  data: { submitted_at: Time.current.iso8601 },
  notes: "Transcription submitted for CallRecord #{call_record.id}"
)
Ingesting a Webhook
In the webhook controller:
def create
  # Validate authentication first
  webhook_log = WebhookLog.ingest!(
    provider: 'assemblyai',
    category: 'transcription_complete',
    resource_type: 'CallRecord',
    resource_id: params[:call_record_id],
    external_id: params[:transcript_id],
    data: JSON.parse(request.raw_post),
    notes: "Status: #{params[:status]}"
  )
  # Queue for async processing
  WebhookProcessorWorker.perform_async(webhook_log.id)
  head :ok
end
Processing Webhooks
The WebhookProcessorWorker handles processing:
class WebhookProcessorWorker
  include Sidekiq::Worker # assumed: perform_async in the controller implies a Sidekiq worker

  def perform(webhook_log_id)
    webhook_log = WebhookLog.find(webhook_log_id)
    webhook_log.process! # Delegates to appropriate processor
  end
end
Creating a Processor
module WebhookProcessors
  class MyProviderProcessor
    def self.call(webhook_log)
      new(webhook_log).call
    end

    def initialize(webhook_log)
      @webhook_log = webhook_log
      @data = webhook_log.data
    end

    def call
      # Process the webhook data
      # Return a hash to store in response_data
      { status: 'success', processed_at: Time.current }
    end
  end
end
Register the processor in WebhookLog#processor_class:
def processor_class
  case provider
  when 'myprovider'
    WebhookProcessors::MyProviderProcessor
  # ...
  end
end
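To show how the pieces fit together, here is a hedged sketch of how `process!` might delegate to the registered processor and store its return value. `SketchWebhookLog` is a hypothetical in-memory stand-in for the real model, and the error handling (moving straight to `retry` and writing the error to `notes`) is an assumption, not the documented behavior.

```ruby
module WebhookProcessors
  # Toy processor following the interface above: .call(webhook_log) returns a hash.
  class MyProviderProcessor
    def self.call(webhook_log)
      { status: 'success', keys: webhook_log.data.keys }
    end
  end
end

# Hypothetical in-memory stand-in for the WebhookLog model.
class SketchWebhookLog
  attr_reader :provider, :data, :state, :response_data, :notes

  def initialize(provider:, data:)
    @provider = provider
    @data = data
    @state = 'ready'
  end

  def processor_class
    case provider
    when 'myprovider' then WebhookProcessors::MyProviderProcessor
    else raise ArgumentError, "no processor registered for #{provider.inspect}"
    end
  end

  # Runs the processor, stores its return value, and records the outcome.
  # Assumption: any error sends the entry to retry with the message in notes.
  def process!
    @state = 'processing'
    @response_data = processor_class.call(self)
    @state = 'processed'
  rescue StandardError => e
    @notes = "#{e.class}: #{e.message}"
    @state = 'retry'
  end
end
```

An entry for an unregistered provider ends up in `retry` with the error recorded, which is one reason to register the processor before enabling the webhook route.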
Duplicate Detection
Duplicates are detected by external_id:
# If SendGrid retries a webhook, the same sg_event_id won't be processed twice
WebhookLog.ingest!(
  provider: 'sendgrid',
  external_id: params[:sg_event_id], # Unique per event
  # ...
)
# Returns existing entry if already processed
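The idea behind this check can be sketched with an in-memory store. This is an illustration only: `SketchLogStore` is hypothetical, and keying on the pair of provider and external_id (rather than external_id alone) is an assumption about how collisions between providers would be avoided.

```ruby
# Hypothetical in-memory store illustrating idempotent ingestion.
class SketchLogStore
  Entry = Struct.new(:provider, :external_id, :data, keyword_init: true)

  def initialize
    @entries = {}
  end

  # Returns [entry, created]. `created` is false when the same
  # provider/external_id pair was ingested before, in which case the
  # existing entry is returned untouched.
  def ingest(provider:, external_id:, data:)
    key = [provider, external_id]
    existing = @entries[key]
    return [existing, false] if existing

    entry = Entry.new(provider: provider, external_id: external_id, data: data)
    @entries[key] = entry
    [entry, true]
  end
end
```

In a database-backed version, a unique index on the same key would typically be needed so that two concurrent deliveries cannot both create an entry.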
Retry Logic
Failed webhooks are retried with exponential backoff:
| Attempt | Delay |
|---|---|
| 1 | 5 minutes |
| 2 | 15 minutes |
| 3 | 1 hour |
| 4 | 4 hours |
| 5 | 24 hours |
After 5 failed attempts, the entry moves to the exception state and requires manual intervention.
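The schedule above is a fixed lookup rather than a formula, so it can be expressed as a small table. A minimal sketch follows; the constant and method names are hypothetical, and the delays are copied from the table as seconds.

```ruby
# Delays from the retry table, in seconds, keyed by attempt number.
RETRY_DELAYS = {
  1 => 5 * 60,
  2 => 15 * 60,
  3 => 60 * 60,
  4 => 4 * 60 * 60,
  5 => 24 * 60 * 60
}.freeze

# Returns the delay in seconds for the given attempt, or nil when the
# attempt number is outside the table.
def retry_delay(attempt)
  RETRY_DELAYS[attempt]
end

# Returns the Time at which the given attempt would be scheduled,
# or nil when there is no delay defined for it.
def next_retry_at(attempt, now: Time.now)
  delay = retry_delay(attempt)
  delay && now + delay
end
```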
Stale Entry Recovery
The StaleTranscriptionRecoveryWorker runs hourly to:
- Find pending entries older than 1 hour
- Check the external service for status
- Transition to ready if the job completed
- Transition to exception if the job failed
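The recovery pass can be sketched as a loop over stale entries. This is a simplified illustration, not the worker's code: `StaleEntry` is a hypothetical struct, and `status_lookup` is a hypothetical callable that stands in for the call to the external service and returns `:completed`, `:failed`, or anything else for a job that is still running.

```ruby
StaleEntry = Struct.new(:external_id, :state, :created_at, keyword_init: true)

STALE_AFTER = 60 * 60 # 1 hour, in seconds

# Transitions stale pending entries based on the external job status.
# Entries that are not pending, not yet stale, or still running are left as-is.
def recover_stale(entries, status_lookup, now: Time.now)
  entries.each do |entry|
    next unless entry.state == 'pending'
    next unless now - entry.created_at > STALE_AFTER

    case status_lookup.call(entry.external_id)
    when :completed then entry.state = 'ready'
    when :failed    then entry.state = 'exception'
    end
  end
end
```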
CRM Interface
Access at /webhook_logs:
- Index: Filterable list with stats
- Show: Full payload and response data
- Actions: Reprocess, Archive
Filters
- Provider (AssemblyAI, Oxylabs, SendGrid)
- Category
- State
- Date range
- External ID
Authentication
AssemblyAI
JWT tokens with embedded resource info:
AssemblyaiCallbackTokenService.generate_token(
  resource_type: 'CallRecord',
  resource_id: 123
)
# Validates on callback with AssemblyaiCallbackTokenService.validate_token(token)
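For readers unfamiliar with tokens that embed resource info, the sketch below builds and checks an HS256 JWT using only the standard library. It is not the implementation of AssemblyaiCallbackTokenService: the module name, the `secret` parameter, the one-hour default lifetime, and the choice of HS256 are all assumptions.

```ruby
require 'openssl'
require 'base64'
require 'json'

# Hypothetical HS256 token helper; the real service may differ.
module SketchCallbackToken
  module_function

  def generate(secret:, resource_type:, resource_id:, ttl: 3600, now: Time.now)
    header  = { alg: 'HS256', typ: 'JWT' }
    payload = { resource_type: resource_type, resource_id: resource_id, exp: now.to_i + ttl }
    signing_input = [header, payload].map { |part| encode(JSON.generate(part)) }.join('.')
    "#{signing_input}.#{encode(sign(secret, signing_input))}"
  end

  # Returns the payload hash, or nil if the token is malformed,
  # has a bad signature, or has expired.
  def validate(token, secret:, now: Time.now)
    header_b64, payload_b64, signature_b64 = token.to_s.split('.')
    return nil unless header_b64 && payload_b64 && signature_b64

    expected = encode(sign(secret, "#{header_b64}.#{payload_b64}"))
    return nil unless secure_compare(expected, signature_b64)

    payload = JSON.parse(Base64.urlsafe_decode64(payload_b64))
    payload['exp'].to_i > now.to_i ? payload : nil
  rescue ArgumentError, JSON::ParserError
    nil
  end

  def encode(bytes)
    Base64.urlsafe_encode64(bytes, padding: false)
  end

  def sign(secret, input)
    OpenSSL::HMAC.digest('SHA256', secret, input)
  end

  # Constant-time string comparison to avoid leaking signature bytes.
  def secure_compare(a, b)
    a.bytesize == b.bytesize &&
      a.bytes.zip(b.bytes).reduce(0) { |acc, (x, y)| acc | (x ^ y) }.zero?
  end
end
```

Because the resource type and ID travel inside the signed payload, the callback controller can trust them once the token validates, without a separate lookup table of outstanding jobs.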
Oxylabs
JWT tokens via Retailer::CallbackTokenService:
Retailer::CallbackTokenService.callback_url(catalog_item_id: 123)
SendGrid
ECDSA signature verification:
# In controller
verifier = Sendgrid::SignatureVerifier.new
return head :unauthorized unless verifier.verify(request)
Configuration:
# ENV takes priority, then Heatwave::Configuration
ENV['SENDGRID_WEBHOOK_VERIFICATION_KEY'] ||
Heatwave::Configuration.fetch(:sendgrid_api, :sendgrid_webhook_verification_key)
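The check performed by the verifier can be sketched with Ruby's OpenSSL bindings. This is an illustration under stated assumptions, not the code of Sendgrid::SignatureVerifier: the method name and parameters are hypothetical, and it assumes the signature is a Base64-encoded ECDSA signature over the SHA-256 digest of the timestamp concatenated with the raw request body.

```ruby
require 'openssl'
require 'base64'

# Hypothetical helper. `key` is an OpenSSL::PKey::EC object holding the
# verification key; `timestamp` and `signature_b64` come from the request
# headers, and `body` is the raw, unparsed request body.
def sendgrid_signature_valid?(key:, signature_b64:, timestamp:, body:)
  signature = Base64.strict_decode64(signature_b64)
  key.verify(OpenSSL::Digest.new('SHA256'), signature, "#{timestamp}#{body}")
rescue ArgumentError, OpenSSL::PKey::PKeyError
  # Invalid Base64 or a malformed signature is treated as a failed check.
  false
end
```

SendGrid's signed event webhook sends the two values in the X-Twilio-Email-Event-Webhook-Signature and X-Twilio-Email-Event-Webhook-Timestamp headers. The body must be verified exactly as received, since re-serializing parsed JSON can change the bytes and invalidate the signature.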
Maintenance
Old entries are pruned by Maintenance::PurgeOldTrackingEvents:
- processed, archived, exception, pending > 1 month: Deleted
- retry > 1 week: Transitioned to exception
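These pruning rules can be written as a single decision function. A minimal sketch, assuming "1 month" means 30 days and that age is measured from a single timestamp on the entry; the names are hypothetical and this is not the code of Maintenance::PurgeOldTrackingEvents.

```ruby
MONTH = 30 * 24 * 60 * 60 # assumption: "1 month" approximated as 30 days
WEEK  = 7 * 24 * 60 * 60

DELETE_AFTER_MONTH = %w[processed archived exception pending].freeze

# Returns :delete, :to_exception, or :keep for an entry in `state`
# whose reference timestamp is `timestamp`.
def purge_action(state, timestamp, now: Time.now)
  age = now - timestamp
  return :delete if DELETE_AFTER_MONTH.include?(state) && age > MONTH
  return :to_exception if state == 'retry' && age > WEEK

  :keep
end
```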
Adding a New Provider
- Add provider to WebhookLog::PROVIDERS
- Add categories to WebhookLog::CATEGORIES
- Create controller in app/controllers/webhooks/v1/
- Create processor in app/services/webhook_processors/
- Register processor in WebhookLog#processor_class
- Add route in config/routes/api.rb
- Update CRM helpers for display
Troubleshooting
Webhook Not Received
- Check logs for incoming requests
- Verify callback URL is accessible (dev tunnel for local)
- Check authentication (JWT expiry, signature verification)
Stuck in Processing
The worker may have crashed mid-process, leaving the entry in the processing state:
- Check the notes field for error details
- Use the "Reprocess" action in CRM
- Check Sidekiq for failed jobs
Too Many Retries
- Check notes for error messages
- Verify external service is accessible
- Check processor logic for bugs
- Manually archive if unrecoverable