Module: AssistantConversationProcessingLockable

Extended by:
ActiveSupport::Concern
Included in:
AssistantConversation
Defined in:
app/models/concerns/assistant_conversation_processing_lockable.rb

Overview

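Concern mixed into AssistantConversation that manages its processing lock: a column-based lock (processing_by_id, processing_since) for UI-visible state, plus the Sidekiq job ID of the current worker, which is kept in Redis.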

Instance Method Details

#acquire_processing_lock!(party, job_id: nil) ⇒ Object

Column-based lock that provides UI-visible state (who is processing).
Paired with with_advisory_lock in the worker for true mutual exclusion.
Returns true if the lock was acquired, false otherwise.



# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 9

def acquire_processing_lock!(party, job_id: nil)
  now = Time.current
  rows = self.class.where(id: id)
             .where('processing_by_id IS NULL OR processing_since < ?', AssistantConversation::LOCK_STALE_AFTER.ago)
             .update_all(processing_by_id: party.id, processing_since: now)
  acquired = rows == 1
  if acquired
    self.processing_by_id = party.id
    self.processing_since = now
    store_processing_job_id!(job_id) if job_id
  end
  acquired
end
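
The conditional update above behaves like a compare-and-set: the row is claimed only when it is unlocked or its timestamp is stale, and the affected-row count tells the caller whether it won. A minimal in-memory sketch of that idea follows. FakeTable, LockRecord, conditional_claim, and the 300-second value of LOCK_STALE_AFTER are assumptions made for illustration; they are not part of the application, and the Mutex only stands in for the database's atomic single-row update.

```ruby
# Hypothetical stand-in for the conversations table; not part of the app.
LOCK_STALE_AFTER = 300 # seconds; assumed value, the real constant is not shown

LockRecord = Struct.new(:processing_by_id, :processing_since)

class FakeTable
  attr_reader :row

  def initialize
    @row = LockRecord.new(nil, nil)
    @mutex = Mutex.new # plays the role of the database's atomic UPDATE
  end

  # Mirrors the WHERE clause: claim the row only if it is unlocked or stale.
  # Returns the number of rows changed (0 or 1), like update_all does.
  def conditional_claim(party_id, now)
    @mutex.synchronize do
      free  = @row.processing_by_id.nil?
      stale = !@row.processing_since.nil? &&
              @row.processing_since < now - LOCK_STALE_AFTER
      return 0 unless free || stale

      @row.processing_by_id = party_id
      @row.processing_since = now
      1
    end
  end
end

table = FakeTable.new
t0 = Time.at(1_000_000)
first_claim  = table.conditional_claim(1, t0)        # free row
second_claim = table.conditional_claim(2, t0 + 10)   # held and fresh
stale_claim  = table.conditional_claim(2, t0 + 301)  # holder went stale
puts [first_claim, second_claim, stale_claim].inspect
```

The caller never reads the row before writing it, so there is no window between "check" and "claim" for a second party to slip into.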

#force_processing_lock!(party, job_id: nil) ⇒ Object

Unconditionally take the processing lock, bypassing the stale check.
Only safe to call when the caller already holds the advisory lock,
which guarantees no other worker is actively processing.



# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 26

def force_processing_lock!(party, job_id: nil)
  now = Time.current
  update_columns(processing_by_id: party.id, processing_since: now)
  self.processing_by_id = party.id
  self.processing_since = now
  store_processing_job_id!(job_id) if job_id
end

#heartbeat_stale? ⇒ Boolean

Heartbeat has stopped (worker likely killed) but lock hasn't expired yet.
Used by the processing_status endpoint so the client can show an early warning.

Returns:

  • (Boolean)


# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 51

def heartbeat_stale?
  processing_by_id.present? &&
    processing_since.present? &&
    processing_since < AssistantConversation::HEARTBEAT_STALE_AFTER.ago &&
    processing_since > AssistantConversation::LOCK_STALE_AFTER.ago
end
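
Taken together, processing? and heartbeat_stale? split the age of processing_since into windows. The sketch below classifies a lock by those windows in plain Ruby. The LockWindows module, its state method, the :idle/:healthy/:heartbeat_stale/:expired labels, and both durations (30 and 300 seconds) are assumptions for illustration; the real constants live on AssistantConversation and their values are not shown here.

```ruby
# Hypothetical helper; names and durations are illustrative assumptions.
module LockWindows
  HEARTBEAT_STALE_AFTER = 30  # seconds; assumed
  LOCK_STALE_AFTER      = 300 # seconds; assumed

  # Classifies a lock using the same comparisons as the two predicates:
  #   age <  LOCK_STALE_AFTER          -> processing? would be true
  #   HEARTBEAT_STALE_AFTER < age too  -> heartbeat_stale? would be true
  def self.state(processing_by_id, processing_since, now)
    return :idle if processing_by_id.nil? || processing_since.nil?

    age = now - processing_since # seconds as a Float
    return :expired unless age < LOCK_STALE_AFTER

    age > HEARTBEAT_STALE_AFTER ? :heartbeat_stale : :healthy
  end
end

now = Time.at(1_000_000)
puts LockWindows.state(7, now - 5, now)   # recently refreshed
puts LockWindows.state(7, now - 60, now)  # heartbeat missed, lock still held
puts LockWindows.state(7, now - 600, now) # lock can be taken over
```

A client polling the status endpoint could treat the middle window as "possibly stuck" without assuming the lock is already free.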

#processing? ⇒ Boolean

True while a holder is recorded and processing_since is more recent than LOCK_STALE_AFTER ago.

Returns:

  • (Boolean)


# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 44

def processing?
  processing_by_id.present? && processing_since.present? &&
    processing_since > AssistantConversation::LOCK_STALE_AFTER.ago
end

#processing_job_id ⇒ Object

Sidekiq job ID of the worker currently processing this conversation.
Stored in Redis to avoid JSONB contention with concurrent metadata writes.



# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 60

def processing_job_id
  Sidekiq.redis { |conn| conn.get("assistant_processing_jid:#{id}") }
end

#processing_lock_key ⇒ Object

Returns the per-conversation lock key, a string of the form assistant_conversation_<id>.

# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 64

def processing_lock_key
  "assistant_conversation_#{id}"
end

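#refresh_processing_ttl! ⇒ Object

Heartbeat for the current holder: extends the TTL of the Redis job-ID key and
bumps processing_since, but only if the row still records a holder.
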
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 68

def refresh_processing_ttl!
  Sidekiq.redis { |conn| conn.expire("assistant_processing_jid:#{id}", AssistantConversation::PROCESSING_JID_TTL) }
  # Use an atomic DB check to avoid ghost-lock race: a page-load may have
  # cleared processing_by_id while the worker still holds the in-memory copy.
  self.class.where(id: id).where.not(processing_by_id: nil)
      .update_all(processing_since: Time.current)
end
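
The guard in the update above matters when the worker's in-memory record is out of date. The sketch below contrasts a guarded heartbeat with a naive one that writes back the worker's stale copy. It is one plausible shape of the race, not a description of the application's actual failure mode; HeartbeatRow, guarded_heartbeat, and naive_heartbeat are hypothetical names, and plain structs stand in for the database row and the loaded model.

```ruby
# Hypothetical stand-ins: db_row is the stored row, worker_copy is the
# worker's in-memory model loaded before the lock was cleared.
HeartbeatRow = Struct.new(:processing_by_id, :processing_since)

# Guarded heartbeat: touch the timestamp only while a holder is recorded.
# Returns the number of rows changed, like update_all.
def guarded_heartbeat(db_row, now)
  return 0 if db_row.processing_by_id.nil?

  db_row.processing_since = now
  1
end

# Naive heartbeat (assumed anti-pattern): trusts the in-memory copy and
# writes its holder back, which re-creates a lock that was already cleared.
def naive_heartbeat(db_row, worker_copy, now)
  db_row.processing_by_id = worker_copy.processing_by_id
  db_row.processing_since = now
  1
end

t0 = Time.at(1_000_000)
db_row = HeartbeatRow.new(7, t0)
worker_copy = db_row.dup            # worker loads the record

db_row.processing_by_id = nil       # another request clears the lock
db_row.processing_since = nil

changed = guarded_heartbeat(db_row, t0 + 15)
puts "guarded: changed=#{changed} holder=#{db_row.processing_by_id.inspect}"

naive_heartbeat(db_row, worker_copy, t0 + 15)
puts "naive: holder=#{db_row.processing_by_id.inspect}"
```

Because the guarded version decides based on the stored row rather than the loaded object, a cleared lock stays cleared.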

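#release_processing_lock! ⇒ Object

Clears the lock columns in the database, clears the stored job ID via
clear_processing_job_id!, and resets the in-memory attributes to nil.
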
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 34

def release_processing_lock!
  self.class.where(id: id).update_all(
    processing_by_id: nil,
    processing_since: nil
  )
  clear_processing_job_id!
  self.processing_by_id = nil
  self.processing_since = nil
end