Class: CallRecordBulkTranscriptionWorker
- Inherits:
- Object ⇒ CallRecordBulkTranscriptionWorker
- Includes:
- Sidekiq::IterableJob, Sidekiq::Worker
- Defined in:
- app/workers/call_record_bulk_transcription_worker.rb
Overview
Nightly worker to backfill historical call record transcriptions using the
cheaper GPT-4o-mini-transcribe model via RubyLLM.
Uses Sidekiq::IterableJob so progress is saved after each record — a deploy
or worker restart resumes from the last successful call rather than restarting.
Each iteration downloads the audio, transcribes synchronously, saves the
transcript, and queues an EmbeddingWorker job. This is intentionally serial
(not fan-out) to control API throughput and costs.
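The resume behavior described above can be illustrated without Sidekiq. The following is a minimal plain-Ruby simulation, not the Sidekiq::IterableJob implementation: `run_with_cursor` and `interrupt_after` are hypothetical names, and the cursor is assumed to be the id of the last processed record.

```ruby
# Plain-Ruby simulation of cursor-based resumption (no Sidekiq required).
# `run_with_cursor` and `interrupt_after` are hypothetical, for illustration only.
def run_with_cursor(record_ids, cursor: nil, interrupt_after: nil)
  processed = []
  # Resume strictly after the saved cursor; start from the top if there is none.
  remaining = cursor.nil? ? record_ids : record_ids.select { |id| id > cursor }

  remaining.each do |id|
    # Simulate a deploy or worker restart interrupting the job between records.
    break if interrupt_after && processed.size >= interrupt_after

    processed << id # stands in for download, transcribe, save, enqueue embedding
    cursor = id     # progress is checkpointed after each record
  end

  { processed: processed, cursor: cursor }
end

ids = [101, 102, 103, 104, 105]
first_run  = run_with_cursor(ids, interrupt_after: 2)          # interrupted early
second_run = run_with_cursor(ids, cursor: first_run[:cursor])  # picks up where it stopped
```

Because records are handled one at a time, the checkpoint always points at a fully processed record, which is part of why a serial loop is simpler to resume than a fan-out of independent jobs.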
Processes oldest-first by default (most recent calls are handled by the
primary AssemblyAI pipeline via DailyCallRecordTranscriptionWorker).
Scheduled: Nightly via config/sidekiq_production_schedule.yml
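The oldest-first selection with a minimum age can be sketched in plain Ruby. The worker's actual `build_scope` method is not shown in this documentation, so everything here is an assumption: the `Call` struct, its `recorded_on` and `transcribed` fields, and the helper `backfill_candidates` are hypothetical stand-ins for an ActiveRecord scope.

```ruby
require "date"

# Hypothetical stand-in for the CallRecord model; the real query lives in
# the worker's `build_scope`, which this documentation does not show.
Call = Struct.new(:id, :recorded_on, :transcribed)

def backfill_candidates(calls, today:, min_age_days:, limit:)
  cutoff = today - min_age_days # Date minus Integer yields a Date
  calls
    .reject(&:transcribed)                 # assumed: skip calls that already have a transcript
    .select { |c| c.recorded_on < cutoff } # only calls older than the minimum age
    .sort_by(&:recorded_on)                # oldest first
    .first(limit)
end

today = Date.new(2025, 6, 1)
calls = [
  Call.new(1, Date.new(2023, 1, 10), false),
  Call.new(2, Date.new(2025, 5, 20), false), # too recent for the backfill
  Call.new(3, Date.new(2022, 7, 4), false),
  Call.new(4, Date.new(2021, 3, 15), true),  # already transcribed
  Call.new(5, Date.new(2024, 2, 1), false)
]
picked_ids = backfill_candidates(calls, today: today, min_age_days: 365, limit: 2).map(&:id)
```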
Constant Summary
- DEFAULT_LIMIT =
Conservative default: ~200 calls/night ≈ 10 hours of audio ≈ ~$1.80/night
200
- DEFAULT_MIN_AGE_DAYS =
Only backfill calls older than this to avoid conflicting with the
primary AssemblyAI pipeline (DailyCallRecordTranscriptionWorker)
365
- MIN_DURATION_SECONDS =
CallRecord::MIN_TRANSCRIPTION_DURATION
- MIN_DURATION_SECONDS_VOICEMAIL =
CallRecord::MIN_TRANSCRIPTION_DURATION_VOICEMAIL
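The DEFAULT_LIMIT comment implies an average call length and a per-minute rate. The arithmetic below derives both from the three quoted figures only; the derived rate is an inference from this comment, not a published price, and `estimated_cost` is a hypothetical helper for trying other limits.

```ruby
# Figures quoted in the DEFAULT_LIMIT comment.
calls_per_night = 200
audio_hours     = 10
nightly_cost    = 1.80 # USD

audio_minutes    = audio_hours * 60                        # 600 minutes of audio
minutes_per_call = audio_minutes.fdiv(calls_per_night)     # average minutes per call
cost_per_minute  = (nightly_cost / audio_minutes).round(4) # implied USD per audio minute

# Hypothetical helper: rough nightly cost for a different limit,
# assuming the same average call length and implied rate.
def estimated_cost(limit, minutes_per_call, cost_per_minute)
  (limit * minutes_per_call * cost_per_minute).round(2)
end

cost_for_500 = estimated_cost(500, minutes_per_call, cost_per_minute)
```

Under these assumptions the default corresponds to roughly 3 minutes per call at about $0.003 per audio minute.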
Instance Method Summary
- #build_enumerator(options = nil, cursor:) ⇒ Object
- #each_iteration(call_record, *_args) ⇒ Object
- #on_complete ⇒ Object
Instance Method Details
#build_enumerator(options = nil, cursor:) ⇒ Object
# File 'app/workers/call_record_bulk_transcription_worker.rb', line 44
def build_enumerator(options = nil, cursor:)
  opts = options.to_h.with_indifferent_access
  @limit = (opts[:limit] || DEFAULT_LIMIT).to_i
  @model = opts[:model] || CallRecordProcessing::BulkTranscriptionService::DEFAULT_MODEL
  @min_age_days = (opts[:min_age_days] || DEFAULT_MIN_AGE_DAYS).to_i
  @success_count = 0
  @skip_count = 0
  @error_count = 0

  scope = build_scope
  candidate_count = scope.count
  log_info "Starting bulk transcription: #{candidate_count} candidates, limit #{@limit}, model #{@model}, min_age #{@min_age_days}d"
  return nil if candidate_count.zero?

  active_record_records_enumerator(scope.limit(@limit), cursor: cursor)
end
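The option handling in build_enumerator can be tried outside Rails. This sketch swaps ActiveSupport's `with_indifferent_access` for `transform_keys(&:to_sym)` so it runs on plain Ruby; `normalize_options` is a hypothetical helper, and the `model` option is left out because its default lives in the service class. Sidekiq serializes job arguments as JSON, so an options hash reaches the job with string keys, which is why the keys are normalized before lookup.

```ruby
DEFAULT_LIMIT = 200
DEFAULT_MIN_AGE_DAYS = 365

# Hypothetical helper mirroring the defaulting logic in build_enumerator,
# using transform_keys in place of ActiveSupport's with_indifferent_access.
def normalize_options(options)
  opts = options.to_h.transform_keys(&:to_sym) # nil.to_h is {}, so nil is safe
  {
    limit: (opts[:limit] || DEFAULT_LIMIT).to_i,
    min_age_days: (opts[:min_age_days] || DEFAULT_MIN_AGE_DAYS).to_i
  }
end

defaults  = normalize_options(nil)
overrides = normalize_options("limit" => "50", "min_age_days" => 30)
```

The `.to_i` coercion means numeric strings, as they might arrive from a schedule file or a console call, are accepted alongside integers.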
#each_iteration(call_record, *_args) ⇒ Object
# File 'app/workers/call_record_bulk_transcription_worker.rb', line 62
def each_iteration(call_record, *_args)
  service = CallRecordProcessing::BulkTranscriptionService.new(call_record, model: @model)
  result = service.transcribe

  case result[:status]
  when :success
    @success_count += 1
    log_info "Transcribed ##{call_record.id}: #{result[:word_count]} words (#{result[:duration_secs]}s)" if (@success_count % 20).zero?
  when :skipped
    @skip_count += 1
  when :error
    @error_count += 1
    log_error "Failed ##{call_record.id}: #{result[:reason]} — #{result[:message]}"
  end

  sleep(rand(0.5..1.5))
rescue StandardError => e
  @error_count += 1
  log_error "Unexpected error for ##{call_record.id}: #{e.message}"
  ErrorReporting.error(e)
end
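The status dispatch in each_iteration can be exercised in isolation. The class below is a hypothetical stand-in (`ResultTally` is not part of the codebase) that counts results the same way and logs only every Nth success; the `:download_failed` reason is invented for the example, and logging goes to an in-memory array rather than `log_info` or `log_error`.

```ruby
# Hypothetical tally object mirroring the case statement in each_iteration.
class ResultTally
  attr_reader :success_count, :skip_count, :error_count, :log

  def initialize(log_every: 20)
    @log_every = log_every
    @success_count = @skip_count = @error_count = 0
    @log = []
  end

  def record(id, result)
    case result[:status]
    when :success
      @success_count += 1
      # Log only every Nth success to keep log volume down.
      @log << "Transcribed ##{id}" if (@success_count % @log_every).zero?
    when :skipped
      @skip_count += 1
    when :error
      @error_count += 1
      @log << "Failed ##{id}: #{result[:reason]}" # errors are always logged
    end
  end
end

tally = ResultTally.new(log_every: 2)
tally.record(1, { status: :success })
tally.record(2, { status: :skipped })
tally.record(3, { status: :error, reason: :download_failed })
tally.record(4, { status: :success })
```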
#on_complete ⇒ Object
# File 'app/workers/call_record_bulk_transcription_worker.rb', line 84
def on_complete
  log_info "Complete: #{@success_count} transcribed, #{@skip_count} skipped, #{@error_count} errors"
end