Class: AssistantChatWorker

Inherits:
Object
Includes:
AssistantChat::Broadcaster, Sidekiq::Job, Sidekiq::Status::Worker
Defined in:
app/workers/assistant_chat_worker.rb

Overview

Background worker for AI-powered assistant chat

Processes LLM queries asynchronously and broadcasts chunks via Turbo Streams.
This allows real-time streaming updates to the browser without blocking the web process.

Usage:
job_id = AssistantChatWorker.perform_async(
  conversation.id,
  "Show me sales by state",
  "claude-sonnet"
)

Note: Sidekiq's perform_async takes positional arguments matching #perform's
signature — keyword-argument style would serialize a single Hash into the
first (conversation_id) parameter.

Constant Summary collapse

HEARTBEAT_INTERVAL =
30
MAX_COMPACTION_RETRIES =
2

Constants included from AssistantChat::Broadcaster

AssistantChat::Broadcaster::STATUS_BROADCAST_INTERVAL, AssistantChat::Broadcaster::TOOLS_WITH_SUMMARIES, AssistantChat::Broadcaster::TOOL_DISPLAY_NAMES

Instance Method Summary collapse

Instance Method Details

#perform(conversation_id, user_message, model, tool_services = [], user_context = {}, upload_ids = []) ⇒ Object

Parameters:

  • conversation_id (Integer)

    The conversation to process

  • user_message (String)

    The user's query

  • model (String)

    The LLM model key to use

  • tool_services (Array<String>) (defaults to: [])

    Optional service keys for tool access (e.g. ['content', 'app_db'])

  • user_context (Hash) (defaults to: {})

    Optional user identity for personalized queries (name, party_id, department, etc.)

  • upload_ids (Array<Integer>) (defaults to: [])

    Optional IDs of Upload records to attach to this message



[Source: lines 28–98]
# File 'app/workers/assistant_chat_worker.rb', line 28

# Entry point for the background assistant-chat job.
#
# Serializes all processing for a conversation behind two layers of locking:
# a non-blocking DB advisory lock (race-free mutual exclusion between
# workers) and a column-based processing lock (visible to the UI). On any
# StandardError the lock is released, the error is reported, and a friendly
# message is broadcast; attachment tempfiles are always cleaned up.
#
# @param conversation_id [Integer] the AssistantConversation to process
# @param user_message [String] the user's query text
# @param model [String] the LLM model key to use
# @param tool_services [Array<String>] optional service keys for tool access
# @param user_context [Hash] optional user identity; keys are strings after
#   Sidekiq's JSON round-trip (only 'party_id' is read directly here)
# @param upload_ids [Array<Integer>] optional Upload record IDs to attach
# @return [void]
def perform(conversation_id, user_message, model, tool_services = [], user_context = {}, upload_ids = [])
  @conversation = AssistantConversation.find(conversation_id)
  @job_id = jid
  @cancelled = false
  @accumulated_content = ''
  # 0.0 sentinel so the very first stream chunk is broadcast immediately
  # (presumably compared against epoch seconds in Broadcaster — TODO confirm)
  @last_stream_broadcast_at = 0.0
  @tool_services = Array(tool_services)
  @column_metadata = {}
  @worker_started_at = Time.current

  # Resolve the sending user from user_context for processing lock
  # (string key: user_context arrives JSON-deserialized from Sidekiq)
  @sending_user = Party.find_by(id: user_context&.dig('party_id'))

  # Advisory lock prevents concurrent workers from processing the same conversation.
  # Non-blocking (timeout: 0) — if another worker is already processing, skip.
  result = AssistantConversation.with_advisory_lock_result(
    @conversation.processing_lock_key, timeout_seconds: 0
  ) do
    # Acquire the column-based processing lock (visible to UI).
    # If the advisory lock was obtained but the column lock fails, the previous
    # holder must have crashed without cleanup — force-take the lock.
    # Party.new(id: 0) is an unsaved placeholder sender when no user resolved.
    sender = @sending_user || Party.new(id: 0)
    unless @conversation.acquire_processing_lock!(sender, job_id: @job_id)
      Rails.logger.warn("[AssistantChatWorker] Stale processing lock on conversation #{@conversation.id} " \
                        "(by_id=#{@conversation.processing_by_id}, since=#{@conversation.processing_since}). " \
                        "Force-acquiring — advisory lock proves no other worker is active.")
      @conversation.force_processing_lock!(sender, job_id: @job_id)
    end

    begin
      broadcast_lock_state(locked: true)
      process_query(user_message, model, tool_services, user_context, Array(upload_ids))
    ensure
      # Release + notify even when process_query raises; the outer rescue
      # then handles reporting. release_processing_lock! runs a second time
      # in the rescue — presumably idempotent (TODO confirm).
      @conversation.release_processing_lock!
      broadcast_lock_state(locked: false)
    end
  end

  # If the advisory lock was not acquired, another worker is already processing
  unless result&.lock_was_acquired?
    Rails.logger.info("[AssistantChatWorker] Skipped — conversation #{conversation_id} is locked by another worker")
    broadcast_error('This conversation is currently being processed. Please try again in a moment.')
  end

  # Schedule continuation AFTER all locks are released so the new worker
  # can immediately acquire the advisory lock without collision.
  # @continuation_pending is set elsewhere (presumably inside process_query).
  if @continuation_pending
    schedule_continuation(
      @continuation_pending[:model],
      @continuation_pending[:tool_services],
      @continuation_pending[:user_context]
    )
  end
rescue StandardError => e
  # Capture an HTTP response body snippet when the error exposes one
  # (e.g. API client errors), truncated to keep logs bounded.
  response_body = e.respond_to?(:response) ? e.response&.body.to_s.truncate(500) : nil
  Rails.logger.error("[AssistantChatWorker] Error: #{e.message}\n#{e.backtrace.first(5).join("\n")}#{response_body ? "\nResponse body: #{response_body}" : ''}")
  # Safe-navigation: the find at the top may have been what raised,
  # leaving @conversation nil. NOTE(review): this path does not broadcast
  # locked: false — confirm the UI recovers from the error broadcast alone.
  @conversation&.release_processing_lock!
  @conversation&.track_error!
  ErrorReporting.error(e, source: :background, context: {
    conversation_id: @conversation&.id,
    user_id: @sending_user&.id,
    worker: 'AssistantChatWorker',
    response_body: response_body
  })
  # Pass the raw error message to broadcast_error so its pattern matchers can
  # produce the correct friendly copy (e.g. the /\Amodel:/i pattern for
  # invalid-model 400s). Generic unrecognised messages fall through as-is.
  broadcast_error(e.message)
ensure
  # Runs on every exit path (success, skip, or error).
  cleanup_attachment_tempfiles
end