Skip to content

Webhook Log System

A unified, auditable webhook processing system for handling callbacks from external services.

The WebhookLog system provides:

  • Audit trail for all incoming webhooks
  • State machine for reliable processing
  • Retry logic with exponential backoff
  • Duplicate detection to prevent reprocessing
  • CRM interface for monitoring and manual intervention
ProviderCategoryUse Case
assemblyaitranscription_completeAudio/video transcription callbacks
oxylabsprice_checkWeb scraping result callbacks
sendgriddelivery, bounce, engagement, suppressionEmail event webhooks
┌─────────┐ callback ┌───────┐ process ┌────────────┐ success ┌───────────┐
│ pending │ ───────────────► │ ready │ ──────────────► │ processing │ ──────────────► │ processed │
└─────────┘ └───────┘ └────────────┘ └───────────┘
│ ▲ │
│ timeout │ reprocess │ failure
▼ │ ▼
┌───────────┐ │ ┌───────┐
│ exception │ ◄───────────────────┴────────────────────│ retry │ (max 5 attempts)
└───────────┘ └───────┘
  • pending: Job submitted, awaiting webhook callback
  • ready: Webhook received, queued for processing
  • processing: Currently being processed by worker
  • processed: Successfully completed
  • retry: Failed, scheduled for retry (exponential backoff)
  • exception: Failed after max retries, requires manual intervention
  • archived: Manually archived

When submitting a job that expects a webhook callback:

# When submitting transcription to AssemblyAI
transcript_id = assemblyai_client.submit_transcription(audio_url, webhook_url: callback_url)
WebhookLog.create_pending!(
provider: 'assemblyai',
category: 'transcription_complete',
resource_type: 'CallRecord',
resource_id: call_record.id,
external_id: transcript_id,
data: { submitted_at: Time.current.iso8601 },
notes: "Transcription submitted for CallRecord #{call_record.id}"
)

In the webhook controller:

def create
# Validate authentication first
webhook_log = WebhookLog.ingest!(
provider: 'assemblyai',
category: 'transcription_complete',
resource_type: 'CallRecord',
resource_id: params[:call_record_id],
external_id: params[:transcript_id],
data: JSON.parse(request.raw_post),
notes: "Status: #{params[:status]}"
)
# Queue for async processing
WebhookProcessorWorker.perform_async(webhook_log.id)
head :ok
end

The WebhookProcessorWorker handles processing:

class WebhookProcessorWorker
def perform(webhook_log_id)
webhook_log = WebhookLog.find(webhook_log_id)
webhook_log.process! # Delegates to appropriate processor
end
end
module WebhookProcessors
class MyProviderProcessor
def self.call(webhook_log)
new(webhook_log).call
end
def initialize(webhook_log)
@webhook_log = webhook_log
@data = webhook_log.data
end
def call
# Process the webhook data
# Return a hash to store in response_data
{ status: 'success', processed_at: Time.current }
end
end
end

Register the processor in WebhookLog#processor_class:

def processor_class
case provider
when 'myprovider'
WebhookProcessors::MyProviderProcessor
# ...
end
end

Duplicates are detected by external_id:

# If SendGrid retries a webhook, the same sg_event_id won't be processed twice
WebhookLog.ingest!(
provider: 'sendgrid',
external_id: params[:sg_event_id], # Unique per event
# ...
)
# Returns existing entry if already processed

Failed webhooks are retried with exponential backoff:

AttemptDelay
15 minutes
215 minutes
31 hour
44 hours
524 hours

After 5 failures, the entry moves to exception state.

The StaleTranscriptionRecoveryWorker runs hourly to:

  1. Find pending entries older than 1 hour
  2. Check the external service for status
  3. Transition to ready if the job completed
  4. Transition to exception if the job failed

Access at /webhook_logs:

  • Index: Filterable list with stats
  • Show: Full payload and response data
  • Actions: Reprocess, Archive
  • Provider (AssemblyAI, Oxylabs, SendGrid)
  • Category
  • State
  • Date range
  • External ID

JWT tokens with embedded resource info:

AssemblyaiCallbackTokenService.generate_token(
resource_type: 'CallRecord',
resource_id: 123
)
# Validates on callback with AssemblyaiCallbackTokenService.validate_token(token)

JWT tokens via Retailer::CallbackTokenService:

Retailer::CallbackTokenService.callback_url(catalog_item_id: 123)

ECDSA signature verification:

# In controller
verifier = Sendgrid::SignatureVerifier.new
return head :unauthorized unless verifier.verify(request)

Configuration:

# ENV takes priority, then Heatwave::Configuration
ENV['SENDGRID_WEBHOOK_VERIFICATION_KEY'] ||
Heatwave::Configuration.fetch(:sendgrid_api, :sendgrid_webhook_verification_key)

Old entries are pruned by Maintenance::PurgeOldTrackingEvents:

  • processed, archived, exception, pending > 1 month: Deleted
  • retry > 1 week: Transitioned to exception
  1. Add provider to WebhookLog::PROVIDERS
  2. Add categories to WebhookLog::CATEGORIES
  3. Create controller in app/controllers/webhooks/v1/
  4. Create processor in app/services/webhook_processors/
  5. Register processor in WebhookLog#processor_class
  6. Add route in config/routes/api.rb
  7. Update CRM helpers for display
  1. Check logs for incoming requests
  2. Verify callback URL is accessible (dev tunnel for local)
  3. Check authentication (JWT expiry, signature verification)

The entry may have crashed mid-process:

  1. Check notes field for error details
  2. Use “Reprocess” action in CRM
  3. Check Sidekiq for failed jobs
  1. Check notes for error messages
  2. Verify external service is accessible
  3. Check processor logic for bugs
  4. Manually archive if unrecoverable