Class: CallRecordBulkTranscriptionWorker

Inherits:
Object
Includes:
Sidekiq::IterableJob, Sidekiq::Worker
Defined in:
app/workers/call_record_bulk_transcription_worker.rb

Overview

Nightly worker to backfill historical call record transcriptions using the
cheaper GPT-4o-mini-transcribe model via RubyLLM.

Uses Sidekiq::IterableJob so progress is saved after each record: after a
deploy or worker restart, the job resumes from the last successful call
instead of starting over.
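
The resume behaviour described above can be illustrated with a toy model in plain Ruby. This is only a sketch of the idea, not Sidekiq's implementation: `run_job`, `store`, and `interrupt_after` are hypothetical names, and the cursor here is a simple array offset, whereas a real ActiveRecord enumerator tracks position by record rather than by index.

```ruby
# Toy model of cursor-based resumption (not Sidekiq's actual code).
# `store` stands in for wherever the job's cursor is persisted.
def run_job(records, store, interrupt_after: nil)
  processed = []
  start = store.fetch(:cursor, 0) # resume from the saved position, if any

  records.drop(start).each_with_index do |record, offset|
    # Simulate a deploy or restart cutting the run short.
    break if interrupt_after && processed.size >= interrupt_after

    processed << record                 # stands in for transcribing one call
    store[:cursor] = start + offset + 1 # checkpoint after each record
  end

  processed
end

store = {}
calls = %w[call-1 call-2 call-3 call-4 call-5]

first_run  = run_job(calls, store, interrupt_after: 2) # interrupted run
second_run = run_job(calls, store)                     # resumed run

puts first_run.inspect
puts second_run.inspect
```

The second run picks up at the third call because the cursor survived the interruption; nothing already processed is repeated.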

Each iteration downloads the audio, transcribes synchronously, saves the
transcript, and queues an EmbeddingWorker job. This is intentionally serial
(not fan-out) to control API throughput and costs.

Processes oldest-first by default (most recent calls are handled by the
primary AssemblyAI pipeline via DailyCallRecordTranscriptionWorker).

Scheduled: Nightly via config/sidekiq_production_schedule.yml

Examples:

Backfill calls older than 1 year (default: 200/run)

CallRecordBulkTranscriptionWorker.perform_async

Custom limit and model

CallRecordBulkTranscriptionWorker.perform_async(
  'limit' => 500,
  'model' => 'whisper-1',
  'min_age_days' => 365
)

Constant Summary

DEFAULT_LIMIT = 200

Conservative default: ~200 calls/night ≈ 10 hours of audio ≈ $1.80/night

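
The estimate for DEFAULT_LIMIT can be checked with a little arithmetic. The sketch below uses only the three figures quoted in the comment; the average call length and the per-minute rate are values implied by those figures, not quoted prices, and `estimated_cost_usd` is a hypothetical helper for illustration.

```ruby
# Figures taken from the DEFAULT_LIMIT comment.
calls_per_night  = 200
audio_hours      = 10.0
nightly_cost_usd = 1.80

audio_minutes = audio_hours * 60

# Values implied by the figures above (assumptions, not published prices).
avg_minutes_per_call = audio_minutes / calls_per_night
implied_rate_per_min = nightly_cost_usd / audio_minutes

# Hypothetical helper: scale the estimate to a different nightly limit,
# assuming the same average call length and rate.
estimated_cost_usd = lambda do |limit|
  (limit * avg_minutes_per_call * implied_rate_per_min).round(2)
end

puts "avg call length: #{avg_minutes_per_call} min"
puts "implied rate:    $#{implied_rate_per_min.round(4)}/min"
puts "limit 500:       $#{estimated_cost_usd.call(500)}/night"
```

Under these assumptions, raising the limit scales the nightly cost linearly, so it is worth re-running the estimate before passing a larger 'limit'.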
DEFAULT_MIN_AGE_DAYS = 365

Only backfill calls older than this many days to avoid conflicting with the
primary AssemblyAI pipeline (DailyCallRecordTranscriptionWorker)

MIN_DURATION_SECONDS = CallRecord::MIN_TRANSCRIPTION_DURATION

MIN_DURATION_SECONDS_VOICEMAIL = CallRecord::MIN_TRANSCRIPTION_DURATION_VOICEMAIL

Instance Method Summary

Instance Method Details

#build_enumerator(options = nil, cursor:) ⇒ Object



# File 'app/workers/call_record_bulk_transcription_worker.rb', line 44

def build_enumerator(options = nil, cursor:)
  opts = options.to_h.with_indifferent_access
  @limit = (opts[:limit] || DEFAULT_LIMIT).to_i
  @model = opts[:model] || CallRecordProcessing::BulkTranscriptionService::DEFAULT_MODEL
  @min_age_days = (opts[:min_age_days] || DEFAULT_MIN_AGE_DAYS).to_i
  @success_count = 0
  @skip_count = 0
  @error_count = 0

  scope = build_scope
  candidate_count = scope.count
  log_info "Starting bulk transcription: #{candidate_count} candidates, limit #{@limit}, model #{@model}, min_age #{@min_age_days}d"

  return nil if candidate_count.zero?

  active_record_records_enumerator(scope.limit(@limit), cursor: cursor)
end
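
The option handling at the top of build_enumerator can be tried in isolation. The sketch below is a stand-in, not the worker's code: `OptionsSketch` and `resolve` are hypothetical names, `transform_keys(&:to_sym)` replaces ActiveSupport's `with_indifferent_access` so the snippet runs without Rails, and the DEFAULT_MODEL value is a placeholder because the real constant lives in CallRecordProcessing::BulkTranscriptionService and is not shown here.

```ruby
# Stand-in for the option defaults resolved in build_enumerator.
module OptionsSketch
  DEFAULT_LIMIT = 200
  DEFAULT_MIN_AGE_DAYS = 365
  DEFAULT_MODEL = 'gpt-4o-mini-transcribe' # placeholder; real value not shown

  def self.resolve(options = nil)
    # nil.to_h is {}, so calling perform_async with no arguments is safe.
    opts = options.to_h.transform_keys(&:to_sym)

    {
      limit: (opts[:limit] || DEFAULT_LIMIT).to_i,
      model: opts[:model] || DEFAULT_MODEL,
      min_age_days: (opts[:min_age_days] || DEFAULT_MIN_AGE_DAYS).to_i
    }
  end
end

puts OptionsSketch.resolve.inspect
puts OptionsSketch.resolve('limit' => '500', 'model' => 'whisper-1').inspect
```

Sidekiq serializes job arguments as JSON, so hash keys arrive as strings; that is why the examples pass 'limit' rather than :limit, and why the worker normalizes keys before reading them.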

#each_iteration(call_record, *_args) ⇒ Object



# File 'app/workers/call_record_bulk_transcription_worker.rb', line 62

def each_iteration(call_record, *_args)
  service = CallRecordProcessing::BulkTranscriptionService.new(call_record, model: @model)
  result = service.transcribe

  case result[:status]
  when :success
    @success_count += 1
    log_info "Transcribed ##{call_record.id}: #{result[:word_count]} words (#{result[:duration_secs]}s)" if (@success_count % 20).zero?
  when :skipped
    @skip_count += 1
  when :error
    @error_count += 1
    # Join reason and message with a separator so they do not run together.
    log_error "Failed ##{call_record.id}: #{[result[:reason], result[:message]].compact.join(' - ')}"
  end

  sleep(rand(0.5..1.5))
rescue StandardError => e
  @error_count += 1
  log_error "Unexpected error for ##{call_record.id}: #{e.message}"
  ErrorReporting.error(e)
end

#on_complete ⇒ Object



# File 'app/workers/call_record_bulk_transcription_worker.rb', line 84

def on_complete
  log_info "Complete: #{@success_count} transcribed, #{@skip_count} skipped, #{@error_count} errors"
end
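
The counting in each_iteration and the summary in on_complete can be exercised without Sidekiq or the transcription service. The sketch below is a simplified model: `TallySketch` is a hypothetical class, the result hashes are hand-written stand-ins for what the service returns, and 'download_failed' is an invented reason used only for illustration.

```ruby
# Simplified model of the worker's result tally and throttled logging.
class TallySketch
  LOG_EVERY = 20 # success lines are logged only on every 20th success

  attr_reader :success_count, :skip_count, :error_count, :log

  def initialize
    @success_count = 0
    @skip_count = 0
    @error_count = 0
    @log = []
  end

  def record(id, result)
    case result[:status]
    when :success
      @success_count += 1
      @log << "Transcribed ##{id}" if (@success_count % LOG_EVERY).zero?
    when :skipped
      @skip_count += 1
    when :error
      @error_count += 1
      @log << "Failed ##{id}: #{result[:reason]}" # errors are always logged
    end
  end

  def summary
    "Complete: #{@success_count} transcribed, #{@skip_count} skipped, #{@error_count} errors"
  end
end

tally = TallySketch.new
(1..45).each { |id| tally.record(id, { status: :success }) }
tally.record(46, { status: :skipped })
tally.record(47, { status: :error, reason: 'download_failed' })

puts tally.log
puts tally.summary
```

With 45 successes only the 20th and 40th produce a log line, which keeps nightly logs short while every failure is still recorded.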