Rails Event Store (Pub/Sub Event Bus)
A persistent, auditable event bus for decoupled domain events across the application.
Replaced the unmaintained wisper / wisper-sidekiq gems.
Overview
Section titled “Overview”Domain events are published by models whenever something meaningful happens. Handlers subscribe to those events and react — synchronously or asynchronously — without the publisher needing to know who is listening.
Rails Event Store (RES) adds two important things over the old wisper setup:
- Persistence. Every published event is written to the database before any handler runs. You have a permanent, queryable log of everything that happened.
- First-class async. Async dispatch goes through ActiveJob (Sidekiq), with no separate companion gem and no YAML deserialization workarounds.
Architecture
Section titled “Architecture”Models Event Store (RES) Handlers────── ───────────────── ────────Party ──► Events::ProductInterestChanged ──► Party::ProductInterestHandler (sync)Customer ──► Events::BuyingGroupChanged ──► Party::BuyingGroupHandler (sync)Address ──► Events::LocationChanged ──► Party::LocationHandler (sync)Quote ──► Events::QuoteCompleted ──► Quote::QuoteCompletedHandler (sync)EmailPreference ──► Events::EmailUnsubscribed ──► EmailUnsubscribeHandler (sync)CatalogItem ──► Events::PriceUpdated ──► CatalogItem::PriceUpdatedHandler (async via Sidekiq)
Activity ──► Activity::Observer (called directly from AR callbacks — no event bus)Event Classes
Section titled “Event Classes”Defined in app/events/events.rb as thin subclasses of RailsEventStore::Event:
module Events class ProductInterestChanged < RailsEventStore::Event; end class BuyingGroupChanged < RailsEventStore::Event; end class LocationChanged < RailsEventStore::Event; end class QuoteCompleted < RailsEventStore::Event; end class EmailUnsubscribed < RailsEventStore::Event; end class PriceUpdated < RailsEventStore::Event; endendEach event carries a data: hash. The schemas are:
| Event | Data fields |
|---|---|
ProductInterestChanged | party_id, product_line_ids_added, product_line_ids_removed |
BuyingGroupChanged | party_id, buying_group_id_added, buying_group_id_removed |
LocationChanged | party_id, latitude, longitude |
QuoteCompleted | quote_id |
EmailUnsubscribed | email, categories |
PriceUpdated | catalog_item_id, price_was, price_now |
Configuration
Section titled “Configuration”config/initializers/event_store.rb creates the global client and registers
all subscriptions inside to_prepare (runs on boot and on code reload in development):
Rails.configuration.event_store = RailsEventStore::Client.new
Rails.configuration.to_prepare do es = Rails.configuration.event_store es.subscribe(Party::ProductInterestHandler, to: [Events::ProductInterestChanged]) es.subscribe(Party::BuyingGroupHandler, to: [Events::BuyingGroupChanged]) es.subscribe(Party::LocationHandler, to: [Events::LocationChanged]) es.subscribe(Quote::QuoteCompletedHandler, to: [Events::QuoteCompleted]) es.subscribe(EmailUnsubscribeHandler, to: [Events::EmailUnsubscribed]) es.subscribe(CatalogItem::PriceUpdatedHandler, to: [Events::PriceUpdated])endAccess the store anywhere via Rails.configuration.event_store.
Publishing Events
Section titled “Publishing Events”Models publish via Rails.configuration.event_store.publish(...).
Each model has a private method that constructs and publishes the event:
Party (app/models/party.rb)
def broadcast_product_interest_changed(product_line_ids_added:, product_line_ids_removed:) return unless product_line_ids_added.present? || product_line_ids_removed.present? Rails.configuration.event_store.publish( Events::ProductInterestChanged.new(data: { party_id: id, ... }) )endCustomer (app/models/customer.rb) — buying_group_changed on buying group update.
Address (app/models/address.rb) — location_changed in after_save_commit
when lat or lng changes.
Quote (app/models/quote.rb) — quote_complete in the state machine
after_transition any - :complete => :complete block.
EmailPreference (app/models/email_preference.rb) — email_unsubscribed
in after_save when one or more disable_* columns flip to true.
CatalogItem (app/models/catalog_item.rb) — price_updated in
after_commit :notify_of_price_update when the amount column changes.
Handlers
Section titled “Handlers”Handlers live in app/subscribers/ and implement #call(event).
Sync handlers
Section titled “Sync handlers”All accept a RES event object and read data from event.data:
| Handler | File | Action |
|---|---|---|
Party::ProductInterestHandler | app/subscribers/party/product_interest_handler.rb | Calls Campaign::AssignDripCampaigns |
Party::BuyingGroupHandler | app/subscribers/party/buying_group_handler.rb | Calls Campaign::AssignDripCampaigns |
Party::LocationHandler | app/subscribers/party/location_handler.rb | Calls Campaign::AssignDripCampaigns |
Quote::QuoteCompletedHandler | app/subscribers/quote/quote_completed_handler.rb | Follow-up, profiling, drip campaigns, large opp assignment, room completion |
EmailUnsubscribeHandler | app/subscribers/email_unsubscribe_handler.rb | Cancels open activities for the email address |
Async handler
Section titled “Async handler”CatalogItem::PriceUpdatedHandler (app/subscribers/catalog_item/price_updated_handler.rb)
inherits from ApplicationJob and prepends RailsEventStore::AsyncHandler.
When RES dispatches this event it calls perform_later, routing through Sidekiq.
class CatalogItem::PriceUpdatedHandler < ApplicationJob prepend RailsEventStore::AsyncHandler queue_as :default
def perform(event) return if event.data[:price_was] == event.data[:price_now] catalog_item = CatalogItem.find(event.data[:catalog_item_id]) catalog_item.dependent_catalog_items.each(&:item_specs_or_price_updated) catalog_item.push_price_message endendBecause RailsEventStore::AsyncHandler#perform passes an event object directly
when it’s not a Hash, unit tests can call .new.perform(event_object) without
going through the job queue.
Activity::Observer (not on the event bus)
Section titled “Activity::Observer (not on the event bus)”Activity does not publish RES events. Instead, after_create and
after_update callbacks call Activity::Observer.new directly:
after_create { Activity::Observer.new.after_create(self) }after_update { Activity::Observer.new.after_update(self) }The reason is architectural: Activity::Observer relies heavily on ActiveRecord’s
transient saved_change_to_* state (e.g. saved_change_to_activity_result_type_id?).
RES events serialize data to a hash; a handler that reloads the record from the
database loses this state. Direct invocation from the AR callback preserves it
without workarounds.
Database Tables
Section titled “Database Tables”The migration db/migrate/20260223223019_create_event_store_events.rb creates:
| Table | Purpose |
|---|---|
event_store_events | One row per published event (UUID, event type, serialized data, timestamps) |
event_store_events_in_streams | Stream membership — maps events to named streams |
Events are stored as YAML-serialized bytea. Use the RES read API to query them:
# All events of a typeRails.configuration.event_store.read.of_type(Events::QuoteCompleted).to_a
# Recent eventsRails.configuration.event_store.read.limit(20).to_a
# Events in a stream (useful if you later publish to named streams)Rails.configuration.event_store.read.stream("Quote-42").to_aAdding a New Event
Section titled “Adding a New Event”-
Add an event class to
app/events/events.rb:class MyThingHappened < RailsEventStore::Event; end -
Publish from the model:
Rails.configuration.event_store.publish(Events::MyThingHappened.new(data: { record_id: id, ... })) -
Create a handler in
app/subscribers/:class MyThingHandlerdef call(event)record = MyModel.find(event.data[:record_id])# do workendend -
Subscribe in
config/initializers/event_store.rb:es.subscribe(MyThingHandler, to: [Events::MyThingHappened]) -
Write tests in
test/subscribers/my_thing_handler_test.rbcallinghandler.call(event)directly.
Testing
Section titled “Testing”Handler unit tests
Section titled “Handler unit tests”Call handlers directly — no need to involve the event store:
event = Events::ProductInterestChanged.new(data: { party_id: party.id, product_line_ids_added: [1, 2], product_line_ids_removed: []})Party::ProductInterestHandler.new.call(event)Publishing tests
Section titled “Publishing tests”Use a fresh RailsEventStore::Client (no handlers subscribed) via
capture_events helper defined in test/initializers/event_store_publishing_test.rb:
captured = capture_events(Events::ProductInterestChanged) do party.broadcast_product_interest_changed(product_line_ids_added: [1])endassert_equal 1, captured.sizeThe helper wraps the block in EXTERNAL_SERVICE_STUB_MUTEX to safely swap
Rails.configuration.event_store in the parallel test environment.
Named Streams
Section titled “Named Streams”Every publish call includes a stream_name: so events are queryable per record:
| Model | Stream pattern |
|---|---|
| Order | "Order-{id}" |
| Invoice | "Invoice-{id}" |
| Rma | "Rma-{id}" |
| Quote | "Quote-{id}" |
| Party / Customer | "Party-{id}" |
| CatalogItem | "CatalogItem-{id}" |
| EmailPreference | "EmailPreference-{id}" |
Read a record’s stream in the console:
Rails.configuration.event_store.read.stream("Order-42").backward.to_aCRM “Events” Tab
Section titled “CRM “Events” Tab”Orders, Customers, Invoices, and RMAs each have an Events tab in the CRM. It displays the last 100 domain events from the record’s named stream, newest first. The tab is lazy-loaded via Turbo (no impact on page load time).
RES Browser (admin)
Section titled “RES Browser (admin)”The RES Browser UI is mounted at /res (CRM subdomain, admin-only). It lets
you browse all streams, inspect event payloads, and troubleshoot handler issues.
Event Retention
Section titled “Event Retention”EventStorePurgeWorker runs nightly at 1 AM CT and deletes events older than
EVENT_STORE_RETENTION_DAYS days (default: 90). Override by setting that
env var in production.
See Also
Section titled “See Also”- RES documentation: https://railseventstore.org
- Event classes:
app/events/events.rb - Initializer / subscriptions:
config/initializers/event_store.rb - Handler tests:
test/subscribers/ - Publishing tests:
test/initializers/event_store_publishing_test.rb - Purge worker:
app/workers/event_store_purge_worker.rb