Class: WebhookLog

Inherits:
ApplicationRecord show all
Defined in:
app/models/webhook_log.rb

Overview

== Schema Information

Table name: webhook_logs
Database name: primary

id :bigint not null, primary key
category :string not null
data :jsonb not null
next_attempt :datetime
notes :text
process_attempts :integer default(0), not null
processed_at :datetime
provider :string not null
resource_type :string
response_data :jsonb
state :string default("ready"), not null
created_at :datetime not null
updated_at :datetime not null
external_id :string
resource_id :integer

Indexes

idx_webhook_logs_provider_category_state (provider,category,state)
idx_webhook_logs_resource (resource_type,resource_id)
idx_webhook_logs_state_created (state,created_at)
index_webhook_logs_on_category (category)
index_webhook_logs_on_external_id (external_id)
index_webhook_logs_on_next_attempt (next_attempt) USING brin

Constant Summary collapse

PROVIDERS =

Providers that can send webhooks

%w[assemblyai freightquote oxylabs sendgrid sftpgo shipengine switchvox].freeze
CATEGORIES =

Categories by provider

{
  'assemblyai' => %w[transcription_complete],
  'freightquote' => %w[
    load_created load_booked load_cancelled load_picked_up load_delivered
    order_created order_updated order_rejected order_canceled order_completed
    appointment_updated carrier_arrived carrier_departed
    in_transit in_transit_to_origin pro_number_added chr_tracking_number_published
    unknown
  ],
  'oxylabs' => %w[price_check price_check_complete],
  'sendgrid' => %w[delivery bounce engagement suppression unknown],
  'sftpgo' => %w[recording_uploaded],
  'shipengine' => %w[tracking_update],
  'switchvox' => %w[new_voicemail checked_voicemail incoming_call route_to_extension call_answered call_hangup outgoing_call agent_login agent_logout unknown]
}.freeze
MAX_RETRY_ATTEMPTS =

Maximum retry attempts before moving to exception

5
RETRY_DELAYS =

Retry delay calculation (exponential backoff)

[5.minutes, 15.minutes, 1.hour, 4.hours, 24.hours].freeze

Constants included from Schedulable

Schedulable::SIMPLE_FORM_OPTIONS

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from ApplicationRecord

ransortable_attributes, #to_relation

Methods included from Schedulable

config

Methods included from Models::AfterCommittable

#after_commit

Methods included from Models::EventPublishable

#publish_event

Instance Attribute Details

#categoryObject (readonly)



61
# File 'app/models/webhook_log.rb', line 61

validates :category, presence: true

#dataObject (readonly)



62
# File 'app/models/webhook_log.rb', line 62

validates :data, presence: true

#providerObject (readonly)



60
# File 'app/models/webhook_log.rb', line 60

validates :provider, presence: true

Class Method Details

.awaiting_callbackActiveRecord::Relation<WebhookLog>

A relation of WebhookLogs that are awaiting callback. Active Record Scope

Returns:

See Also:



107
# File 'app/models/webhook_log.rb', line 107

scope :awaiting_callback, -> { where(state: 'pending') }

.create_pending!(provider:, category:, resource_type:, resource_id:, external_id: nil, data: {}, notes: nil) ⇒ WebhookLog

Class method to create a pending entry when submitting a job
Call this when you submit a job that expects a webhook callback

Parameters:

  • provider (String)

    The webhook provider (e.g., 'assemblyai')

  • category (String)

    The webhook category (e.g., 'transcription_complete')

  • resource_type (String)

    Associated resource type (e.g., 'CallRecord', 'Video')

  • resource_id (Integer)

    Associated resource ID

  • external_id (String, nil) (defaults to: nil)

    Optional external ID from provider (e.g., job_id)

  • data (Hash) (defaults to: {})

    Optional metadata about the submission

  • notes (String, nil) (defaults to: nil)

    Optional notes about the submission

Returns:



217
218
219
220
221
222
223
224
225
226
227
228
# File 'app/models/webhook_log.rb', line 217

def self.create_pending!(provider:, category:, resource_type:, resource_id:, external_id: nil, data: {}, notes: nil)
  create!(
    provider: provider,
    category: category,
    resource_type: resource_type,
    resource_id: resource_id,
    external_id: external_id,
    data: data,
    state: 'pending',
    notes: notes
  )
