Module: AssistantConversationProcessingLockable
- Extended by: ActiveSupport::Concern
- Included in: AssistantConversation
- Defined in: app/models/concerns/assistant_conversation_processing_lockable.rb
Overview
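Concern mixed into AssistantConversation that manages its processing lock: a column-based lock (processing_by_id, processing_since) plus the Sidekiq job ID of the processing worker, which is kept in Redis.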
Instance Method Summary
- #acquire_processing_lock!(party, job_id: nil) ⇒ Object
  Column-based lock provides UI-visible state (who is processing).
- #force_processing_lock!(party, job_id: nil) ⇒ Object
  Unconditionally take the processing lock, bypassing the stale check.
- #heartbeat_stale? ⇒ Boolean
  Heartbeat has stopped (worker likely killed) but lock hasn't expired yet.
- #processing? ⇒ Boolean
- #processing_job_id ⇒ Object
  Sidekiq job ID of the worker currently processing this conversation.
- #processing_lock_key ⇒ Object
- #refresh_processing_ttl! ⇒ Object
- #release_processing_lock! ⇒ Object
Instance Method Details
#acquire_processing_lock!(party, job_id: nil) ⇒ Object
Column-based lock provides UI-visible state (who is processing).
Paired with with_advisory_lock in the worker for true mutual exclusion.
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 9

def acquire_processing_lock!(party, job_id: nil)
  now = Time.current
  rows = self.class.where(id: id)
             .where('processing_by_id IS NULL OR processing_since < ?', AssistantConversation::LOCK_STALE_AFTER.ago)
             .update_all(processing_by_id: party.id, processing_since: now)
  acquired = rows == 1
  if acquired
    self.processing_by_id = party.id
    self.processing_since = now
    store_processing_job_id!(job_id) if job_id
  end
  acquired
end
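The conditional UPDATE above acts as a compare-and-set: the row is claimed only when it is unlocked or its timestamp is older than the stale cutoff, and the affected-row count tells the caller whether it won. The following is a minimal plain-Ruby sketch of that rule with no database involved. `FakeConversationRow`, `SKETCH_LOCK_STALE_AFTER`, and the 300-second value are hypothetical stand-ins; the real `LOCK_STALE_AFTER` value is not shown on this page.

```ruby
# Hypothetical stale cutoff in seconds; the real constant's value is not documented here.
SKETCH_LOCK_STALE_AFTER = 300

# In-memory stand-in for one conversation row (not ActiveRecord).
class FakeConversationRow
  attr_reader :processing_by_id, :processing_since

  def initialize
    @mutex = Mutex.new # plays the role of the database's atomic single-row UPDATE
    @processing_by_id = nil
    @processing_since = nil
  end

  # Mirrors: WHERE processing_by_id IS NULL OR processing_since < stale cutoff.
  # Returns true when the "row" was updated, i.e. the lock was acquired.
  def acquire(party_id, now: Time.now)
    @mutex.synchronize do
      free  = @processing_by_id.nil?
      stale = !@processing_since.nil? && @processing_since < now - SKETCH_LOCK_STALE_AFTER
      return false unless free || stale

      @processing_by_id = party_id
      @processing_since = now
      true
    end
  end
end

row = FakeConversationRow.new
t0 = Time.at(1_000_000)
first  = row.acquire(1, now: t0)        # unlocked row: acquired
second = row.acquire(2, now: t0 + 10)   # held and fresh: refused
third  = row.acquire(2, now: t0 + 301)  # holder's timestamp is stale: taken over
```

In this sketch a stale lock is taken over silently, which is why the documentation pairs the column lock with an advisory lock in the worker for actual mutual exclusion.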
#force_processing_lock!(party, job_id: nil) ⇒ Object
Unconditionally take the processing lock, bypassing the stale check.
Only safe to call when the caller already holds the advisory lock,
which guarantees no other worker is actively processing.
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 26

def force_processing_lock!(party, job_id: nil)
  now = Time.current
  update_columns(processing_by_id: party.id, processing_since: now)
  self.processing_by_id = party.id
  self.processing_since = now
  store_processing_job_id!(job_id) if job_id
end
#heartbeat_stale? ⇒ Boolean
Heartbeat has stopped (worker likely killed) but lock hasn't expired yet.
Used by the processing_status endpoint so the client can show an early warning.
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 51

def heartbeat_stale?
  processing_by_id.present? && processing_since.present? &&
    processing_since < AssistantConversation::HEARTBEAT_STALE_AFTER.ago &&
    processing_since > AssistantConversation::LOCK_STALE_AFTER.ago
end
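Taken together, `processing?` and `heartbeat_stale?` split the age of `processing_since` into three windows. The sketch below classifies a timestamp into those windows in plain Ruby. The module name `LockWindows` and both threshold values are assumptions chosen for illustration (the real constants live on `AssistantConversation`), and the sketch ignores the `processing_by_id` presence check for brevity.

```ruby
module LockWindows
  HEARTBEAT_STALE_AFTER = 30   # seconds, assumed value
  LOCK_STALE_AFTER      = 300  # seconds, assumed value

  # Classify a lock by the age of its processing_since timestamp.
  def self.state(processing_since, now:)
    return :idle if processing_since.nil?

    age = now - processing_since
    if age >= LOCK_STALE_AFTER
      :expired          # processing? would be false; the lock can be re-acquired
    elsif age > HEARTBEAT_STALE_AFTER
      :heartbeat_stale  # still locked, but the heartbeat has gone quiet
    else
      :healthy          # locked and recently refreshed
    end
  end
end

now = Time.at(1_000_000)
healthy = LockWindows.state(now - 5, now: now)
warning = LockWindows.state(now - 60, now: now)
expired = LockWindows.state(now - 600, now: now)
idle    = LockWindows.state(nil, now: now)
```

Only the middle window corresponds to `heartbeat_stale?` returning true, which is the state the processing_status endpoint reports as an early warning.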
#processing? ⇒ Boolean
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 44

def processing?
  processing_by_id.present? && processing_since.present? &&
    processing_since > AssistantConversation::LOCK_STALE_AFTER.ago
end
#processing_job_id ⇒ Object
Sidekiq job ID of the worker currently processing this conversation.
Stored in Redis to avoid JSONB contention with concurrent metadata writes.
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 60

def processing_job_id
  Sidekiq.redis { |conn| conn.get("assistant_processing_jid:#{id}") }
end
#processing_lock_key ⇒ Object
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 64

def processing_lock_key
  "assistant_conversation_#{id}"
end
#refresh_processing_ttl! ⇒ Object
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 68

def refresh_processing_ttl!
  Sidekiq.redis { |conn| conn.expire("assistant_processing_jid:#{id}", AssistantConversation::PROCESSING_JID_TTL) }
  # Use an atomic DB check to avoid ghost-lock race: a page-load may have
  # cleared processing_by_id while the worker still holds the in-memory copy.
  self.class.where(id: id).where.not(processing_by_id: nil)
      .update_all(processing_since: Time.current)
end
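How the worker calls `refresh_processing_ttl!` is not shown on this page. One plausible arrangement, sketched here purely as an assumption, is a background heartbeat thread that invokes a refresh callable at a fixed interval while the real work runs. `with_heartbeat` and its parameters are hypothetical names, and the lambda stands in for `conversation.refresh_processing_ttl!`.

```ruby
# Calls `refresh` every `interval` seconds on a background thread
# while the given block runs, then stops the thread.
def with_heartbeat(refresh, interval:)
  done = false
  beat = Thread.new do
    until done
      refresh.call
      sleep interval
    end
  end
  yield
ensure
  done = true   # signal the heartbeat loop to exit
  beat&.join    # wait for the in-flight sleep to finish
end

beats = 0
refresh = -> { beats += 1 } # stand-in for conversation.refresh_processing_ttl!

result = with_heartbeat(refresh, interval: 0.01) do
  sleep 0.05 # simulated long-running processing
  :finished
end
```

If the worker process is killed, the thread dies with it and `processing_since` stops advancing, which is the condition `heartbeat_stale?` is designed to detect.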
#release_processing_lock! ⇒ Object
# File 'app/models/concerns/assistant_conversation_processing_lockable.rb', line 34

def release_processing_lock!
  self.class.where(id: id).update_all(
    processing_by_id: nil,
    processing_since: nil
  )
  clear_processing_job_id!
  self.processing_by_id = nil
  self.processing_since = nil
end