Class: AssistantChatWorker

Inherits:
Object
  • Object
show all
Includes:
AssistantChat::Broadcaster, Sidekiq::Job, Sidekiq::Status::Worker
Defined in:
app/workers/assistant_chat_worker.rb

Overview

Background worker for AI-powered assistant chat

Processes LLM queries asynchronously and broadcasts chunks via Turbo Streams.
This allows real-time streaming updates to the browser without blocking the web process.

Usage (Sidekiq job arguments are positional JSON values, so pass them in #perform's parameter order rather than as keyword arguments):
job_id = AssistantChatWorker.perform_async(
conversation.id,
"Show me sales by state",
"claude-sonnet"
)

Constant Summary collapse

HEARTBEAT_INTERVAL =

Heartbeat interval.

30
MAX_COMPACTION_RETRIES =

Maximum compaction retries.

2
ESCALATION_MODEL =

Model used when escalating away from a low-tier model that stalled after tool calls.
This must be a reliable non-Gemini model. The previous value, 'gemini-pro', escalated
into another Gemini snapshot. That risked hitting the same intermittent body-less
HTTP 400 that escalation exists to route around (#3808).

'claude-sonnet'

Constants included from AssistantChat::Broadcaster

AssistantChat::Broadcaster::STATUS_BROADCAST_INTERVAL, AssistantChat::Broadcaster::TOOLS_WITH_SUMMARIES, AssistantChat::Broadcaster::TOOL_DISPLAY_NAMES, AssistantChat::Broadcaster::TOOL_PREFIXES

Constants included from IconHelper

IconHelper::CUSTOM_ICON_MAP, IconHelper::CUSTOM_SVG_DIR, IconHelper::DEFAULT_FAMILY

Instance Method Summary collapse

Methods included from IconHelper

#account_nav_icon, #fa_icon, #star_rating_html

Instance Method Details

#perform(conversation_id, user_message, model, tool_services = [], user_context = {}, upload_ids = []) ⇒ Object

Parameters:

  • conversation_id (Integer)

    The conversation to process

  • user_message (String)

    The user's query

  • model (String)

    The LLM model key to use

  • tool_services (Array<String>) (defaults to: [])

    Optional service keys for tool access (e.g. ['content', 'app_db'])

  • user_context (Hash) (defaults to: {})

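    Optional user identity for personalized queries, keyed by strings (e.g. 'name', 'party_id', 'department'); 'party_id' identifies the sending user who is recorded as holding the processing lock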

  • upload_ids (Array<Integer>) (defaults to: [])

    Optional IDs of Upload records to attach to this message



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'app/workers/assistant_chat_worker.rb', line 28

# Process one assistant chat turn for a conversation.
#
# Takes a non-blocking advisory lock on the conversation so that only one
# worker processes it at a time. Mirrors that in the column-based processing
# lock the UI reads, then runs the query. Any pending continuation is
# scheduled only after every lock has been released. StandardErrors are
# logged, reported and broadcast to the user rather than re-raised.
#
# @param conversation_id [Integer] the conversation to process
# @param user_message [String] the user's query
# @param model [String] the LLM model key to use
# @param tool_services [Array<String>] optional service keys for tool access
#   (e.g. ['content', 'app_db']); coerced with Array()
# @param user_context [Hash] optional user identity, read with string keys;
#   'party_id' selects the sending user recorded on the processing lock
# @param upload_ids [Array<Integer>] optional Upload record IDs to attach
def perform(conversation_id, user_message, model, tool_services = [], user_context = {}, upload_ids = [])
  @conversation = AssistantConversation.find(conversation_id)
  @job_id = jid
  @cancelled = false
  @escalated = false
  @accumulated_content = ''
  @last_stream_broadcast_at = 0.0
  @tool_services = Array(tool_services)
  @column_metadata = {}
  @worker_started_at = Time.current
  @round_count = 0
  # Read after the locks are released to schedule a follow-up job; presumably
  # populated during process_query (TODO confirm).
  @continuation_pending = nil
  # True only while this job may hold the column processing lock. The rescue
  # path checks it so a failure never releases a lock owned by another worker.
  @owns_processing_lock = false

  # Resolve the sending user from user_context for processing lock
  @sending_user = Party.find_by(id: user_context&.dig('party_id'))

  # Advisory lock prevents concurrent workers from processing the same conversation.
  # Non-blocking (timeout: 0) — if another worker is already processing, skip.
  result = AssistantConversation.with_advisory_lock_result(
    @conversation.processing_lock_key, timeout_seconds: 0
  ) do
    # Holding the advisory lock means no other worker is active. From here on,
    # this job is the rightful owner of the column lock, even if the
    # force-acquire below fails partway.
    @owns_processing_lock = true

    # Acquire the column-based processing lock (visible to UI).
    # If the advisory lock was obtained but the column lock fails, the previous
    # holder must have crashed without cleanup — force-take the lock.
    sender = @sending_user || Party.new(id: 0)
    unless @conversation.acquire_processing_lock!(sender, job_id: @job_id)
      Rails.logger.warn("[AssistantChatWorker] Stale processing lock on conversation #{@conversation.id} " \
                        "(by_id=#{@conversation.processing_by_id}, since=#{@conversation.processing_since}). " \
                        "Force-acquiring — advisory lock proves no other worker is active.")
      @conversation.force_processing_lock!(sender, job_id: @job_id)
    end

    begin
      broadcast_lock_state(locked: true)
      process_query(user_message, model, @tool_services, user_context, Array(upload_ids))
    ensure
      @conversation.release_processing_lock!
      # Cleared only after a successful release. If the release itself raised,
      # the rescue below retries it.
      @owns_processing_lock = false
      broadcast_lock_state(locked: false)
    end
  end

  # If the advisory lock was not acquired, another worker is already processing
  unless result&.lock_was_acquired?
    Rails.logger.info("[AssistantChatWorker] Skipped — conversation #{conversation_id} is locked by another worker")
    broadcast_error('This conversation is currently being processed. Please try again in a moment.')
  end

  # Schedule continuation AFTER all locks are released so the new worker
  # can immediately acquire the advisory lock without collision.
  if @continuation_pending
    schedule_continuation(
      @continuation_pending[:model],
      @continuation_pending[:tool_services],
      @continuation_pending[:user_context]
    )
  end
rescue StandardError => e
  response_body = error_response_body(e)
  Rails.logger.error("[AssistantChatWorker] Error: #{e.message}\n#{Array(e.backtrace).first(5).join("\n")}#{"\nResponse body: #{response_body}" if response_body}")
  # Release only a lock this job could own. On the "skipped" path, or after the
  # lock was already released, releasing here would clear another worker's lock.
  @conversation&.release_processing_lock! if @owns_processing_lock
  @conversation&.track_error!
  # `user_id` is a filterable tag; the worker/conversation/model and the
  # provider response body ride along as custom data so they're visible on the
  # AppSignal sample. CAVEAT (#3808): Faraday does not buffer the response body
  # on STREAMING turns (assistant chat), so `response_body` is empty for those
  # — the generic provider 400 carries no body to capture. It is still
  # populated for non-streaming provider calls (e.g. invalid-model 400s).
  ErrorReporting.error(e,
                       source: :background,
                       user_id: @sending_user&.id,
                       custom_data: {
                         worker: 'AssistantChatWorker',
                         conversation_id: @conversation&.id,
                         model: model,
                         response_body: response_body
                       })
  # Pass the raw error message to broadcast_error so its pattern matchers can
  # produce the correct friendly copy (e.g. the /\Amodel:/i pattern for
  # invalid-model 400s). Generic unrecognised messages fall through as-is.
  broadcast_error(e.message)
ensure
  cleanup_attachment_tempfiles
end

# Best-effort extraction of a provider response body from +error+, for logs and
# error reports. The body is converted with #to_s and truncated to 500 characters.
#
# Supports two shapes of +error.response+:
# - an object exposing #body (e.g. a Faraday::Response);
# - a Hash carrying :body / 'body' (the shape of Faraday::Error#response).
#
# Returns nil when the error has no response or the shape is unrecognised.
# Never raises: it runs inside perform's rescue, where an exception would skip
# lock release, error tracking and the user-facing broadcast.
private def error_response_body(error)
  return nil unless error.respond_to?(:response)

  response = error.response
  return response.body.to_s.truncate(500) if response.respond_to?(:body)
  return (response[:body] || response['body']).to_s.truncate(500) if response.is_a?(Hash)

  nil
rescue StandardError
  nil
end