end

.find_by_external_id(provider, external_id) ⇒ Object

Find any entry by external_id (for duplicate detection)
Uses the external_id column, not JSON data



310
311
312
313
314
315
316
# File 'app/models/webhook_log.rb', line 310

def self.find_by_external_id(provider, external_id)
  return nil if external_id.blank?

  where(provider: provider, external_id: external_id)
    .order(created_at: :desc)
    .first
end

.find_pending_for_resource(provider, category, resource_type, resource_id) ⇒ Object

Find a pending entry for a specific resource



319
320
321
322
323
324
325
326
327
328
329
# File 'app/models/webhook_log.rb', line 319

def self.find_pending_for_resource(provider, category, resource_type, resource_id)
  return nil if resource_type.blank? || resource_id.blank?

  where(
    provider: provider,
    category: category,
    resource_type: resource_type,
    resource_id: resource_id,
    state: 'pending'
  ).order(created_at: :desc).first
end

.for_resourceActiveRecord::Relation<WebhookLog>

A relation of WebhookLogs that are for resource. Active Record Scope

Returns:

See Also:



109
# File 'app/models/webhook_log.rb', line 109

scope :for_resource, ->(resource) { where(resource_type: resource.class.name, resource_id: resource.id) }

.ingest!(provider:, category:, data:, resource_type: nil, resource_id: nil, external_id: nil, notes: nil) ⇒ WebhookLog

Class method to ingest a webhook callback
Finds existing pending entry or creates a new ready entry
Prevents duplicates by checking external_id

Parameters:

  • provider (String)

    The webhook provider (e.g., 'assemblyai')

  • category (String)

    The webhook category (e.g., 'transcription_complete')

  • resource_type (String, nil) (defaults to: nil)

    Associated resource type

  • resource_id (Integer, nil) (defaults to: nil)

    Associated resource ID

  • data (Hash)

    The webhook payload

  • external_id (Object, nil) (defaults to: nil)
  • notes (Object, nil) (defaults to: nil)

Returns:

  • (WebhookLog)

    The updated or created log entry



241
242
243
244
245
246
247
248
249
250
# File 'app/models/webhook_log.rb', line 241

def self.ingest!(provider:, category:, data:, resource_type: nil, resource_id: nil, external_id: nil, notes: nil)
  log = resolve_ingest(provider:, category:, data:, resource_type:, resource_id:, external_id:, notes:)
  # Confirm the row durably committed before the controller ACKs 200 to the
  # provider. A silent fake-success commit (see Durability) would otherwise
  # drop the webhook with no retry and no trace; raising here makes the
  # controller return 5xx so the provider re-delivers. Verify by natural key
  # when we have an external_id (the high-value providers all send one).
  Durability.confirm_persisted!(self, { provider: provider, external_id: external_id }, context: { category: category }) if external_id.present?
  log
end

.payload_containsActiveRecord::Relation<WebhookLog>

A relation of WebhookLogs that are payload contains. Active Record Scope

Returns:

See Also:



115
116
117
118
119
120
# File 'app/models/webhook_log.rb', line 115

scope :payload_contains, ->(term) {
  return none if term.blank?

  sanitized = "%#{sanitize_sql_like(term)}%"
  where('data::text ILIKE :term OR response_data::text ILIKE :term', term: sanitized)
}

.providers_for_selectObject

Helper for tom-select dropdowns



391
392
393
# File 'app/models/webhook_log.rb', line 391

def self.providers_for_select
  PROVIDERS.map { |p| [p.titleize, p] }
end

.ransackable_associations(_auth_object = nil) ⇒ Object



128
129
130
# File 'app/models/webhook_log.rb', line 128

def self.ransackable_associations(_auth_object = nil)
  []
end

.ransackable_attributes(_auth_object = nil) ⇒ Object

Ransack attributes for search



123
124
125
126
# File 'app/models/webhook_log.rb', line 123

def self.ransackable_attributes(_auth_object = nil)
  %w[id provider category resource_type resource_id external_id state process_attempts
     created_at updated_at processed_at next_attempt notes]
end

.ransackable_scopes(_auth_object = nil) ⇒ Object

Custom Ransack scopes for advanced searching



133
134
135
# File 'app/models/webhook_log.rb', line 133

