Class: CallRecordBulkTranscriptionWorker
- Inherits:
- Object
- Object
- CallRecordBulkTranscriptionWorker
- Includes:
- Sidekiq::IterableJob, Sidekiq::Job
- Defined in:
- app/workers/call_record_bulk_transcription_worker.rb
Overview
Nightly worker to backfill historical call record transcriptions using the
cheap Gemini 3.1 Flash-Lite native-audio model via RubyLLM (model resolved
from AiModelConstants.id(:transcription)).
Uses Sidekiq::IterableJob so progress is saved after each record — a deploy
or worker restart resumes from the last successful call rather than restarting.
Each iteration downloads the audio, transcribes synchronously, saves the
transcript, and queues an EmbeddingWorker job. This is intentionally serial
(not fan-out) to control API throughput and costs.
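The resume behaviour described above can be illustrated with a small plain-Ruby simulation. This is only a sketch of the idea, not how Sidekiq implements it: `ResumableRun`, the `checkpoint` hash, and the `interrupt_after` parameter are hypothetical names, and a positional index stands in for whatever cursor value Sidekiq actually persists.

```ruby
# Plain-Ruby simulation of cursor-based resumption; no Sidekiq required.
# ResumableRun and the checkpoint hash are hypothetical, for illustration only.
class ResumableRun
  attr_reader :processed

  def initialize(records, checkpoint)
    @records = records
    @checkpoint = checkpoint # stands in for state persisted between runs
    @processed = []
  end

  # Process records starting at the saved cursor. Stop early after
  # `interrupt_after` records to mimic a deploy or worker restart.
  def run(interrupt_after: nil)
    start = @checkpoint.fetch(:cursor, 0)
    @records.each_with_index do |record, index|
      next if index < start
      return :interrupted if interrupt_after && @processed.size >= interrupt_after

      @processed << record
      @checkpoint[:cursor] = index + 1 # progress saved after each record
    end
    :complete
  end
end

checkpoint = {}
records = %w[call-1 call-2 call-3 call-4 call-5]

first = ResumableRun.new(records, checkpoint)
first_status = first.run(interrupt_after: 2)

second = ResumableRun.new(records, checkpoint)
second_status = second.run

puts "first run:  #{first_status}, processed #{first.processed.inspect}"
puts "second run: #{second_status}, processed #{second.processed.inspect}"
```

The second run picks up at the saved cursor, so records finished before the interruption are not transcribed (or billed) twice.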
Processes newest-first (order: created_at DESC) within the eligible window —
any pending/error call older than min_age_days. The premium AssemblyAI
pipeline (at-import enqueue + DailyCallRecordTranscriptionWorker 24h catch-up
+ StaleTranscriptionRecoveryWorker) gets first crack on recent calls; this
worker sweeps whatever it didn't complete once the call ages past min_age_days,
which is what closes the former 24h–365d coverage gap.
Scheduled: Nightly via config/sidekiq_production_schedule.yml
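The eligibility window from the overview can be modelled in memory as follows. This is a hedged sketch: the real query lives in `build_scope`, which this page does not show, so the `Call` struct, the `eligible_calls` helper, the `"completed"` status, and the strict "older than" comparison are all assumptions made for illustration.

```ruby
# Hypothetical in-memory model of the eligibility window.
# The actual query is in build_scope, which is not shown on this page.
Call = Struct.new(:id, :status, :created_at, keyword_init: true)

SECONDS_PER_DAY = 86_400

def eligible_calls(calls, now:, min_age_days:, limit:)
  cutoff = now - (min_age_days * SECONDS_PER_DAY)
  calls
    .select { |c| %w[pending error].include?(c.status) && c.created_at < cutoff }
    .sort_by { |c| -c.created_at.to_f } # newest first, like created_at DESC
    .first(limit)
end

now = Time.utc(2025, 1, 31)
calls = [
  Call.new(id: 1, status: "pending",   created_at: now - 2 * SECONDS_PER_DAY),   # too recent
  Call.new(id: 2, status: "pending",   created_at: now - 10 * SECONDS_PER_DAY),
  Call.new(id: 3, status: "error",     created_at: now - 30 * SECONDS_PER_DAY),
  Call.new(id: 4, status: "completed", created_at: now - 400 * SECONDS_PER_DAY), # assumed status name
  Call.new(id: 5, status: "pending",   created_at: now - 8 * SECONDS_PER_DAY)
]

picked = eligible_calls(calls, now: now, min_age_days: 7, limit: 2)
puts picked.map(&:id).inspect
```

With a 7-day floor the 2-day-old call is left to the premium pipeline, and the limit is spent on the newest eligible calls first.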
Constant Summary
- DEFAULT_LIMIT =
Conservative default when no limit is passed. The scheduled entry overrides
this; raising the scheduled limit to drain the backlog faster is a
bulk-op decision (count-first + confirm); see CLAUDE.md bulk-op protocol.
200
- DEFAULT_MIN_AGE_DAYS =
Cheap pipeline handles any pending/error call older than this. The premium
AssemblyAI pipeline (at-import enqueue + DailyCallRecordTranscriptionWorker
24h catch-up + StaleTranscriptionRecoveryWorker + retries) gets first crack
on recent calls; a 7-day floor leaves that premium window comfortably clear,
and anything still un-transcribed after 7 days falls through to this cheap
backfill instead of sitting pending forever (the prior 365-day floor left a
24h–365d coverage gap).
7
- MIN_DURATION_SECONDS =
Minimum call duration, in seconds, required for transcription.
CallRecord::MIN_TRANSCRIPTION_DURATION
- MIN_DURATION_SECONDS_VOICEMAIL =
Minimum voicemail duration, in seconds, required for transcription.
CallRecord::MIN_TRANSCRIPTION_DURATION_VOICEMAIL
Instance Method Summary
- #build_enumerator(options = nil, cursor:) ⇒ Object
- #each_iteration(call_record, *_args) ⇒ Object
- #on_complete ⇒ Object
Instance Method Details
#build_enumerator(options = nil, cursor:) ⇒ Object
# File 'app/workers/call_record_bulk_transcription_worker.rb', line 58
def build_enumerator(options = nil, cursor:)
  opts = options.to_h.with_indifferent_access
  @limit = (opts[:limit] || DEFAULT_LIMIT).to_i
  @model = opts[:model] || CallRecordProcessing::BulkTranscriptionService::DEFAULT_MODEL
  @min_age_days = (opts[:min_age_days] || DEFAULT_MIN_AGE_DAYS).to_i
  @success_count = 0
  @skip_count = 0
  @error_count = 0

  scope = build_scope
  candidate_count = scope.count

  log_info "Starting bulk transcription: #{candidate_count} candidates, limit #{@limit}, model #{@model}, min_age #{@min_age_days}d"
  return nil if candidate_count.zero?

  active_record_records_enumerator(scope.limit(@limit), cursor: cursor)
end
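The option handling in `build_enumerator` can be approximated without Rails, as a rough sketch. `OptionDefaults` and `resolve` are hypothetical names, and `transform_keys(&:to_sym)` stands in for ActiveSupport's `with_indifferent_access`. The two default values are copied from the constant summary. Sidekiq serializes job arguments as JSON, so option keys normally arrive as strings, which is presumably why the worker uses indifferent access.

```ruby
# Plain-Ruby approximation of the option defaulting in build_enumerator.
# OptionDefaults.resolve is a hypothetical helper, not part of the worker.
module OptionDefaults
  DEFAULT_LIMIT = 200
  DEFAULT_MIN_AGE_DAYS = 7

  module_function

  def resolve(options)
    # nil.to_h is {}, so a job enqueued with no arguments gets the defaults.
    opts = options.to_h.transform_keys(&:to_sym)
    {
      limit: (opts[:limit] || DEFAULT_LIMIT).to_i,
      min_age_days: (opts[:min_age_days] || DEFAULT_MIN_AGE_DAYS).to_i
    }
  end
end

p OptionDefaults.resolve(nil)
p OptionDefaults.resolve("limit" => "500")
p OptionDefaults.resolve(limit: 50, "min_age_days" => 30)
```

Because of the trailing `to_i`, numeric strings such as `"500"` are accepted, while a non-numeric string would silently become `0`.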
#each_iteration(call_record, *_args) ⇒ Object
# File 'app/workers/call_record_bulk_transcription_worker.rb', line 76
def each_iteration(call_record, *_args)
  service = CallRecordProcessing::BulkTranscriptionService.new(call_record, model: @model)
  result = service.transcribe

  case result[:status]
  when :success
    @success_count += 1
    log_info "Transcribed ##{call_record.id}: #{result[:word_count]} words (#{result[:duration_secs]}s)" if (@success_count % 20).zero?
  when :skipped
    @skip_count += 1
  when :error
    @error_count += 1
    log_error "Failed ##{call_record.id}: #{result[:reason]} — #{result[:message]}"
  end

  sleep(rand(0.5..1.5))
rescue StandardError => e
  @error_count += 1
  log_error "Unexpected error for ##{call_record.id}: #{e.message}"
  ErrorReporting.error(e)
end
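The bookkeeping in `each_iteration` can be exercised in isolation with a stand-alone tally. This is a sketch under assumptions: `TranscriptionTally` is a hypothetical class, the log is collected in an array instead of being written through `log_info` and `log_error`, and the `"download_failed"` reason is an invented example value.

```ruby
# Hypothetical stand-alone tally mirroring the case statement in each_iteration.
class TranscriptionTally
  LOG_EVERY = 20 # matches the `% 20` check in the worker

  attr_reader :success, :skipped, :errors, :log

  def initialize
    @success = 0
    @skipped = 0
    @errors = 0
    @log = []
  end

  def record(id, result)
    case result[:status]
    when :success
      @success += 1
      # Successes are logged only on every 20th one to keep log volume low.
      @log << "Transcribed ##{id}" if (@success % LOG_EVERY).zero?
    when :skipped
      @skipped += 1
    when :error
      @errors += 1
      @log << "Failed ##{id}: #{result[:reason]}" # every error is logged
    end
  end
end

tally = TranscriptionTally.new
(1..45).each { |id| tally.record(id, { status: :success }) }
tally.record(46, { status: :skipped })
tally.record(47, { status: :error, reason: "download_failed" }) # invented reason

puts tally.log
puts "#{tally.success} transcribed, #{tally.skipped} skipped, #{tally.errors} errors"
```

Skips are counted silently, errors are always logged, and successes are sampled, so a long nightly run leaves a compact log.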
#on_complete ⇒ Object
# File 'app/workers/call_record_bulk_transcription_worker.rb', line 98
def on_complete
  log_info "Complete: #{@success_count} transcribed, #{@skip_count} skipped, #{@error_count} errors"
end