Rails Event Store (Pub/Sub Event Bus)

A persistent, auditable event bus for decoupled domain events across the application.
It replaces the unmaintained wisper / wisper-sidekiq gems.

Overview

Domain events are published by models whenever something meaningful happens.
Handlers subscribe to those events and react — synchronously or asynchronously —
without the publisher needing to know who is listening.

Rails Event Store (RES) adds two important things over the old wisper setup:

  • Persistence. Every published event is written to the database before any
    handler runs. You have a permanent, queryable log of everything that happened.
  • First-class async. Async dispatch goes through ActiveJob (Sidekiq), with no
    separate companion gem and no YAML deserialization workarounds.
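
The persist-then-dispatch ordering described above can be pictured with a small in-memory stand-in. This is a plain-Ruby sketch, not RES itself: TinyEventBus, Event, and the :price_updated type are hypothetical names used only for illustration.

```ruby
# Plain-Ruby stand-in for the publish flow; not the RES implementation.
Event = Struct.new(:type, :data)

class TinyEventBus
  attr_reader :log

  def initialize
    @log = []                                   # stands in for the events table
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(handler, to:)
    to.each { |type| @subscribers[type] << handler }
  end

  def publish(event)
    @log << event                                             # 1. persist first
    @subscribers[event.type].each { |handler| handler.call(event) }  # 2. then dispatch
    event
  end
end

bus = TinyEventBus.new
seen_log_sizes = []
# The handler records how many events were already persisted when it ran.
bus.subscribe(->(_event) { seen_log_sizes << bus.log.size }, to: [:price_updated])
bus.publish(Event.new(:price_updated, { catalog_item_id: 1 }))
```

The publisher only talks to the bus, so handlers can be added or removed without touching the publishing code.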

Architecture

Models               Event Store (RES)                   Handlers
──────               ─────────────────                   ────────
Party           ──►  Events::ProductInterestChanged ──►  Party::ProductInterestHandler     (sync)
Customer        ──►  Events::BuyingGroupChanged     ──►  Party::BuyingGroupHandler         (sync)
Address         ──►  Events::LocationChanged        ──►  Party::LocationHandler            (sync)
Quote           ──►  Events::QuoteCompleted         ──►  Quote::QuoteCompletedHandler      (sync)
EmailPreference ──►  Events::EmailUnsubscribed      ──►  EmailUnsubscribeHandler           (sync)
CatalogItem     ──►  Events::PriceUpdated           ──►  CatalogItem::PriceUpdatedHandler  (async via Sidekiq)

Activity ──► Activity::Observer (called directly from AR callbacks — no event bus)

Event Classes

Defined in app/events/events.rb as thin subclasses of RailsEventStore::Event:

module Events
  class ProductInterestChanged < RailsEventStore::Event; end
  class BuyingGroupChanged     < RailsEventStore::Event; end
  class LocationChanged        < RailsEventStore::Event; end
  class QuoteCompleted         < RailsEventStore::Event; end
  class EmailUnsubscribed      < RailsEventStore::Event; end
  class PriceUpdated           < RailsEventStore::Event; end
end

Each event carries a data: hash. The schemas are:

Event                   Data fields
─────                   ───────────
ProductInterestChanged  party_id, product_line_ids_added, product_line_ids_removed
BuyingGroupChanged      party_id, buying_group_id_added, buying_group_id_removed
LocationChanged         party_id, latitude, longitude
QuoteCompleted          quote_id
EmailUnsubscribed       email, categories
PriceUpdated            catalog_item_id, price_was, price_now
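
One way to keep publishers honest about these schemas is a small lookup that reports missing keys. This is an illustrative sketch only: EVENT_SCHEMAS and missing_fields are hypothetical helpers, not part of the application or of RES. The field names are copied from the list above.

```ruby
# Hypothetical helper: required data keys per event, taken from the schema list.
EVENT_SCHEMAS = {
  "ProductInterestChanged" => %i[party_id product_line_ids_added product_line_ids_removed],
  "BuyingGroupChanged"     => %i[party_id buying_group_id_added buying_group_id_removed],
  "LocationChanged"        => %i[party_id latitude longitude],
  "QuoteCompleted"         => %i[quote_id],
  "EmailUnsubscribed"      => %i[email categories],
  "PriceUpdated"           => %i[catalog_item_id price_was price_now]
}.freeze

# Returns the schema keys that are absent from the given data hash.
def missing_fields(event_name, data)
  EVENT_SCHEMAS.fetch(event_name) - data.keys
end

missing = missing_fields("LocationChanged", { party_id: 7, latitude: 41.9 })
```

Here missing contains :longitude, the one schema key the sample hash leaves out.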

Configuration

config/initializers/event_store.rb creates the global client and registers
all subscriptions inside to_prepare (runs on boot and on code reload in development):

Rails.configuration.to_prepare do
  # Build the client inside to_prepare so each code reload starts from a fresh
  # client instead of adding duplicate subscriptions to the old one.
  Rails.configuration.event_store = RailsEventStore::Client.new

  es = Rails.configuration.event_store
  es.subscribe(Party::ProductInterestHandler,    to: [Events::ProductInterestChanged])
  es.subscribe(Party::BuyingGroupHandler,        to: [Events::BuyingGroupChanged])
  es.subscribe(Party::LocationHandler,           to: [Events::LocationChanged])
  es.subscribe(Quote::QuoteCompletedHandler,     to: [Events::QuoteCompleted])
  es.subscribe(EmailUnsubscribeHandler,          to: [Events::EmailUnsubscribed])
  es.subscribe(CatalogItem::PriceUpdatedHandler, to: [Events::PriceUpdated])
end

Access the store anywhere via Rails.configuration.event_store.

Publishing Events

Models publish via Rails.configuration.event_store.publish(...).
Each model has a private method that constructs and publishes the event:

Party (app/models/party.rb)

def broadcast_product_interest_changed(product_line_ids_added:, product_line_ids_removed:)
  return unless product_line_ids_added.present? || product_line_ids_removed.present?
  Rails.configuration.event_store.publish(
    Events::ProductInterestChanged.new(data: { party_id: id, ... })
  )
end

Customer (app/models/customer.rb) — buying_group_changed on buying group update.

Address (app/models/address.rb) — location_changed in after_save_commit
when lat or lng changes.

Quote (app/models/quote.rb) — quote_complete in the state machine
after_transition any - :complete => :complete block.

EmailPreference (app/models/email_preference.rb) — email_unsubscribed
in after_save when one or more disable_* columns flip to true.

CatalogItem (app/models/catalog_item.rb) — price_updated in
after_commit :notify_of_price_update when the amount column changes.
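
The "only publish when something actually flipped" guards can be sketched in plain Ruby. The example below mirrors the EmailPreference rule and assumes a hash shaped like ActiveRecord's saved_changes ({ "column" => [old, new] }). The helper name and the column names are hypothetical, and the application's real guard may differ.

```ruby
# Hypothetical helper: return the disable_* columns that flipped to true,
# with the "disable_" prefix stripped. Column names below are made up.
def newly_disabled_categories(saved_changes)
  saved_changes
    .select { |column, (_was, now)| column.start_with?("disable_") && now == true }
    .keys
    .map { |column| column.delete_prefix("disable_") }
end

changes = {
  "disable_promotions" => [false, true],                 # flipped to true: counts
  "disable_newsletter" => [true, false],                 # flipped to false: ignored
  "updated_at"         => ["2026-01-01", "2026-01-02"]   # unrelated column: ignored
}
categories = newly_disabled_categories(changes)
```

A publisher built this way would skip the event entirely when the returned list is empty.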

Handlers

Handlers live in app/subscribers/ and implement #call(event).

Sync handlers

All accept a RES event object and read data from event.data:

Handler                        File                                               Action
───────                        ────                                               ──────
Party::ProductInterestHandler  app/subscribers/party/product_interest_handler.rb  Calls Campaign::AssignDripCampaigns
Party::BuyingGroupHandler      app/subscribers/party/buying_group_handler.rb      Calls Campaign::AssignDripCampaigns
Party::LocationHandler         app/subscribers/party/location_handler.rb          Calls Campaign::AssignDripCampaigns
Quote::QuoteCompletedHandler   app/subscribers/quote/quote_completed_handler.rb   Follow-up, profiling, drip campaigns, large opp assignment, room completion
EmailUnsubscribeHandler        app/subscribers/email_unsubscribe_handler.rb       Cancels open activities for the email address

Async handler

CatalogItem::PriceUpdatedHandler (app/subscribers/catalog_item/price_updated_handler.rb)
inherits from ApplicationJob and prepends RailsEventStore::AsyncHandler.
When RES dispatches this event it calls perform_later, routing through Sidekiq.

class CatalogItem::PriceUpdatedHandler < ApplicationJob
  prepend RailsEventStore::AsyncHandler
  queue_as :default

  def perform(event)
    return if event.data[:price_was] == event.data[:price_now]
    catalog_item = CatalogItem.find(event.data[:catalog_item_id])
    catalog_item.dependent_catalog_items.each(&:item_specs_or_price_updated)
    catalog_item.push_price_message
  end
end

Because RailsEventStore::AsyncHandler#perform passes an event object directly
when it's not a Hash, unit tests can call .new.perform(event_object) without
going through the job queue.
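
The pass-through behaviour described above can be pictured with a prepended module. This is a simplified stand-in, not the RES source: PassThroughAsyncHandler, FakeEvent, and PriceLogger are hypothetical names, and the real module also handles serializers and metadata.

```ruby
# Simplified illustration of a prepended #perform wrapper; not the RES source.
FakeEvent = Struct.new(:data)

module PassThroughAsyncHandler
  # Runs before the host class's #perform because the module is prepended.
  def perform(payload)
    # A Hash is treated as a serialized payload and rebuilt into an event;
    # anything else is assumed to already be an event object.
    event = payload.is_a?(Hash) ? FakeEvent.new(payload.fetch(:data)) : payload
    super(event)
  end
end

class PriceLogger
  prepend PassThroughAsyncHandler

  attr_reader :seen

  def perform(event)
    (@seen ||= []) << event.data[:price_now]
  end
end

handler = PriceLogger.new
handler.perform(FakeEvent.new({ price_now: 12 }))  # event object: passed through
handler.perform({ data: { price_now: 15 } })       # Hash payload: rebuilt first
```

Both calls end up in the same handler body, which is why a unit test can hand an event object straight to perform.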

Activity::Observer (not on the event bus)

Activity does not publish RES events. Instead, after_create and
after_update callbacks call Activity::Observer.new directly:

after_create { Activity::Observer.new.after_create(self) }
after_update { Activity::Observer.new.after_update(self) }

The reason is architectural: Activity::Observer relies heavily on ActiveRecord's
transient saved_change_to_* state (e.g. saved_change_to_activity_result_type_id?).
RES events serialize data to a hash; a handler that reloads the record from the
database loses this state. Direct invocation from the AR callback preserves it
without workarounds.
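
The loss of transient change state on reload can be shown with a plain-Ruby model. TinyRecord is a hypothetical class that mimics only the dirty-tracking idea; it is not ActiveRecord.

```ruby
# Plain-Ruby illustration; TinyRecord mimics only the dirty-tracking idea.
class TinyRecord
  STORE = {} # stands in for the database

  attr_reader :id, :result_type_id

  def initialize(id, result_type_id)
    @id = id
    @result_type_id = result_type_id
    @saved_changes = {}
    STORE[id] = result_type_id
  end

  def update(result_type_id)
    @saved_changes = { result_type_id: [@result_type_id, result_type_id] }
    @result_type_id = result_type_id
    STORE[id] = result_type_id
  end

  def saved_change_to_result_type_id?
    @saved_changes.key?(:result_type_id)
  end

  # A fresh instance loaded from storage has no memory of the previous save.
  def self.find(id)
    new(id, STORE.fetch(id))
  end
end

activity = TinyRecord.new(1, 10)
activity.update(20)
in_callback  = activity.saved_change_to_result_type_id?            # same instance
after_reload = TinyRecord.find(1).saved_change_to_result_type_id?  # reloaded copy
```

The instance that performed the save still knows what changed; the reloaded copy sees only the current value, which is the situation a handler working from a serialized id would be in.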

Database Tables

The migration db/migrate/20260223223019_create_event_store_events.rb creates:

Table                          Purpose
─────                          ───────
event_store_events             One row per published event (UUID, event type, serialized data, timestamps)
event_store_events_in_streams  Stream membership: maps events to named streams

Events are stored as YAML-serialized bytea. Use the RES read API to query them:

# All events of a type
Rails.configuration.event_store.read.of_type(Events::QuoteCompleted).to_a

# Recent events
Rails.configuration.event_store.read.limit(20).to_a

# Events in a named stream (every publish call sets stream_name:)
Rails.configuration.event_store.read.stream("Quote-42").to_a

Adding a New Event

  1. Add an event class to app/events/events.rb:

    class MyThingHappened < RailsEventStore::Event; end
    
  2. Publish from the model:

    Rails.configuration.event_store.publish(
      Events::MyThingHappened.new(data: { record_id: id, ... }),
      stream_name: "MyModel-#{id}" # follows the "Model-{id}" stream convention
    )
    
  3. Create a handler in app/subscribers/:

    class MyThingHandler
      def call(event)
        record = MyModel.find(event.data[:record_id])
        # do work
      end
    end
    
  4. Subscribe in config/initializers/event_store.rb:

    es.subscribe(MyThingHandler, to: [Events::MyThingHappened])
    
  5. Write tests in test/subscribers/my_thing_handler_test.rb calling
    handler.call(event) directly.

Testing

Handler unit tests

Call handlers directly — no need to involve the event store:

event = Events::ProductInterestChanged.new(data: {
  party_id: party.id,
  product_line_ids_added: [1, 2],
  product_line_ids_removed: []
})
Party::ProductInterestHandler.new.call(event)

Publishing tests

Use a fresh RailsEventStore::Client (no handlers subscribed) via the
capture_events helper defined in test/initializers/event_store_publishing_test.rb:

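captured = capture_events(Events::ProductInterestChanged) do
  # The broadcast method is private and both keyword arguments are required.
  party.send(:broadcast_product_interest_changed,
             product_line_ids_added: [1], product_line_ids_removed: [])
end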
assert_equal 1, captured.size

The helper wraps the block in EXTERNAL_SERVICE_STUB_MUTEX to safely swap
Rails.configuration.event_store in the parallel test environment.
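
A helper of this kind might look roughly like the sketch below. It is an assumption about the shape, not the real helper: Config, CONFIG, FakeStore, and SWAP_MUTEX are plain-Ruby stand-ins for Rails.configuration, RailsEventStore::Client, and EXTERNAL_SERVICE_STUB_MUTEX.

```ruby
# Hypothetical sketch of a capture_events-style helper; the real one may differ.
Config = Struct.new(:event_store)
CONFIG = Config.new(:production_store)
SWAP_MUTEX = Mutex.new

class FakeStore
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(event)
    @published << event
  end
end

def capture_events(type)
  SWAP_MUTEX.synchronize do
    original = CONFIG.event_store
    CONFIG.event_store = FakeStore.new   # fresh store, no handlers subscribed
    begin
      yield
      CONFIG.event_store.published.select { |event| event.is_a?(type) }
    ensure
      CONFIG.event_store = original      # always restore the original store
    end
  end
end

captured = capture_events(String) do
  CONFIG.event_store.publish("a string event")
  CONFIG.event_store.publish(:ignored_symbol)
end
```

The ensure clause matters here: a failing assertion inside the block must not leave the swapped store in place for the next test.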

Named Streams

Every publish call includes a stream_name: so events are queryable per record:

Model             Stream pattern
─────             ──────────────
Order             "Order-{id}"
Invoice           "Invoice-{id}"
Rma               "Rma-{id}"
Quote             "Quote-{id}"
Party / Customer  "Party-{id}"
CatalogItem       "CatalogItem-{id}"
EmailPreference   "EmailPreference-{id}"

Read a record's stream in the console:

Rails.configuration.event_store.read.stream("Order-42").backward.to_a

CRM "Events" Tab

Orders, Customers, Invoices, and RMAs each have an Events tab in the CRM.
It displays the last 100 domain events from the record's named stream, newest
first. The tab is lazy-loaded via Turbo (no impact on page load time).
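
The "last 100, newest first" selection can be illustrated on a plain array. This is an in-memory sketch with a hypothetical latest_events helper. Against the event store, the same shape would presumably come from chaining stream(...), backward, and limit(100) on the read API, though the tab's actual query is not shown here.

```ruby
# In-memory illustration; the array stands in for a named stream, where
# index 0 is the oldest event.
def latest_events(stream_events, limit: 100)
  stream_events.last(limit).reverse
end

stream = (1..250).map { |n| "event-#{n}" } # oldest .. newest
tab_rows = latest_events(stream)
```

With 250 events in the stream, tab_rows holds event-250 down to event-151.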

RES Browser (admin)

The RES Browser UI is mounted at /res (CRM subdomain, admin-only). It lets
you browse all streams, inspect event payloads, and troubleshoot handler issues.

Event Retention

EventStorePurgeWorker runs nightly at 1 AM CT and deletes events older than
EVENT_STORE_RETENTION_DAYS days (default: 90). Override by setting that
env var in production.
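
The retention cutoff can be sketched as follows. This is a hypothetical illustration of the calculation, not the worker's code: retention_days and purge_cutoff are made-up names, and only the env var name and the default of 90 come from the text above.

```ruby
# Hypothetical sketch of the cutoff calculation; the real worker may differ.
SECONDS_PER_DAY = 86_400

# Reads the retention window from the environment, defaulting to 90 days.
def retention_days(env = ENV)
  Integer(env.fetch("EVENT_STORE_RETENTION_DAYS", "90"))
end

# Events older than the returned time would be eligible for deletion.
def purge_cutoff(now, env = ENV)
  now - retention_days(env) * SECONDS_PER_DAY
end

now = Time.utc(2026, 6, 1, 6, 0, 0)
default_cutoff = purge_cutoff(now, {})
custom_cutoff  = purge_cutoff(now, { "EVENT_STORE_RETENTION_DAYS" => "30" })
```

Using Integer(...) rather than to_i makes a malformed value raise instead of silently becoming 0, which would otherwise purge everything.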

See Also

  • RES documentation: https://railseventstore.org
  • Event classes: app/events/events.rb
  • Initializer / subscriptions: config/initializers/event_store.rb
  • Handler tests: test/subscribers/
  • Publishing tests: test/initializers/event_store_publishing_test.rb
  • Purge worker: app/workers/event_store_purge_worker.rb