Class: CallRecordBulkTranscriptionWorker

Inherits:
Object
Includes:
Sidekiq::IterableJob, Sidekiq::Job
Defined in:
app/workers/call_record_bulk_transcription_worker.rb

Overview

Nightly worker to backfill historical call record transcriptions using the
cheap Gemini 3.1 Flash-Lite native-audio model via RubyLLM (model resolved
from AiModelConstants.id(:transcription)).

Uses Sidekiq::IterableJob so progress is saved after each record — a deploy
or worker restart resumes from the last successful call rather than restarting.
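Below is a minimal plain-Ruby sketch of why cursor-based iteration survives a restart. It assumes records are visited in ascending id order (chosen for simplicity) and that the checkpoint holds the id of the last completed record. `ResumableRun` and the in-memory checkpoint Hash are hypothetical. Sidekiq::IterableJob keeps its cursor in Redis and manages checkpointing itself.

```ruby
# Hypothetical sketch: resumable iteration over id-ordered records.
# The checkpoint is an in-memory Hash here; Sidekiq::IterableJob persists
# its cursor in Redis instead.
class ResumableRun
  attr_reader :processed

  def initialize(ids, checkpoint)
    @ids = ids.sort             # ascending id order, for simplicity
    @checkpoint = checkpoint    # Hash holding :cursor (last completed id)
    @processed = []
  end

  # Processes records after the saved cursor. `stop_after` simulates a
  # deploy or restart that interrupts the run partway through.
  def run(stop_after: nil)
    cursor = @checkpoint[:cursor]
    remaining = cursor ? @ids.select { |id| id > cursor } : @ids
    remaining.each_with_index do |id, i|
      break if stop_after && i >= stop_after
      @processed << id            # stand-in for "transcribe and save"
      @checkpoint[:cursor] = id   # checkpoint after each completed record
    end
    self
  end
end

checkpoint = {}
first  = ResumableRun.new([5, 1, 3, 9, 7], checkpoint).run(stop_after: 2)
second = ResumableRun.new([5, 1, 3, 9, 7], checkpoint).run
p first.processed   # => [1, 3]
p second.processed  # => [5, 7, 9]
```

The second run starts after the checkpointed id and never repeats records 1 and 3. This is the property that lets a deploy interrupt the nightly sweep without starting it over.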

Each iteration downloads the audio, transcribes synchronously, saves the
transcript, and queues an EmbeddingWorker job. This is intentionally serial
(not fan-out) to control API throughput and costs.
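A serial loop has a cost ceiling that is easy to estimate. Each record costs its API latency plus the jittered pause that `each_iteration` takes after every call (`rand(0.5..1.5)`, which averages 1.0 s). The back-of-envelope sketch below uses a hypothetical 5-second mean latency, not a measured figure.

```ruby
# Throughput bound for a strictly serial loop: per-record time is the API
# latency plus the jittered pause, whose uniform range 0.5..1.5 s has mean 1.0 s.
MEAN_PAUSE_SECS = (0.5 + 1.5) / 2.0

# Upper bound on records per hour, given a (hypothetical) mean API latency.
def serial_records_per_hour(mean_api_secs)
  (3600.0 / (mean_api_secs + MEAN_PAUSE_SECS)).floor
end

# Approximate wall-clock minutes for one run of `limit` records.
def run_minutes(limit, mean_api_secs)
  (limit * (mean_api_secs + MEAN_PAUSE_SECS) / 60.0).round(1)
end

puts serial_records_per_hour(5.0) # => 600
puts run_minutes(200, 5.0)        # => 20.0
```

Under these assumptions, the default 200-record run finishes in about 20 minutes. Raising the limit scales run time linearly rather than multiplying concurrent API calls.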

Processes newest-first (order: created_at DESC) within the eligible window:
any pending/error call older than min_age_days. The premium AssemblyAI
pipeline (at-import enqueue + DailyCallRecordTranscriptionWorker 24h catch-up +
StaleTranscriptionRecoveryWorker) gets first crack on recent calls. This
worker sweeps up whatever that pipeline did not complete once the call ages
past min_age_days, which is what closes the former 24h–365d coverage gap.
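The eligibility window described above can be modeled in memory. This is a minimal sketch, assuming calls carry a status and a creation time. The `Call` struct, the `:pending`/`:error`/`:done` symbols, and the `eligible` helper are hypothetical stand-ins for the real CallRecord scope built in `build_scope`, which is not shown on this page.

```ruby
# Hypothetical in-memory stand-in for a CallRecord row.
Call = Struct.new(:id, :status, :created_at)

ELIGIBLE_STATUSES = %i[pending error].freeze # assumed status representation

# Pending/error calls older than min_age_days, newest first, capped at limit.
def eligible(calls, now:, min_age_days: 7, limit: 200)
  cutoff = now - (min_age_days * 86_400)
  calls
    .select { |c| ELIGIBLE_STATUSES.include?(c.status) && c.created_at < cutoff }
    .sort_by { |c| -c.created_at.to_f }
    .first(limit)
end

now = Time.utc(2025, 1, 31)
calls = [
  Call.new(1, :pending, now - 3 * 86_400),  # too recent: still the premium pipeline's window
  Call.new(2, :error,   now - 10 * 86_400),
  Call.new(3, :done,    now - 40 * 86_400), # already transcribed (hypothetical status)
  Call.new(4, :pending, now - 20 * 86_400)
]
p eligible(calls, now: now).map(&:id) # => [2, 4]
```

Raising `min_age_days` shrinks the window toward older calls. Lowering it would start competing with the premium pipeline for recent calls.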

Scheduled: Nightly via config/sidekiq_production_schedule.yml

Examples:

Backfill eligible calls older than 7 days (default: 200/run)

CallRecordBulkTranscriptionWorker.perform_async

Custom limit / model / age floor

CallRecordBulkTranscriptionWorker.perform_async(
  'limit' => 2000,
  'model' => 'gpt-4o-mini-transcribe',
  'min_age_days' => 30
)
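The examples pass options with string keys. Sidekiq persists job arguments as JSON, so string keys survive the round trip unchanged, while symbol keys come back as strings. `build_enumerator` reads options through `with_indifferent_access`, so either key type would be looked up correctly. String keys, however, are what the job actually receives. The short sketch below shows the round trip with Ruby's standard `json` library.

```ruby
require 'json'

# Sidekiq persists job arguments as JSON. String keys survive the round
# trip unchanged; symbol keys come back as strings.
string_keyed = { 'limit' => 2000, 'model' => 'gpt-4o-mini-transcribe', 'min_age_days' => 30 }
symbol_keyed = { limit: 2000, min_age_days: 30 }

round_trip = ->(h) { JSON.parse(JSON.generate(h)) }

p round_trip.call(string_keyed) == string_keyed # => true
p round_trip.call(symbol_keyed) == symbol_keyed # => false
p round_trip.call(symbol_keyed).keys            # => ["limit", "min_age_days"]
```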

Constant Summary

DEFAULT_LIMIT = 200

Conservative default used when no limit is passed. The scheduled entry overrides
this. Raising the scheduled limit to drain the backlog faster is a bulk-op
decision (count first, then confirm); see the CLAUDE.md bulk-op protocol.
DEFAULT_MIN_AGE_DAYS = 7

The cheap pipeline handles any pending/error call older than this many days.
The premium AssemblyAI pipeline (at-import enqueue + DailyCallRecordTranscriptionWorker
24h catch-up + StaleTranscriptionRecoveryWorker + retries) gets first crack
on recent calls. A 7-day floor leaves that premium window comfortably clear,
and anything still un-transcribed after 7 days falls through to this cheap
backfill instead of sitting pending forever. The prior 365-day floor left a
24h–365d coverage gap.
MIN_DURATION_SECONDS = CallRecord::MIN_TRANSCRIPTION_DURATION

Minimum call duration, in seconds, for a call to be transcribed.
MIN_DURATION_SECONDS_VOICEMAIL = CallRecord::MIN_TRANSCRIPTION_DURATION_VOICEMAIL

Minimum duration, in seconds, applied to voicemail calls.
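The two duration floors differ by call type. Below is a minimal sketch of how such a per-type threshold check could look. The numeric values, the inclusive `>=` comparison, and the `long_enough?` helper are all hypothetical. The real values come from the CallRecord constants, and the methods shown on this page do not apply the check themselves.

```ruby
# Hypothetical values; the real ones are CallRecord::MIN_TRANSCRIPTION_DURATION
# and CallRecord::MIN_TRANSCRIPTION_DURATION_VOICEMAIL.
MIN_DURATION_SECONDS = 30
MIN_DURATION_SECONDS_VOICEMAIL = 10

# Picks the floor by call type, then checks the duration meets it
# (inclusive comparison is an assumption).
def long_enough?(duration_secs, voicemail:)
  floor = voicemail ? MIN_DURATION_SECONDS_VOICEMAIL : MIN_DURATION_SECONDS
  duration_secs.to_i >= floor
end

p long_enough?(45, voicemail: false) # => true
p long_enough?(15, voicemail: false) # => false
p long_enough?(15, voicemail: true)  # => true
```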

Instance Method Details

#build_enumerator(options = nil, cursor:) ⇒ Object

# File 'app/workers/call_record_bulk_transcription_worker.rb', line 58

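def build_enumerator(options = nil, cursor:)
  # Job args arrive as JSON, so accept string or symbol keys.
  opts = options.to_h.with_indifferent_access
  @limit = (opts[:limit] || DEFAULT_LIMIT).to_i
  @model = opts[:model] || CallRecordProcessing::BulkTranscriptionService::DEFAULT_MODEL
  @min_age_days = (opts[:min_age_days] || DEFAULT_MIN_AGE_DAYS).to_i

  # Per-execution counters: build_enumerator runs again when an interrupted
  # job resumes, so on_complete reports only records handled since the resume.
  @success_count = 0
  @skip_count = 0
  @error_count = 0

  scope = build_scope
  candidate_count = scope.count
  log_info "Starting bulk transcription: #{candidate_count} candidates, limit #{@limit}, model #{@model}, min_age #{@min_age_days}d"

  # Returning nil tells Sidekiq::IterableJob there is nothing to iterate, so the job is skipped.
  return nil if candidate_count.zero?

  # Yields one CallRecord per iteration; `cursor` lets a resumed job continue where it stopped.
  active_record_records_enumerator(scope.limit(@limit), cursor: cursor)
end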
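The option handling above can be approximated without ActiveSupport. This sketch uses `transform_keys(&:to_s)` as a stand-in for `with_indifferent_access`. `resolve_options` is a hypothetical helper, and `DEFAULT_MODEL`'s value here is a placeholder, not the real model id.

```ruby
DEFAULT_LIMIT = 200
DEFAULT_MIN_AGE_DAYS = 7
DEFAULT_MODEL = 'default-transcription-model' # hypothetical placeholder

# nil-safe (nil.to_h == {}), accepts string or symbol keys, falls back to
# defaults, and coerces numeric options with #to_i.
def resolve_options(options = nil)
  opts = options.to_h.transform_keys(&:to_s) # stands in for with_indifferent_access
  {
    limit: (opts['limit'] || DEFAULT_LIMIT).to_i,
    model: opts['model'] || DEFAULT_MODEL,
    min_age_days: (opts['min_age_days'] || DEFAULT_MIN_AGE_DAYS).to_i
  }
end

p resolve_options(nil)[:limit]                                      # => 200
p resolve_options({ 'limit' => '2000', min_age_days: 30 })[:limit]  # => 2000
```

Ruby treats 0 as truthy, so in this sketch an explicit `'limit' => 0` is kept as 0 rather than falling back to the default.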

#each_iteration(call_record, *_args) ⇒ Object

# File 'app/workers/call_record_bulk_transcription_worker.rb', line 76

def each_iteration(call_record, *_args)
  # One synchronous transcription per record (no fan-out).
  service = CallRecordProcessing::BulkTranscriptionService.new(call_record, model: @model)
  result = service.transcribe

  case result[:status]
  when :success
    @success_count += 1
    # Log progress on every 20th success only, to keep logs quiet.
    log_info "Transcribed ##{call_record.id}: #{result[:word_count]} words (#{result[:duration_secs]}s)" if (@success_count % 20).zero?
  when :skipped
    @skip_count += 1
  when :error
    @error_count += 1
    log_error "Failed ##{call_record.id}: #{[result[:reason], result[:message]].compact.join(': ')}"
  end

  # Jittered pause between calls to throttle API throughput.
  sleep(rand(0.5..1.5))
rescue StandardError => e
  # Count and report without re-raising, so one bad record does not halt the sweep.
  @error_count += 1
  log_error "Unexpected error for ##{call_record.id}: #{e.message}"
  ErrorReporting.error(e)
end
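Below is a hypothetical sketch of the bookkeeping in `each_iteration`. It tallies each result status, logs only every 20th success, and counts unexpected exceptions as errors instead of letting them abort the run. The block passed to `record` stands in for `BulkTranscriptionService#transcribe`. Its result-hash shape is assumed from the code above, and the reason and message strings are made up.

```ruby
class Tally
  attr_reader :success, :skipped, :errors, :log

  def initialize
    @success = @skipped = @errors = 0
    @log = []
  end

  # The block returns a Hash such as { status: :success }.
  def record(id)
    result = yield
    case result[:status]
    when :success
      @success += 1
      @log << "Transcribed ##{id}" if (@success % 20).zero?
    when :skipped
      @skipped += 1
    when :error
      @errors += 1
      @log << "Failed ##{id}: #{[result[:reason], result[:message]].compact.join(': ')}"
    end
  rescue StandardError => e
    # Exceptions count as errors; the caller keeps iterating.
    @errors += 1
    @log << "Unexpected error for ##{id}: #{e.message}"
  end
end

tally = Tally.new
(1..40).each { |id| tally.record(id) { { status: :success } } }
tally.record(41) { { status: :skipped } }
tally.record(42) { { status: :error, reason: 'download_failed', message: 'HTTP 404' } }
tally.record(43) { raise 'boom' }
p tally.log.size # => 4
```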

#on_complete ⇒ Object

# File 'app/workers/call_record_bulk_transcription_worker.rb', line 98

def on_complete
  log_info "Complete: #{@success_count} transcribed, #{@skip_count} skipped, #{@error_count} errors"
end
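The counters that `on_complete` reports live in instance variables. Each job execution gets a fresh instance, and `build_enumerator` resets the counters, so after an interruption the summary covers only the resumed segment. `FakeRun` below is a hypothetical stand-in that illustrates this.

```ruby
# Hypothetical stand-in for one job execution.
class FakeRun
  def initialize
    @success_count = 0 # reset per execution, as build_enumerator does
  end

  def process(n)
    n.times { @success_count += 1 }
    self
  end

  def summary
    "Complete: #{@success_count} transcribed"
  end
end

before_restart = FakeRun.new.process(120) # interrupted mid-run
after_restart  = FakeRun.new.process(80)  # resumed execution
puts after_restart.summary # => Complete: 80 transcribed
```

Whole-run totals would require persisting the counts outside the instance. That is a possible design change, not something this worker currently does.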