Module: Heatwave::Parallel
- Defined in: app/lib/heatwave/parallel.rb
Overview
Runs a small, fixed set of IO-bound tasks (carrier rate quotes, label and
PDF downloads) concurrently on plain threads.
Each task runs inside the Rails executor via Rails.application.executor.wrap,
as the Rails threading guide requires for any manually spawned thread that
touches application code: the executor checks ActiveRecord connections in
and out, sets up the query cache, and isolates per-thread state. wrap is
re-entrant (a no-op when the executor is already active on the thread), so
nesting is harmless.
This is the project's replacement for the concurrent_rails gem. That gem
did nothing more than this executor.wrap call, and the workloads here are
pure IO fan-out (a handful of blocking HTTPS calls). Plain threads, which
run in parallel because the GVL is released during blocking IO, are the
simplest fit.
Results are returned in input order. If a task raises, the exception
propagates from the corresponding Thread#value call, mirroring
Concurrent::Promises::Future#value!.
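The ordering and error behaviour described above can be sketched with core Ruby threads alone. This is a minimal illustration, not the module's code: the Rails executor is left out so it runs anywhere, and the carrier names and delays are made up.

```ruby
# Hypothetical jobs: [name, seconds of simulated IO]. The first finishes last.
jobs = [[:ups, 0.03], [:fedex, 0.01], [:usps, 0.02]]

threads = jobs.map do |job|
  Thread.new(job) do |(name, delay)|
    sleep(delay) # stands in for a blocking HTTPS call
    name
  end
end

# Thread#value joins each thread in turn, so results follow input order,
# not completion order.
results = threads.map(&:value)

# A task that raises: the exception surfaces from Thread#value.
failing = Thread.new do
  Thread.current.report_on_exception = false # skip the stderr backtrace dump
  raise ArgumentError, "quote failed"
end

error = begin
  failing.value
rescue ArgumentError => e
  e
end
```

Here results keeps the input order even though the :fedex thread finishes first, and error holds the ArgumentError raised inside the thread.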
Class Method Summary
- .all(*tasks) ⇒ Array
  Runs each given task concurrently.
- .map(items) {|item| ... } ⇒ Array
  Runs block once per element of items, each on its own thread.
Class Method Details
.all(*tasks) ⇒ Array
Runs each given task concurrently. The heterogeneous-task counterpart to
map — accepts any objects responding to call (lambdas, procs, method
objects).
    # File 'app/lib/heatwave/parallel.rb', line 65

    def all(*tasks)
      map(tasks, &:call)
    end
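A possible usage of all with mixed callables, as a sketch only. ParallelSketch is a hypothetical stand-in that mirrors the shape of Heatwave::Parallel without the Rails executor or the join-before-return step, so the example runs in plain Ruby. The task bodies are placeholders for real IO calls.

```ruby
# Hypothetical stand-in, NOT the real module: no executor.wrap, no join pass.
module ParallelSketch
  module_function

  def map(items)
    threads = items.map { |item| Thread.new(item) { |i| yield(i) } }
    threads.map(&:value)
  end

  def all(*tasks)
    map(tasks, &:call)
  end
end

rate  = -> { 12.5 }                 # lambda
eta   = proc { "2 days" }           # proc
label = "label.pdf".method(:upcase) # method object

# Each callable runs on its own thread; results come back in argument order.
results = ParallelSketch.all(rate, eta, label)
```

In application code the call would presumably read Heatwave::Parallel.all(rate, eta, label), with the same argument-order guarantee.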
.map(items) {|item| ... } ⇒ Array
Runs block once per element of items, each on its own thread.
    # File 'app/lib/heatwave/parallel.rb', line 36

    def map(items)
      threads = items.map do |item|
        Thread.new(item) do |i|
          # The exception is intentionally captured and re-raised by #value
          # below (mirroring Concurrent::Promises::Future#value!), so Ruby's
          # default per-thread backtrace dump to stderr would just be noise.
          Thread.current.report_on_exception = false

          Rails.application.executor.wrap { yield(i) }
        end
      end

      # Join every thread before returning, even when one raises. A thread
      # left running would leak its Rails executor scope (and any checked-out
      # database connection) into whatever code runs next, which can corrupt
      # an unrelated connection. Failures are swallowed here and re-raised in
      # input order by #value below, so the first failure still wins.
      threads.each do |thread|
        thread.join
      rescue StandardError
        nil
      end

      threads.map(&:value)
    end
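The join-then-collect pattern in map can be observed in isolation. The sketch below is an assumption-labelled illustration in plain Ruby (no Rails executor, made-up task bodies): the first task fails immediately, yet the slower second task still runs to completion before the failure is re-raised.

```ruby
finished = Queue.new # thread-safe record of tasks that ran to completion

threads = [
  Thread.new do
    Thread.current.report_on_exception = false
    raise "first task failed"
  end,
  Thread.new do
    sleep 0.05 # simulated slow IO, still in flight when the first task fails
    finished << :slow_task
    :ok
  end
]

# Pass 1: wait for every thread, swallowing any failure for now.
threads.each do |thread|
  thread.join
rescue StandardError
  nil
end

# Pass 2: collect values in input order; the first failure is re-raised here.
error = begin
  threads.map(&:value)
rescue RuntimeError => e
  e
end
```

By the time error is set, no thread is still alive, which is the property the comment in map relies on to avoid leaking executor scope into later code.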