Class: AssistantChatWorker
- Inherits:
-
Object
- Object
- AssistantChatWorker
- Includes:
- AssistantChat::Broadcaster, Sidekiq::Job, Sidekiq::Status::Worker
- Defined in:
- app/workers/assistant_chat_worker.rb
Overview
Background worker for AI-powered assistant chat
Processes LLM queries asynchronously and broadcasts chunks via Turbo Streams.
This allows real-time streaming updates to the browser without blocking the web process.
Usage:
job_id = AssistantChatWorker.perform_async(
conversation_id: conversation.id,
user_message: "Show me sales by state",
model: "claude-sonnet"
)
Constant Summary collapse
- HEARTBEAT_INTERVAL =
30- MAX_COMPACTION_RETRIES =
2
Constants included from AssistantChat::Broadcaster
AssistantChat::Broadcaster::STATUS_BROADCAST_INTERVAL, AssistantChat::Broadcaster::TOOLS_WITH_SUMMARIES, AssistantChat::Broadcaster::TOOL_DISPLAY_NAMES
Instance Method Summary collapse
Instance Method Details
#perform(conversation_id, user_message, model, tool_services = [], user_context = {}, upload_ids = []) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'app/workers/assistant_chat_worker.rb', line 28 def perform(conversation_id, , model, tool_services = [], user_context = {}, upload_ids = []) @conversation = AssistantConversation.find(conversation_id) @job_id = jid @cancelled = false @accumulated_content = '' @last_stream_broadcast_at = 0.0 @tool_services = Array(tool_services) @column_metadata = {} @worker_started_at = Time.current # Resolve the sending user from user_context for processing lock @sending_user = Party.find_by(id: user_context&.dig('party_id')) # Advisory lock prevents concurrent workers from processing the same conversation. # Non-blocking (timeout: 0) — if another worker is already processing, skip. result = AssistantConversation.with_advisory_lock_result( @conversation.processing_lock_key, timeout_seconds: 0 ) do # Acquire the column-based processing lock (visible to UI). # If the advisory lock was obtained but the column lock fails, the previous # holder must have crashed without cleanup — force-take the lock. sender = @sending_user || Party.new(id: 0) unless @conversation.acquire_processing_lock!(sender, job_id: @job_id) Rails.logger.warn("[AssistantChatWorker] Stale processing lock on conversation #{@conversation.id} " \ "(by_id=#{@conversation.processing_by_id}, since=#{@conversation.processing_since}). " \ "Force-acquiring — advisory lock proves no other worker is active.") @conversation.force_processing_lock!(sender, job_id: @job_id) end begin broadcast_lock_state(locked: true) process_query(, model, tool_services, user_context, Array(upload_ids)) ensure @conversation.release_processing_lock! broadcast_lock_state(locked: false) end end # If the advisory lock was not acquired, another worker is already processing unless result&.lock_was_acquired? Rails.logger.info("[AssistantChatWorker] Skipped — conversation #{conversation_id} is locked by another worker") broadcast_error('This conversation is currently being processed. Please try again in a moment.') end # Schedule continuation AFTER all locks are released so the new worker # can immediately acquire the advisory lock without collision. if @continuation_pending schedule_continuation( @continuation_pending[:model], @continuation_pending[:tool_services], @continuation_pending[:user_context] ) end rescue StandardError => e response_body = e.respond_to?(:response) ? e.response&.body.to_s.truncate(500) : nil Rails.logger.error("[AssistantChatWorker] Error: #{e.}\n#{e.backtrace.first(5).join("\n")}#{response_body ? "\nResponse body: #{response_body}" : ''}") @conversation&.release_processing_lock! @conversation&.track_error! ErrorReporting.error(e, source: :background, context: { conversation_id: @conversation&.id, user_id: @sending_user&.id, worker: 'AssistantChatWorker', response_body: response_body }) # Pass the raw error message to broadcast_error so its pattern matchers can # produce the correct friendly copy (e.g. the /\Amodel:/i pattern for # invalid-model 400s). Generic unrecognised messages fall through as-is. broadcast_error(e.) ensure end |