Class: AssistantChatWorker
- Inherits: Object (Object > AssistantChatWorker)
- Includes: AssistantChat::Broadcaster, Sidekiq::Job, Sidekiq::Status::Worker
- Defined in: app/workers/assistant_chat_worker.rb
Overview
Background worker for AI-powered assistant chat.
It processes LLM queries asynchronously and broadcasts response chunks via Turbo Streams,
so the browser receives real-time streaming updates without blocking the web process.
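One way to picture the chunked streaming is an accumulator that always appends but only broadcasts when a minimum interval has passed since the previous push, with a final flush at the end. The worker keeps comparable state (`@accumulated_content`, `@last_stream_broadcast_at`) and includes `STATUS_BROADCAST_INTERVAL` from AssistantChat::Broadcaster, but its actual throttling code is not shown on this page. The stdlib-only sketch below illustrates the idea under those assumptions; `ThrottledStream` and its `broadcaster` callable are hypothetical stand-ins, not the Turbo Streams API.

```ruby
# Hypothetical sketch: throttle streaming broadcasts of accumulated LLM output.
# `broadcaster` is any callable standing in for a Turbo Streams broadcast.
class ThrottledStream
  attr_reader :content

  def initialize(interval:, broadcaster:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @interval = interval       # minimum gap between broadcasts, in clock units (seconds here)
    @broadcaster = broadcaster # receives the full accumulated text each time
    @clock = clock             # injectable so the throttling can be tested deterministically
    @content = +''
    @last_broadcast_at = nil   # nil: nothing has been broadcast yet
  end

  # Append a chunk; broadcast only if the interval has elapsed. Returns true if sent.
  def push(chunk)
    @content << chunk
    now = @clock.call
    return false if @last_broadcast_at && now - @last_broadcast_at < @interval

    @broadcaster.call(@content.dup)
    @last_broadcast_at = now
    true
  end

  # Always send the final text so trailing chunks are never lost.
  def flush
    @broadcaster.call(@content.dup)
    @last_broadcast_at = @clock.call
  end
end

# Usage with a fake clock:
sent = []
t = 0.0
stream = ThrottledStream.new(interval: 0.5, broadcaster: ->(text) { sent << text }, clock: -> { t })
stream.push('Sales ') # first chunk: broadcast immediately
t = 0.2
stream.push('by ')    # only 0.2s later: buffered, not broadcast
t = 0.8
stream.push('state')  # interval elapsed: broadcasts "Sales by state"
stream.flush
```

Injecting the clock keeps the throttle deterministic in tests; the default monotonic clock is unaffected by wall-clock adjustments.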
Usage (arguments are positional, in the order #perform declares them):
  job_id = AssistantChatWorker.perform_async(
    conversation.id,
    "Show me sales by state",
    "claude-sonnet"
  )
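Sidekiq stores job arguments as JSON, so `#perform` receives plain JSON types: Symbol keys come back as Strings, and a keyword-style call collapses into a single positional Hash rather than filling `conversation_id`, `user_message`, and `model`. This is also why the worker reads `user_context&.dig('party_id')` with a String key. The stdlib-only sketch below imitates that round trip with `JSON.generate`/`JSON.parse`; `simulate_enqueue` is a hypothetical helper, not part of Sidekiq.

```ruby
require 'json'

# Hypothetical helper mimicking how a JSON-backed queue (such as Sidekiq)
# round-trips job arguments: Ruby objects in, JSON types out.
def simulate_enqueue(*args)
  JSON.parse(JSON.generate(args))
end

# Keyword-style call: Ruby packs the keywords into ONE positional Hash,
# and its Symbol keys come back as Strings.
kw_args = simulate_enqueue(conversation_id: 42, user_message: 'Show me sales by state')
# kw_args == [{ 'conversation_id' => 42, 'user_message' => 'Show me sales by state' }]

# Positional call, matching #perform's signature.
pos_args = simulate_enqueue(42, 'Show me sales by state', 'claude-sonnet', [], { party_id: 7 })
# pos_args.last == { 'party_id' => 7 }, so dig('party_id') works and dig(:party_id) returns nil
```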
Constant Summary
- HEARTBEAT_INTERVAL = 30
  Heartbeat interval.
- MAX_COMPACTION_RETRIES = 2
  Maximum number of compaction retries.
- ESCALATION_MODEL = 'claude-sonnet'
  Model used when escalating from a low-tier model that stalled after tool calls.
  It is deliberately a reliable non-Gemini model: escalating into another Gemini snapshot
  (the old value, 'gemini-pro') risked the same intermittent body-less 400 that the
  escalation is meant to route around (#3808).
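The escalation rule described for `ESCALATION_MODEL` can be sketched as a small pure function: switch models at most once, and only when the current model is low-tier and the round stalled after tool calls. The `escalated` flag mirrors the worker's `@escalated` instance variable. `LOW_TIER_MODELS`, its contents, and `next_model` are hypothetical; how the real worker classifies tiers or detects a stall is not documented on this page.

```ruby
# Hypothetical sketch of a one-shot escalation rule; not the worker's actual code.
ESCALATION_MODEL = 'claude-sonnet' # mirrors the documented constant

# Assumed low-tier classification (illustrative model name only).
LOW_TIER_MODELS = %w[gemini-flash].freeze

# Returns [model_to_use, escalated] for the next round.
def next_model(current_model, stalled_after_tools:, escalated:)
  # Escalate at most once, and only away from a low-tier model that stalled.
  if !escalated && stalled_after_tools && LOW_TIER_MODELS.include?(current_model)
    [ESCALATION_MODEL, true]
  else
    [current_model, escalated]
  end
end

model, escalated = next_model('gemini-flash', stalled_after_tools: true, escalated: false)
# model == 'claude-sonnet', escalated == true
```

Keeping the decision side-effect free makes the "escalate only once" guarantee easy to test independently of any provider call.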
Constants included from AssistantChat::Broadcaster
AssistantChat::Broadcaster::STATUS_BROADCAST_INTERVAL, AssistantChat::Broadcaster::TOOLS_WITH_SUMMARIES, AssistantChat::Broadcaster::TOOL_DISPLAY_NAMES, AssistantChat::Broadcaster::TOOL_PREFIXES
Constants included from IconHelper
IconHelper::CUSTOM_ICON_MAP, IconHelper::CUSTOM_SVG_DIR, IconHelper::DEFAULT_FAMILY
Instance Method Summary
Methods included from IconHelper
#account_nav_icon, #fa_icon, #star_rating_html
Instance Method Details
#perform(conversation_id, user_message, model, tool_services = [], user_context = {}, upload_ids = []) ⇒ Object
# File 'app/workers/assistant_chat_worker.rb', line 28

def perform(conversation_id, user_message, model, tool_services = [], user_context = {}, upload_ids = [])
  @conversation = AssistantConversation.find(conversation_id)
  @job_id = jid
  @cancelled = false
  @escalated = false
  @accumulated_content = ''
  @last_stream_broadcast_at = 0.0
  @tool_services = Array(tool_services)
  @column_metadata = {}
  @worker_started_at = Time.current
  @round_count = 0

  # Resolve the sending user from user_context for the processing lock
  @sending_user = Party.find_by(id: user_context&.dig('party_id'))

  # Advisory lock prevents concurrent workers from processing the same conversation.
  # Non-blocking (timeout_seconds: 0): if another worker is already processing, skip.
  result = AssistantConversation.with_advisory_lock_result(
    @conversation.processing_lock_key, timeout_seconds: 0
  ) do
    # Acquire the column-based processing lock (visible to the UI).
    # If the advisory lock was obtained but the column lock fails, the previous
    # holder must have crashed without cleanup, so force-take the lock.
    sender = @sending_user || Party.new(id: 0)
    unless @conversation.acquire_processing_lock!(sender, job_id: @job_id)
      Rails.logger.warn("[AssistantChatWorker] Stale processing lock on conversation #{@conversation.id} " \
                        "(by_id=#{@conversation.processing_by_id}, since=#{@conversation.processing_since}). " \
                        "Force-acquiring — advisory lock proves no other worker is active.")
      @conversation.force_processing_lock!(sender, job_id: @job_id)
    end

    begin
      broadcast_lock_state(locked: true)
      process_query(user_message, model, tool_services, user_context, Array(upload_ids))
    ensure
      @conversation.release_processing_lock!
      broadcast_lock_state(locked: false)
    end
  end

  # If the advisory lock was not acquired, another worker is already processing
  unless result&.lock_was_acquired?
    Rails.logger.info("[AssistantChatWorker] Skipped — conversation #{conversation_id} is locked by another worker")
    broadcast_error('This conversation is currently being processed. Please try again in a moment.')
  end

  # Schedule the continuation AFTER all locks are released so the new worker
  # can immediately acquire the advisory lock without collision.
  if @continuation_pending
    schedule_continuation(
      @continuation_pending[:model],
      @continuation_pending[:tool_services],
      @continuation_pending[:user_context]
    )
  end
rescue StandardError => e
  response_body = e.respond_to?(:response) ? e.response&.body.to_s.truncate(500) : nil
  Rails.logger.error("[AssistantChatWorker] Error: #{e.message}\n#{e.backtrace.first(5).join("\n")}#{"\nResponse body: #{response_body}" if response_body}")
  @conversation&.release_processing_lock!
  @conversation&.track_error!
  # `user_id` is a filterable tag; the worker/conversation/model and the
  # provider response body ride along as custom data so they're visible on the
  # AppSignal sample. CAVEAT (#3808): Faraday does not buffer the response body
  # on STREAMING turns (assistant chat), so `response_body` is empty for those:
  # the generic provider 400 carries no body to capture. It is still
  # populated for non-streaming provider calls (e.g. invalid-model 400s).
  ErrorReporting.error(e, source: :background, user_id: @sending_user&.id,
                          custom_data: { worker: 'AssistantChatWorker', conversation_id: @conversation&.id,
                                         model: model, response_body: response_body })
  # Pass the raw error message to broadcast_error so its pattern matchers can
  # produce the correct friendly copy (e.g. the /\Amodel:/i pattern for
  # invalid-model 400s). Generic unrecognised messages fall through as-is.
  broadcast_error(e.message)
ensure
end
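The locking sequence in #perform can be illustrated without a database. It has four parts: a non-blocking advisory lock, a UI-visible column lock that is force-taken when stale, release in `ensure`, and continuation scheduling only after every lock is released. In this minimal sketch, `Mutex#try_lock` stands in for the non-blocking advisory lock and a plain attribute stands in for the processing-lock column. `FakeConversation` and `run_turn` are hypothetical names, and the sketch does not reflect how `with_advisory_lock_result` is actually implemented.

```ruby
# Hypothetical, in-memory model of #perform's two-layer locking.
class FakeConversation
  attr_reader :processing_by, :events

  def initialize
    @advisory = Mutex.new # stands in for the non-blocking DB advisory lock
    @processing_by = nil  # stands in for the UI-visible processing-lock column
    @events = []
  end

  # Runs the block and returns true, or returns false at once if the lock is held.
  def with_advisory_lock
    return false unless @advisory.try_lock

    begin
      yield
      true
    ensure
      @advisory.unlock
    end
  end

  def acquire_processing_lock!(worker)
    return false if @processing_by # column still set, e.g. by a crashed worker
    @processing_by = worker
    true
  end

  def force_processing_lock!(worker)
    @events << :forced
    @processing_by = worker
  end

  def release_processing_lock!
    @processing_by = nil
  end
end

def run_turn(conversation, worker)
  continuation = nil
  ran = conversation.with_advisory_lock do
    # Holding the advisory lock proves no other worker is active,
    # so a column lock that is still set must be stale.
    conversation.force_processing_lock!(worker) unless conversation.acquire_processing_lock!(worker)
    begin
      conversation.events << :processed
      continuation = :next_round # decide inside the lock, schedule later
    ensure
      conversation.release_processing_lock!
    end
  end
  conversation.events << :skipped unless ran
  # Schedule only after both locks are released, so the follow-up can lock at once.
  conversation.events << continuation if continuation
  ran
end
```

Deferring the continuation until after the block returns mirrors the comment in #perform: a follow-up worker started while the advisory lock is still held would simply be skipped.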