Class: PartyResearch::Orchestrator

Inherits:
Object
  • Object
show all
Defined in:
app/services/party_research/orchestrator.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(run, on_progress: nil) ⇒ Orchestrator

Returns a new instance of Orchestrator.



28
29
30
31
# File 'app/services/party_research/orchestrator.rb', line 28

def initialize(run, on_progress: nil)
  @run = run
  @on_progress = on_progress
end

Class Method Details

.call(run:, on_progress: nil) ⇒ Object



24
25
26
# File 'app/services/party_research/orchestrator.rb', line 24

def self.call(run:, on_progress: nil)
  new(run, on_progress: on_progress).call
end

Instance Method Details

#callObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'app/services/party_research/orchestrator.rb', line 33

def call
  # Sidekiq may retry the job after a partial failure. If the run
  # already reached a terminal state, don't redo the work — the
  # previous attempt's findings are persisted and the reviewer can
  # rerun explicitly if needed. If it died mid-run (state ==
  # 'running'), pick up where we left off without re-flipping state
  # or re-superseding prior findings.
  return if @run.state.in?(%w[completed failed])

  if @run.state == 'queued'
    @run.start!
    supersede_prior_pending_findings
    seed_adapter_progress!
  end

  adapter_names = @run.adapters_list
  total = adapter_names.size
  adapter_errors = []

  adapter_names.each_with_index do |name, idx|
    # Idempotent resume guard: if the run is already partway through
    # (Sidekiq retry, replayed worker), skip adapters that already
    # reached a terminal state in the previous attempt. Re-running
    # `completed` / `skipped` ones would duplicate pending findings
    # and burn paid API credits twice. `failed` and `running` still
    # rerun — the former because retry is the whole point, the
    # latter because we can't tell if findings were persisted before
    # the crash (Sidekiq's job-uniqueness keeps two live workers off
    # the same run, so this isn't a concurrent-claim hazard).
    prior_state = @run.adapter_progress&.dig(name, 'state')
    if prior_state.in?(%w[completed skipped])
      @on_progress&.call(idx, total, name)
      next
    end

    @on_progress&.call(idx, total, name)
    mark_adapter(name, state: 'running', started_at: Time.current.iso8601)

    begin
      findings_count = run_adapter(name)
      mark_adapter(name, state: findings_count == :skipped ? 'skipped' : 'completed',
                         findings: findings_count == :skipped ? 0 : findings_count,
                         finished_at: Time.current.iso8601)
    rescue StandardError => e
      Rails.logger.warn("[PartyResearch] adapter=#{name} failed: #{e.class}: #{e.message}")
      adapter_errors << "#{name}: #{e.message}"
      mark_adapter(name, state: 'failed', error: e.message.truncate(400),
                         finished_at: Time.current.iso8601)
    end
  end

  if adapter_errors.size == adapter_names.size && adapter_errors.any?
    @run.fail!(adapter_errors.join("\n"))
  else
    @run.update!(error_message: adapter_errors.join("\n")) if adapter_errors.any?
    @run.complete!
  end
rescue StandardError => e
  @run.fail!(e.message)
  raise
end