
Rails Event Store (Pub/Sub Event Bus)

A persistent, auditable event bus for decoupled domain events across the application. It replaces the unmaintained wisper / wisper-sidekiq gems.

Domain events are published by models whenever something meaningful happens. Handlers subscribe to those events and react — synchronously or asynchronously — without the publisher needing to know who is listening.

Rails Event Store (RES) adds two important things over the old wisper setup:

  • Persistence. Every published event is written to the database before any handler runs. You have a permanent, queryable log of everything that happened.
  • First-class async. Async dispatch goes through ActiveJob (Sidekiq), with no separate companion gem and no YAML deserialization workarounds.

Models               Event Store (RES)                    Handlers
──────               ─────────────────                    ────────
Party           ──►  Events::ProductInterestChanged  ──►  Party::ProductInterestHandler (sync)
Customer        ──►  Events::BuyingGroupChanged      ──►  Party::BuyingGroupHandler (sync)
Address         ──►  Events::LocationChanged         ──►  Party::LocationHandler (sync)
Quote           ──►  Events::QuoteCompleted          ──►  Quote::QuoteCompletedHandler (sync)
EmailPreference ──►  Events::EmailUnsubscribed       ──►  EmailUnsubscribeHandler (sync)
CatalogItem     ──►  Events::PriceUpdated            ──►  CatalogItem::PriceUpdatedHandler (async via Sidekiq)
Activity        ──►  Activity::Observer (called directly from AR callbacks; no event bus)

Defined in app/events/events.rb as thin subclasses of RailsEventStore::Event:

module Events
  class ProductInterestChanged < RailsEventStore::Event; end
  class BuyingGroupChanged < RailsEventStore::Event; end
  class LocationChanged < RailsEventStore::Event; end
  class QuoteCompleted < RailsEventStore::Event; end
  class EmailUnsubscribed < RailsEventStore::Event; end
  class PriceUpdated < RailsEventStore::Event; end
end

Each event carries a data: hash. The schemas are:

Event                    Data fields
─────                    ───────────
ProductInterestChanged   party_id, product_line_ids_added, product_line_ids_removed
BuyingGroupChanged       party_id, buying_group_id_added, buying_group_id_removed
LocationChanged          party_id, latitude, longitude
QuoteCompleted           quote_id
EmailUnsubscribed        email, categories
PriceUpdated             catalog_item_id, price_was, price_now
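
Since each payload is a plain hash, nothing in the event classes themselves enforces the field lists above. The sketch below shows one way a publisher or test could check a payload against them. `EVENT_DATA_FIELDS` and `missing_event_fields` are hypothetical names introduced for illustration; they are not part of the application or of RES.

```ruby
# Required data keys per event, copied from the schema list above.
# EVENT_DATA_FIELDS and missing_event_fields are illustrative names only.
EVENT_DATA_FIELDS = {
  "Events::ProductInterestChanged" => %i[party_id product_line_ids_added product_line_ids_removed],
  "Events::BuyingGroupChanged"     => %i[party_id buying_group_id_added buying_group_id_removed],
  "Events::LocationChanged"        => %i[party_id latitude longitude],
  "Events::QuoteCompleted"         => %i[quote_id],
  "Events::EmailUnsubscribed"      => %i[email categories],
  "Events::PriceUpdated"           => %i[catalog_item_id price_was price_now]
}.freeze

# Returns the required keys absent from the payload (empty when complete).
def missing_event_fields(event_name, data)
  EVENT_DATA_FIELDS.fetch(event_name) - data.keys
end

missing_event_fields("Events::LocationChanged", { party_id: 7, latitude: 41.88 })
# => [:longitude]
```

An unknown event name raises KeyError through `fetch`, which is probably the desired behaviour for a check like this.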

config/initializers/event_store.rb creates the global client and registers all subscriptions inside to_prepare (runs on boot and on code reload in development):

Rails.configuration.event_store = RailsEventStore::Client.new
Rails.configuration.to_prepare do
  es = Rails.configuration.event_store
  es.subscribe(Party::ProductInterestHandler, to: [Events::ProductInterestChanged])
  es.subscribe(Party::BuyingGroupHandler, to: [Events::BuyingGroupChanged])
  es.subscribe(Party::LocationHandler, to: [Events::LocationChanged])
  es.subscribe(Quote::QuoteCompletedHandler, to: [Events::QuoteCompleted])
  es.subscribe(EmailUnsubscribeHandler, to: [Events::EmailUnsubscribed])
  es.subscribe(CatalogItem::PriceUpdatedHandler, to: [Events::PriceUpdated])
end

Access the store anywhere via Rails.configuration.event_store.
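
To make the subscribe/publish contract concrete, here is a small in-memory sketch. `TinyEventBus`, `DemoEvent`, and `DemoHandler` are hypothetical stand-ins, not RES classes, and the code is not how RES is implemented. It only imitates two behaviours described in this document: the event is recorded before any handler runs, and a subscribed handler class is instantiated and sent `call(event)`.

```ruby
# TinyEventBus is a hypothetical in-memory stand-in, NOT the RES implementation.
class TinyEventBus
  attr_reader :log

  def initialize
    @log = []                                          # stands in for the events table
    @subscriptions = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(handler, to:)
    to.each { |event_class| @subscriptions[event_class] << handler }
  end

  def publish(event)
    @log << event                                      # record first
    @subscriptions[event.class].each do |handler|
      handler = handler.new if handler.is_a?(Class)    # classes are instantiated per event
      handler.call(event)                              # then dispatch synchronously
    end
  end
end

# Hypothetical event and handler used only for this demonstration.
DemoEvent = Struct.new(:data, keyword_init: true)

class DemoHandler
  def self.seen
    @seen ||= []
  end

  def call(event)
    self.class.seen << event.data[:quote_id]
  end
end

bus = TinyEventBus.new
bus.subscribe(DemoHandler, to: [DemoEvent])
bus.publish(DemoEvent.new(data: { quote_id: 42 }))
```

After the publish call, `bus.log` holds the event and `DemoHandler.seen` holds the quote id, without the publisher referencing the handler.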

Models publish via Rails.configuration.event_store.publish(...). Each model has a private method that constructs and publishes the event:

Party (app/models/party.rb)

def broadcast_product_interest_changed(product_line_ids_added:, product_line_ids_removed:)
  return unless product_line_ids_added.present? || product_line_ids_removed.present?

  Rails.configuration.event_store.publish(
    Events::ProductInterestChanged.new(data: { party_id: id, ... })
  )
end

Customer (app/models/customer.rb) — buying_group_changed on buying group update.

Address (app/models/address.rb) — location_changed in after_save_commit when lat or lng changes.

Quote (app/models/quote.rb) — quote_complete in the state machine after_transition any - :complete => :complete block.

EmailPreference (app/models/email_preference.rb) — email_unsubscribed in after_save when one or more disable_* columns flip to true.

CatalogItem (app/models/catalog_item.rb) — price_updated in after_commit :notify_of_price_update when the amount column changes.
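
As an illustration of the EmailPreference condition ("one or more disable_* columns flip to true"), the sketch below derives the affected categories from a hash shaped like ActiveRecord's `saved_changes` (`{ "column" => [before, after] }`). The helper name, the column names in the example, and the mapping from a `disable_<name>` column to a category string are assumptions; the model's actual code may differ.

```ruby
# Sketch only: newly_disabled_categories is a hypothetical helper.
# Input mirrors ActiveRecord's saved_changes: { "column" => [before, after] }.
def newly_disabled_categories(saved_changes)
  saved_changes.filter_map do |column, (before, after)|
    next unless column.start_with?("disable_")
    next unless after == true && before != true   # only false/nil -> true counts

    column.delete_prefix("disable_")
  end
end

changes = {
  "disable_newsletter" => [false, true],          # flipped to true: counts
  "disable_promotions" => [true, false],          # re-enabled: ignored
  "updated_at"         => ["2026-01-01", "2026-01-02"]
}
newly_disabled_categories(changes)
# => ["newsletter"]
```

An empty result would mean there is nothing to publish, which matches the guard-clause style used by the Party publisher.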

Handlers live in app/subscribers/ and implement #call(event).

All accept a RES event object and read data from event.data:

Handler                        File                                               Action
───────                        ────                                               ──────
Party::ProductInterestHandler  app/subscribers/party/product_interest_handler.rb  Calls Campaign::AssignDripCampaigns
Party::BuyingGroupHandler      app/subscribers/party/buying_group_handler.rb      Calls Campaign::AssignDripCampaigns
Party::LocationHandler         app/subscribers/party/location_handler.rb          Calls Campaign::AssignDripCampaigns
Quote::QuoteCompletedHandler   app/subscribers/quote/quote_completed_handler.rb   Follow-up, profiling, drip campaigns, large opp assignment, room completion
EmailUnsubscribeHandler        app/subscribers/email_unsubscribe_handler.rb       Cancels open activities for the email address

CatalogItem::PriceUpdatedHandler (app/subscribers/catalog_item/price_updated_handler.rb) inherits from ApplicationJob and prepends RailsEventStore::AsyncHandler. When RES dispatches this event it calls perform_later, routing through Sidekiq.

class CatalogItem::PriceUpdatedHandler < ApplicationJob
  prepend RailsEventStore::AsyncHandler
  queue_as :default

  def perform(event)
    return if event.data[:price_was] == event.data[:price_now]

    catalog_item = CatalogItem.find(event.data[:catalog_item_id])
    catalog_item.dependent_catalog_items.each(&:item_specs_or_price_updated)
    catalog_item.push_price_message
  end
end

Because RailsEventStore::AsyncHandler#perform passes its argument through unchanged when it is not a Hash, unit tests can call .new.perform(event_object) with an event object directly, without going through the job queue.

Activity does not publish RES events. Instead, after_create and after_update callbacks call Activity::Observer.new directly:

after_create { Activity::Observer.new.after_create(self) }
after_update { Activity::Observer.new.after_update(self) }

The reason is architectural: Activity::Observer relies heavily on ActiveRecord’s transient saved_change_to_* state (e.g. saved_change_to_activity_result_type_id?). RES events serialize data to a hash; a handler that reloads the record from the database loses this state. Direct invocation from the AR callback preserves it without workarounds.
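
The loss of transient state can be shown with a small stand-in. `FakeActivity` below is hypothetical and only imitates one ActiveRecord behaviour: change tracking lives on the in-memory instance, so an instance rebuilt from stored data has no change history. The attribute name is shortened for the example.

```ruby
# FakeActivity is a hypothetical stand-in, not the application's Activity model.
class FakeActivity
  STORE = {}   # stands in for the activities table

  attr_reader :id, :result_type_id

  def initialize(id, result_type_id)
    @id = id
    @result_type_id = result_type_id
    @saved_changes = {}
  end

  def update(result_type_id:)
    @saved_changes = { "result_type_id" => [@result_type_id, result_type_id] }
    @result_type_id = result_type_id
    STORE[id] = result_type_id
  end

  def saved_change_to_result_type_id?
    @saved_changes.key?("result_type_id")
  end

  # A fresh instance built from stored data carries no change history.
  def self.find(id)
    new(id, STORE.fetch(id))
  end
end

activity = FakeActivity.new(1, 10)
activity.update(result_type_id: 20)

in_callback = activity.saved_change_to_result_type_id?                     # => true
in_handler  = FakeActivity.find(activity.id).saved_change_to_result_type_id?  # => false
```

The first call corresponds to the instance an AR callback receives; the second corresponds to what a handler would see after looking the record up by the id carried in event data.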

The migration db/migrate/20260223223019_create_event_store_events.rb creates:

Table                          Purpose
─────                          ───────
event_store_events             One row per published event (UUID, event type, serialized data, timestamps)
event_store_events_in_streams  Stream membership; maps events to named streams

Events are stored as YAML-serialized bytea. Use the RES read API to query them:

# All events of a type
Rails.configuration.event_store.read.of_type(Events::QuoteCompleted).to_a
# Recent events
Rails.configuration.event_store.read.limit(20).to_a
# Events in a named stream
Rails.configuration.event_store.read.stream("Quote-42").to_a

To add a new event:

  1. Add an event class to app/events/events.rb:

    class MyThingHappened < RailsEventStore::Event; end

  2. Publish from the model:

    Rails.configuration.event_store.publish(
      Events::MyThingHappened.new(data: { record_id: id, ... })
    )

  3. Create a handler in app/subscribers/:

    class MyThingHandler
      def call(event)
        record = MyModel.find(event.data[:record_id])
        # do work
      end
    end

  4. Subscribe in config/initializers/event_store.rb:

    es.subscribe(MyThingHandler, to: [Events::MyThingHappened])

  5. Write tests in test/subscribers/my_thing_handler_test.rb calling handler.call(event) directly.

To test a handler, call it directly; there is no need to involve the event store:

event = Events::ProductInterestChanged.new(data: {
  party_id: party.id,
  product_line_ids_added: [1, 2],
  product_line_ids_removed: []
})
Party::ProductInterestHandler.new.call(event)

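To test that a model publishes an event, use the capture_events helper defined in test/initializers/event_store_publishing_test.rb. It swaps in a fresh RailsEventStore::Client with no handlers subscribed:

# The publishing method is private and requires both keywords, hence #send.
captured = capture_events(Events::ProductInterestChanged) do
  party.send(:broadcast_product_interest_changed,
             product_line_ids_added: [1], product_line_ids_removed: [])
end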
assert_equal 1, captured.size

The helper wraps the block in EXTERNAL_SERVICE_STUB_MUTEX to safely swap Rails.configuration.event_store in the parallel test environment.
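
A helper of this shape could look roughly like the sketch below. It is not the code in test/initializers/event_store_publishing_test.rb. `AppConfig` stands in for Rails.configuration, `RecordingStore` for the fresh RailsEventStore::Client, and `STORE_SWAP_MUTEX` for EXTERNAL_SERVICE_STUB_MUTEX, so that the example runs without Rails.

```ruby
# Sketch only; every constant here is a stand-in (see the paragraph above).
STORE_SWAP_MUTEX = Mutex.new
AppConfig = Struct.new(:event_store).new

class RecordingStore
  attr_reader :published

  def initialize
    @published = []
  end

  # Accepts and ignores options such as stream_name:.
  def publish(event, **_options)
    @published << event
  end
end

def capture_events(event_class)
  STORE_SWAP_MUTEX.synchronize do
    original = AppConfig.event_store
    recorder = RecordingStore.new
    AppConfig.event_store = recorder
    begin
      yield
    ensure
      AppConfig.event_store = original   # restore even if the block raises
    end
    recorder.published.grep(event_class) # keep only events of the requested class
  end
end

SampleEvent = Struct.new(:data, keyword_init: true)

captured = capture_events(SampleEvent) do
  AppConfig.event_store.publish(SampleEvent.new(data: { id: 1 }))
end
captured.size
# => 1
```

Holding the mutex for the whole block means only one test at a time can see the swapped store, which is the property the parallel test environment needs.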

Every publish call includes a stream_name: so events are queryable per record:

Model              Stream pattern
─────              ──────────────
Order              "Order-{id}"
Invoice            "Invoice-{id}"
Rma                "Rma-{id}"
Quote              "Quote-{id}"
Party / Customer   "Party-{id}"
CatalogItem        "CatalogItem-{id}"
EmailPreference    "EmailPreference-{id}"

Read a record’s stream in the console:

Rails.configuration.event_store.read.stream("Order-42").backward.to_a
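
The naming convention can be captured in a small helper. This is a sketch: `STREAM_PREFIXES` and `stream_name_for` are hypothetical names, and which id a Customer passes for its "Party-{id}" stream is an assumption left to the caller.

```ruby
# Sketch only: STREAM_PREFIXES and stream_name_for are illustrative names.
# Prefixes are copied from the stream pattern list; Customer shares "Party".
STREAM_PREFIXES = {
  "Order"           => "Order",
  "Invoice"         => "Invoice",
  "Rma"             => "Rma",
  "Quote"           => "Quote",
  "Party"           => "Party",
  "Customer"        => "Party",
  "CatalogItem"     => "CatalogItem",
  "EmailPreference" => "EmailPreference"
}.freeze

def stream_name_for(model_name, id)
  "#{STREAM_PREFIXES.fetch(model_name)}-#{id}"
end

stream_name_for("Order", 42)    # => "Order-42"
stream_name_for("Customer", 7)  # => "Party-7"
```

In RES the resulting string is what goes into the stream_name: keyword of publish and into read.stream(...).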

Orders, Customers, Invoices, and RMAs each have an Events tab in the CRM. It displays the last 100 domain events from the record’s named stream, newest first. The tab is lazy-loaded via Turbo (no impact on page load time).

The RES Browser UI is mounted at /res (CRM subdomain, admin-only). It lets you browse all streams, inspect event payloads, and troubleshoot handler issues.

EventStorePurgeWorker runs nightly at 1 AM CT and deletes events older than EVENT_STORE_RETENTION_DAYS days (default: 90). Override by setting that env var in production.
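
The retention rule amounts to a cutoff timestamp. The sketch below computes it from the environment variable with the documented default of 90; `purge_cutoff` is a hypothetical helper and not the code in app/workers/event_store_purge_worker.rb.

```ruby
# Sketch only: purge_cutoff is an illustrative helper, not the worker's code.
SECONDS_PER_DAY = 86_400

# Events created before the returned time would be eligible for deletion.
def purge_cutoff(now: Time.now, env: ENV)
  retention_days = Integer(env.fetch("EVENT_STORE_RETENTION_DAYS", "90"))
  now - (retention_days * SECONDS_PER_DAY)
end

purge_cutoff(now: Time.utc(2026, 4, 1), env: {})
# => 2026-01-01 00:00:00 UTC
```

Using `Integer(...)` rather than `to_i` makes a malformed value raise instead of silently becoming 0, which would otherwise put the cutoff at the current time.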

  • RES documentation: https://railseventstore.org
  • Event classes: app/events/events.rb
  • Initializer / subscriptions: config/initializers/event_store.rb
  • Handler tests: test/subscribers/
  • Publishing tests: test/initializers/event_store_publishing_test.rb
  • Purge worker: app/workers/event_store_purge_worker.rb