def self.ransackable_scopes(_auth_object = nil)
  %i[payload_contains]
end

.recentActiveRecord::Relation<WebhookLog>

A relation of WebhookLogs that are recent. Active Record Scope

Returns:

See Also:



111
# File 'app/models/webhook_log.rb', line 111

scope :recent, ->(hours = 24) { where(created_at: hours.hours.ago..) }

.requiring_processingActiveRecord::Relation<WebhookLog>

A relation of WebhookLogs that are requiring processing. Active Record Scope

Returns:

See Also:



93
94
95
96
97
# File 'app/models/webhook_log.rb', line 93

scope :requiring_processing, -> {
  where(state: 'ready')
    .or(where(state: 'retry').where(next_attempt: ..Time.current))
    .order(:created_at)
}

.stale_pendingActiveRecord::Relation<WebhookLog>

A relation of WebhookLogs that are stale pending. Active Record Scope

Returns:

See Also:



100
101
102
103
104
# File 'app/models/webhook_log.rb', line 100

scope :stale_pending, ->(threshold = 1.hour) {
  where(state: 'pending')
    .where(created_at: ..threshold.ago)
    .order(:created_at)
}

.states_for_selectObject

Helper for tom-select dropdowns



386
387
388
# File 'app/models/webhook_log.rb', line 386

def self.states_for_select
  state_machine.states.map { |s| [s.name.to_s.titleize, s.name.to_s] }
end

Instance Method Details

#data_jsonObject

Virtual attribute for editing data as JSON string
Used in the edit form to allow manual payload editing



67
68
69
70
71
# File 'app/models/webhook_log.rb', line 67

def data_json
  JSON.pretty_generate(data) if data.present?
rescue JSON::GeneratorError
  data.to_s
end

#data_json=(json_string) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'app/models/webhook_log.rb', line 73

def data_json=(json_string)
  return if json_string.blank?

  self.data = JSON.parse(json_string)
rescue JSON::ParserError => e
  @data_json_error = e.message
  # Keep the raw string so we can show an error and preserve user input
  @data_json_raw = json_string
end

#display_nameObject

Display name for logs



401
402
403
# File 'app/models/webhook_log.rb', line 401

def display_name
  "#{provider.titleize} - #{category} (##{id})"
end

#human_state_nameObject

Human readable state name



396
397
398
# File 'app/models/webhook_log.rb', line 396

def human_state_name
  state.to_s.titleize
end

#process!Object

Process this webhook log entry
Delegates to the appropriate processor based on provider/category



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'app/models/webhook_log.rb', line 333

def process!
  return unless can_start_processing?

  start_processing!

  begin
    result = processor_class.call(self)
    self.response_data = result if result.is_a?(Hash)
    self.notes = nil # Clear any previous error notes on success
    complete!
  rescue StandardError => e
    self.notes = "#{e.class}: #{e.message}\n#{e.backtrace&.first(5)&.join("\n")}"

    if process_attempts >= MAX_RETRY_ATTEMPTS
      fail!
      ErrorReporting.error(e, webhook_log_id: id, provider: provider, category: category)
    else
      schedule_retry!
      Rails.logger.warn "[WebhookLog] Scheduled retry for #{id}: #{e.message}"
    end
  end
end

#processor_classObject

Get the appropriate processor class for this webhook



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'app/models/webhook_log.rb', line 357

def processor_class
  case provider
  when 'assemblyai'
    WebhookProcessors::AssemblyaiProcessor
  when 'freightquote'
    WebhookProcessors::FreightquoteProcessor
  when 'oxylabs'
    WebhookProcessors::OxylabsProcessor
  when 'sendgrid'
    WebhookProcessors::SendgridProcessor
  when 'sftpgo'
    WebhookProcessors::SftpgoProcessor
  when 'shipengine'
    WebhookProcessors::ShipengineProcessor
  when 'switchvox'
    WebhookProcessors::SwitchvoxProcessor
  else
    raise NotImplementedError, "No processor for provider: #{provider}"
  end
end

#resourceObject

Find the associated resource



379
380
381
382
383
# File 'app/models/webhook_log.rb', line 379

def resource
  return nil if resource_type.blank? || resource_id.blank?

  resource_type.constantize.find_by(id: resource_id)
end