Module: Heatwave::Parallel

Defined in:
app/lib/heatwave/parallel.rb

Overview

Runs a small, fixed set of IO-bound tasks (carrier rate quotes, label and
PDF downloads) concurrently on plain threads.

Each task runs inside the Rails executor via Rails.application.executor.wrap,
as the Rails threading guide requires for any manually spawned thread that
touches application code: the executor checks ActiveRecord connections in
and out, sets up the query cache, and isolates per-thread state. wrap is
re-entrant (a no-op when the executor is already active on the thread), so
nesting is harmless.

This is the project's replacement for the concurrent_rails gem. That gem
did nothing more than this executor.wrap, and the workloads here are pure
IO fan-out (a handful of blocking HTTPS calls). Plain threads are the
simplest fit: Ruby releases the GVL while a thread blocks on IO, so the
calls overlap instead of running one after another.
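
The overlap can be seen with a minimal plain-Ruby sketch. It is not part of this module: sleep stands in for a blocking HTTPS call (both release the GVL while waiting), and the 0.2 second latency and the returned values are made up for illustration.

```ruby
# Four simulated blocking calls; sleep is a stand-in for an HTTPS request.
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

threads = 4.times.map do |n|
  Thread.new(n) do |i|
    sleep 0.2 # hypothetical latency of one carrier call
    i * 10
  end
end

results = threads.map(&:value)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# The waits overlap, so elapsed stays near 0.2s rather than 4 * 0.2s.
puts format("results=%p elapsed=%.2fs", results, elapsed)
```

CPU-bound work would not benefit in the same way, since only one thread can hold the GVL while executing Ruby code.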

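Results are returned in input order. If a task raises, the exception
propagates from the corresponding Thread#value call, mirroring
Concurrent::Promises::Future#value!. Every thread is joined before anything
is raised, and when several tasks fail, the first failure in input order is
the one the caller sees.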
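
The two Thread#value behaviours relied on here can be sketched with bare threads, outside Rails. The delays and the error message below are illustrative assumptions, not values from the project.

```ruby
# Tasks finish in reverse order, yet results come back in input order
# because each value is read from the thread at the matching position.
delays = [0.03, 0.02, 0.01]
threads = delays.map do |delay|
  Thread.new(delay) do |d|
    sleep d
    d
  end
end
ordered = threads.map(&:value)

# A failing task: Thread#value joins the thread and re-raises its
# exception in the calling thread.
failing = Thread.new do
  Thread.current.report_on_exception = false # suppress the stderr dump
  raise ArgumentError, "quote failed"        # hypothetical failure
end

message =
  begin
    failing.value
  rescue ArgumentError => e
    e.message
  end

puts ordered.inspect
puts message
```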

Examples:

Fan out over a collection

rates = Heatwave::Parallel.map(shippers) { |shipper| shipper.find_rates }

Run a heterogeneous set of tasks

invoice, label = Heatwave::Parallel.all(-> { download_ci }, -> { download_label })

Class Method Summary

Class Method Details

.all(*tasks) ⇒ Array

Runs each given task concurrently. The heterogeneous-task counterpart to
map — accepts any objects responding to call (lambdas, procs, method
objects).

Parameters:

  • tasks (Array<#call>)

    the tasks to run

Returns:

  • (Array)

    each task's result, in argument order



# File 'app/lib/heatwave/parallel.rb', line 65

def all(*tasks)
  map(tasks, &:call)
end
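
As a rough illustration of what "responding to call" covers, the sketch below dispatches to four kinds of callable. It runs them sequentially with Array#map as a stand-in for the threaded map, and the class names and file names are hypothetical.

```ruby
# A plain object with a #call method (hypothetical downloader).
class LabelDownload
  def call
    "label.pdf"
  end
end

# A class whose instance method is passed around as a Method object.
class InvoiceDownload
  def fetch
    "invoice.pdf"
  end
end

tasks = [
  -> { "rates.json" },                # lambda
  proc { "manifest.csv" },            # proc
  InvoiceDownload.new.method(:fetch), # Method object
  LabelDownload.new                   # any object responding to #call
]

# Sequential stand-in for the dispatch: &:call sends :call to each task.
results = tasks.map(&:call)
puts results.inspect
```

Because the dispatch is only a call message, any of these can be mixed in one invocation, and the results line up with the argument positions.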

.map(items) {|item| ... } ⇒ Array

Runs block once per element of items, each on its own thread.

Parameters:

  • items (Enumerable)

    the inputs to fan out over

Yield Parameters:

  • item (Object)

    one element of items

Returns:

  • (Array)

    each task's result, in items order



# File 'app/lib/heatwave/parallel.rb', line 36

def map(items)
  threads = items.map do |item|
    Thread.new(item) do |i|
      # The exception is intentionally captured and re-raised by #value
      # below (mirroring Concurrent::Promises::Future#value!), so Ruby's
      # default per-thread backtrace dump to stderr would just be noise.
      Thread.current.report_on_exception = false
      Rails.application.executor.wrap { yield(i) }
    end
  end
  # Join every thread before returning — even when one raises. A thread
  # left running would leak its Rails executor scope (and any checked-out
  # database connection) into whatever code runs next, which can corrupt
  # an unrelated connection. Failures are swallowed here and re-raised in
  # input order by #value below, so the first failure still wins.
  threads.each do |thread|
    thread.join
  rescue StandardError
    nil
  end
  threads.map(&:value)
end
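
The join-then-raise behaviour can be exercised outside Rails with a stand-in that copies the method above and drops the executor. parallel_map_sketch, the task numbers, and the delay are assumptions made for this sketch, not project code.

```ruby
# Stand-in for map with the Rails executor removed; for illustration only.
def parallel_map_sketch(items)
  threads = items.map do |item|
    Thread.new(item) do |i|
      Thread.current.report_on_exception = false
      yield(i)
    end
  end
  # Wait for every thread, ignoring failures for now.
  threads.each do |thread|
    thread.join
  rescue StandardError
    nil
  end
  # Re-raises the first failure in input order, if any.
  threads.map(&:value)
end

finished = Queue.new # thread-safe record of tasks that ran to completion

error =
  begin
    parallel_map_sketch([1, 2, 3]) do |n|
      raise "task #{n} failed" if n == 1 # fails immediately
      sleep 0.05                         # slower siblings keep running
      finished << n
      n
    end
  rescue RuntimeError => e
    e
  end

puts error.message
puts finished.size
```

By the time the caller sees the exception, the two slower tasks have already completed, so no thread is left running in the background.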