Module: AssistantConversationProcessingLockable
- Extended by:
- ActiveSupport::Concern
- Included in:
- AssistantConversation
- Defined in:
- app/models/concerns/assistant_conversation_processing_lockable.rb
Instance Method Summary collapse
-
#acquire_processing_lock!(party, job_id: nil) ⇒ Object
Column-based lock provides UI-visible state (who is processing).
-
#force_processing_lock!(party, job_id: nil) ⇒ Object
Unconditionally take the processing lock, bypassing the stale check.
-
#heartbeat_stale? ⇒ Boolean
Heartbeat has stopped (worker likely killed) but lock hasn't expired yet.
- #processing? ⇒ Boolean
-
#processing_job_id ⇒ Object
Sidekiq job ID of the worker currently processing this conversation.
- #processing_lock_key ⇒ Object
- #refresh_processing_ttl! ⇒ Object
- #release_processing_lock! ⇒ Object
Instance Method Details
#acquire_processing_lock!(party, job_id: nil) ⇒ Object
Column-based lock provides UI-visible state (who is processing).
Paired with with_advisory_lock in the worker for true mutual exclusion.
8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 8 def acquire_processing_lock!(party, job_id: nil) now = Time.current rows = self.class.where(id: id) .where('processing_by_id IS NULL OR processing_since < ?', AssistantConversation::LOCK_STALE_AFTER.ago) .update_all(processing_by_id: party.id, processing_since: now) acquired = rows == 1 if acquired self.processing_by_id = party.id self.processing_since = now store_processing_job_id!(job_id) if job_id end acquired end |
#force_processing_lock!(party, job_id: nil) ⇒ Object
Unconditionally take the processing lock, bypassing the stale check.
Only safe to call when the caller already holds the advisory lock,
which guarantees no other worker is actively processing.
25 26 27 28 29 30 31 |
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 25 def force_processing_lock!(party, job_id: nil) now = Time.current update_columns(processing_by_id: party.id, processing_since: now) self.processing_by_id = party.id self.processing_since = now store_processing_job_id!(job_id) if job_id end |
#heartbeat_stale? ⇒ Boolean
Heartbeat has stopped (worker likely killed) but lock hasn't expired yet.
Used by the processing_status endpoint so the client can show an early warning.
50 51 52 53 54 55 |
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 50 def heartbeat_stale? processing_by_id.present? && processing_since.present? && processing_since < AssistantConversation::HEARTBEAT_STALE_AFTER.ago && processing_since > AssistantConversation::LOCK_STALE_AFTER.ago end |
#processing? ⇒ Boolean
43 44 45 46 |
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 43 def processing? processing_by_id.present? && processing_since.present? && processing_since > AssistantConversation::LOCK_STALE_AFTER.ago end |
#processing_job_id ⇒ Object
Sidekiq job ID of the worker currently processing this conversation.
Stored in Redis to avoid JSONB contention with concurrent metadata writes.
59 60 61 |
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 59 def processing_job_id Sidekiq.redis { |conn| conn.get("assistant_processing_jid:#{id}") } end |
#processing_lock_key ⇒ Object
63 64 65 |
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 63 def processing_lock_key "assistant_conversation_#{id}" end |
#refresh_processing_ttl! ⇒ Object
67 68 69 70 71 72 73 |
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 67 def refresh_processing_ttl! Sidekiq.redis { |conn| conn.expire("assistant_processing_jid:#{id}", AssistantConversation::PROCESSING_JID_TTL) } # Use an atomic DB check to avoid ghost-lock race: a page-load may have # cleared processing_by_id while the worker still holds the in-memory copy. self.class.where(id: id).where.not(processing_by_id: nil) .update_all(processing_since: Time.current) end |
#release_processing_lock! ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 33 def release_processing_lock! self.class.where(id: id).update_all( processing_by_id: nil, processing_since: nil ) clear_processing_job_id! self.processing_by_id = nil self.processing_since = nil end |