Module: AssistantConversationProcessingLockable

Extended by:
ActiveSupport::Concern
Included in:
AssistantConversation
Defined in:
app/models/concerns/assistant_conversation_processing_lockable.rb

Instance Method Summary collapse

Instance Method Details

#acquire_processing_lock!(party, job_id: nil) ⇒ Object

Column-based lock provides UI-visible state (who is processing).
Paired with with_advisory_lock in the worker for true mutual exclusion.



8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 8

# Tries to claim the column-based processing lock for +party+.
#
# The claim is one conditional UPDATE scoped to this row. It succeeds when
# the row has no holder, when the holder's timestamp is missing, or when the
# timestamp is older than AssistantConversation::LOCK_STALE_AFTER. On
# success the in-memory attributes are brought in line with the written
# values and, if given, +job_id+ is handed to store_processing_job_id!.
#
# @param party [#id] the party taking the lock
# @param job_id [Object, nil] job identifier to record; skipped when nil
# @return [Boolean] true when exactly one row was updated, false otherwise
def acquire_processing_lock!(party, job_id: nil)
  now = Time.current
  stale_before = AssistantConversation::LOCK_STALE_AFTER.ago
  # SQL comparisons against NULL are never true, so a holder row whose
  # processing_since is NULL must be matched explicitly; otherwise it could
  # never be re-acquired even though processing? reports it as idle.
  rows = self.class.where(id: id)
             .where('processing_by_id IS NULL OR processing_since IS NULL OR processing_since < ?', stale_before)
             .update_all(processing_by_id: party.id, processing_since: now)
  acquired = rows == 1
  if acquired
    # update_all bypasses the instance, so mirror the new state locally.
    self.processing_by_id = party.id
    self.processing_since = now
    store_processing_job_id!(job_id) if job_id
  end
  acquired
end

#force_processing_lock!(party, job_id: nil) ⇒ Object

Unconditionally take the processing lock, bypassing the stale check.
Only safe to call when the caller already holds the advisory lock,
which guarantees no other worker is actively processing.



25
26
27
28
29
30
31
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 25

# Takes the processing lock for +party+ without checking the current holder
# or staleness. Writes both lock columns via update_columns, mirrors them on
# the instance, and passes +job_id+ to store_processing_job_id! when given.
#
# @param party [#id] the party taking the lock
# @param job_id [Object, nil] job identifier to record; skipped when nil
# @return [Object, nil] result of store_processing_job_id!, or nil without a job_id
def force_processing_lock!(party, job_id: nil)
  holder_id = party.id
  taken_at = Time.current
  update_columns(processing_by_id: holder_id, processing_since: taken_at)
  self.processing_since = taken_at
  self.processing_by_id = holder_id
  return unless job_id

  store_processing_job_id!(job_id)
end

#heartbeat_stale? ⇒ Boolean

Heartbeat has stopped (worker likely killed) but lock hasn't expired yet.
Used by the processing_status endpoint so the client can show an early warning.

Returns:

  • (Boolean)


50
51
52
53
54
55
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 50

# True when a holder is recorded and processing_since falls strictly between
# the lock-expiry cutoff and the heartbeat cutoff: the heartbeat has gone
# quiet, but the lock has not yet expired.
#
# @return [Boolean]
def heartbeat_stale?
  return false unless processing_by_id.present? && processing_since.present?

  heartbeat_cutoff = AssistantConversation::HEARTBEAT_STALE_AFTER.ago
  lock_cutoff = AssistantConversation::LOCK_STALE_AFTER.ago
  processing_since < heartbeat_cutoff && processing_since > lock_cutoff
end

#processing? ⇒ Boolean

Returns:

  • (Boolean)


43
44
45
46
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 43

# True when a holder and a timestamp are both recorded and the timestamp is
# newer than AssistantConversation::LOCK_STALE_AFTER ago.
#
# @return [Boolean]
def processing?
  return false unless processing_by_id.present? && processing_since.present?

  processing_since > AssistantConversation::LOCK_STALE_AFTER.ago
end

#processing_job_id ⇒ Object

Sidekiq job ID of the worker currently processing this conversation.
Stored in Redis to avoid JSONB contention with concurrent metadata writes.



59
60
61
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 59

# Reads the job id stored in Redis under this conversation's
# "assistant_processing_jid:<id>" key, via the Sidekiq Redis connection.
#
# @return [Object, nil] whatever the Redis GET yields for that key
def processing_job_id
  jid_key = "assistant_processing_jid:#{id}"
  Sidekiq.redis do |conn|
    conn.get(jid_key)
  end
end

#processing_lock_key ⇒ Object



63
64
65
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 63

# Builds the lock key string for this conversation from its id.
#
# @return [String] "assistant_conversation_" followed by the id
def processing_lock_key
  format('assistant_conversation_%s', id)
end

#refresh_processing_ttl! ⇒ Object



67
68
69
70
71
72
73
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 67

# Extends the Redis expiry on this conversation's job-id key and bumps
# processing_since in the database, but only while a holder is recorded.
#
# @return [Object] the result of update_all on the scoped relation
def refresh_processing_ttl!
  jid_key = "assistant_processing_jid:#{id}"
  Sidekiq.redis do |conn|
    conn.expire(jid_key, AssistantConversation::PROCESSING_JID_TTL)
  end

  # Check the holder in the database rather than in memory to avoid a
  # ghost-lock race: a page-load may have cleared processing_by_id while the
  # worker still holds a stale in-memory copy.
  still_held = self.class.where(id: id).where.not(processing_by_id: nil)
  still_held.update_all(processing_since: Time.current)
end

#release_processing_lock! ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 33

# Clears the processing lock: nulls both lock columns for this row with
# update_all, calls clear_processing_job_id!, then resets the in-memory
# attributes to match.
#
# @return [nil]
def release_processing_lock!
  cleared = { processing_by_id: nil, processing_since: nil }
  self.class.where(id: id).update_all(cleared)
  clear_processing_job_id!
  # update_all bypasses the instance, so mirror the cleared state locally.
  self.processing_since = nil
  self.processing_by_id = nil
end