Class: Assistant::PlanOrchestrator

Inherits:
Object
  • Object
show all
Defined in:
app/services/assistant/plan_orchestrator.rb

Overview

Runs multi-step execution plans after +declare_plan+ halts the main chat.
Lives alongside ChatService; mutates the same conversation and streams via
the parent service's callbacks.

Plan execution is one sequential state machine; splitting it into smaller
classes or methods only to satisfy RuboCop's metrics cops would scatter the
persistence, streaming, and cost-cap logic.
rubocop:disable Metrics/ClassLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity

Constant Summary collapse

PLAN_STEP_MAX_RETRIES =

A single hung streaming request must not be able to consume an entire plan
step's wall-clock budget. The global RubyLLM config retries 5× at a 120s
request timeout — a 720s worst case that EXCEEDS MAX_PLAN_STEP_DURATION (600s),
so a dead Gemini SSE stream silently eats the whole step and the briefing loses
that step's data with zero salvage. (Prod conv 3388, 2026-06-15: step 2 made 9
SQL calls in 33s, then a dead stream retried for ~9.4 min until the 600s step
Timeout fired.) Capping retries here bounds one request's full retry cycle to
120s × (2 + 1) = 360s — comfortably under the step budget — so a doomed request
fails fast and the step degrades gracefully instead of timing out the briefing.

2

Instance Method Summary collapse

Constructor Details

#initialize(conversation:, chat_service:) ⇒ PlanOrchestrator

Returns a new instance of PlanOrchestrator.



23
24
25
26
# File 'app/services/assistant/plan_orchestrator.rb', line 23

# Binds the orchestrator to the conversation it works on and to the parent
# chat service it cooperates with.
#
# @param conversation [Object] conversation whose plan is executed
# @param chat_service [Object] parent chat service
def initialize(conversation:, chat_service:)
  @chat_service = chat_service
  @conversation = conversation
end

Instance Method Details

#build_plan_error_result(message) ⇒ Object

Used from ChatService for halt paths that never entered plan execution.



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'app/services/assistant/plan_orchestrator.rb', line 215

# Produces the result object for a plan that could not be executed.
#
# Makes +message+ the response text when the chat service has none yet,
# pushes the response through the streamer (when one is set), records it as
# an assistant message on the conversation, and returns a result with all
# token counters at zero.
#
# @param message [String] fallback text handed to assign_full_response_if_blank!
# @return [ChatService::Result] result carrying the response text, the model
#   key and selection reason, and the time elapsed since the service's
#   start time
def build_plan_error_result(message)
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - chat_service_ivar(:@start_time)
  chat_service_send(:assign_full_response_if_blank!, message)

  emit = chat_service_ivar(:@streamer)
  emit.call(chat_service_ivar(:@full_response)) unless emit.nil?
  @conversation.add_message(role: :assistant, content: chat_service_ivar(:@full_response))

  # No model call happened on this path, so every token counter is zero.
  result_attrs = {
    content: chat_service_ivar(:@full_response),
    model: chat_service_ivar(:@model_key),
    input_tokens: 0,
    output_tokens: 0,
    cached_tokens: 0,
    cache_creation_tokens: 0,
    response_time: elapsed,
    model_reason: chat_service_ivar(:@model_selection_reason),
    thinking_text: nil,
    tool_limit_reached: false,
    tool_stats: {}
  }
  ChatService::Result.new(**result_attrs)
end

#execute!(streamer_proc) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'app/services/assistant/plan_orchestrator.rb', line 28

# Runs the conversation's stored execution plan step by step, then assembles
# the final response and records it as an assistant message.
#
# Every step gets its own tool list and ToolLoopGuard, and receives the
# results of the steps before it. The loop stops early when the cancel check
# fires or when the accumulated estimated cost exceeds
# ChatService::MAX_PLAN_COST_USD; in both cases the steps that did not run
# are persisted with status 'skipped'. Assembly still runs after an early
# stop.
#
# @param streamer_proc [Object] handed to assemble_plan_response only; the
#   individual steps run with a no-op streamer
# @return [Object] the value of build_plan_execution_result, or of
#   build_plan_error_result when the plan data is missing or has no steps
# @raise [StandardError] any error is logged, passed to
#   mark_remaining_plan_steps_failed!, and re-raised
def execute!(streamer_proc)
  @conversation.reload
  chat_service_send(:reset_full_response_for_plan!, +'')
  # NOTE(review): as rendered here the accessor name between `.` and `[` is
  # missing, so this line does not parse — confirm the attribute name against
  # the original file.
  plan = @conversation.['execution_plan']
  return build_plan_error_result('Plan data was missing after declaration. Please try again.') unless plan.is_a?(Hash) && plan['steps'].is_a?(Array) && plan['steps'].any?

  goal = plan['goal'].to_s
  steps = plan['steps']
  # Running totals across all steps plus the final assembly call.
  total_in = 0
  total_out = 0
  total_cached = 0
  plan_cost = 0.0
  step_results = []
  plan_tool_limit_reached = false
  merged_tool_stats = { total_tool_calls: 0, plan_steps: steps.size }
  # Steps run with a no-op streamer; only the assembly call below receives
  # the caller's streamer_proc.
  silent_step_streamer = proc { |_| }
  # Read by the rescue clause to tell mark_remaining_plan_steps_failed! where
  # execution stopped.
  @current_plan_step_index = nil

  plan_sticky_keys = Assistant::ChatToolBuilder.sticky_service_keys_for_plan(
    steps, chat_service_ivar(:@tool_services), goal: goal
  )
  # One hash instance handed to every step's ToolLoopGuard.
  cross_step_cache = {}
  # SQL HYGIENE: a single Set shared across every step's ToolLoopGuard so
  # a describe_available_data call in step 1 still satisfies the gate when
  # SQL referencing the same view runs in a later step. Without this, the
  # model would be forced to re-describe each step and burn budget.
  shared_described_views = Set.new
  # Services whose tools were successfully invoked in earlier steps stay
  # available for later ones. Steps in a plan are usually correlated, and
  # a successful prior tool call is a far stronger signal that a service
  # is relevant than a regex match against the step description. Without
  # this, conversation 1657 lost blog_management on step 3 even though
  # steps 1 and 2 had just edited the same blog post with it.
  services_used_so_far = []

  steps.each_with_index do |step, idx|
    @current_plan_step_index = idx
    # Cancellation: persist this step and every later one as skipped, then
    # leave the loop. Assembly below still runs with the results so far.
    if chat_service_ivar(:@cancel_check)&.call
      remaining_steps = steps[idx..]
      remaining_steps.each_with_index do |_remaining_step, offset|
        persist_plan_step_metadata!(step_index: idx + offset, result_text: 'Cancelled by user', status: 'skipped')
      end
      break
    end

    persist_plan_step_metadata!(step_index: idx, result_text: nil, status: 'in_progress')
    chat_service_send(:emit_status, { event: :plan_step_start, step_index: idx, step_count: steps.size })

    # Service selection for this step: plan-wide sticky keys plus services
    # used by earlier steps are passed as sticky keys.
    service_keys = Assistant::ChatToolBuilder.tool_service_keys_for_step(
      step['description'],
      chat_service_ivar(:@tool_services),
      sticky_keys: (plan_sticky_keys + services_used_so_far).uniq
    )
    # include_plan_tools is false for every step's tool list.
    step_tools = Assistant::ChatToolBuilder.tools_for_services(
      service_keys,
      role: chat_service_ivar(:@user_role),
      allowed_objects: chat_service_ivar(:@allowed_objects),
      audit_context: {
        conversation_id: @conversation.id,
        user_id: @conversation.user_id,
        active_services: chat_service_ivar(:@tool_services)
      },
      account: chat_service_ivar(:@account),
      provider: chat_service_ivar(:@model_config)[:provider],
      include_plan_tools: false
    )
    # Fallback: when the step-specific selection yields no tools, rebuild the
    # list from all of the chat service's tool services.
    if step_tools.empty?
      step_tools = Assistant::ChatToolBuilder.tools_for_services(
        chat_service_ivar(:@tool_services),
        role: chat_service_ivar(:@user_role),
        allowed_objects: chat_service_ivar(:@allowed_objects),
        audit_context: {
          conversation_id: @conversation.id,
          user_id: @conversation.user_id,
          active_services: chat_service_ivar(:@tool_services)
        },
        account: chat_service_ivar(:@account),
        provider: chat_service_ivar(:@model_config)[:provider],
        include_plan_tools: false
      )
    end

    # A fresh guard per step; cross_step_cache and shared_described_views are
    # the same objects for every step.
    step_guard = Assistant::ToolLoopGuard.new(
      role: chat_service_ivar(:@user_role),
      conversation_id: @conversation.id,
      plan_step_mode: true,
      cross_step_cache: cross_step_cache,
      supports_thinking: chat_service_ivar(:@model_config)&.dig(:supports_thinking) == true,
      cancel_check: chat_service_ivar(:@cancel_check),
      available_tool_names: step_tools.map(&:name),
      described_views: shared_described_views
    )
    step_guard.apply!(step_tools)

    # Results of all earlier steps (including any failed ones) are passed to
    # the executor as prior results.
    prior_payload = step_results.map do |sr|
      { step: sr[:step], description: sr[:description], result: sr[:result] }
    end

    text, in_t, out_t = run_plan_step_executor(
      step: step,
      step_index: idx,
      goal: goal,
      prior_results: prior_payload,
      step_tools: step_tools,
      streamer_proc: silent_step_streamer
    )
    text = clamp_step_result_for_chain(text)

    # Status: no text and no tokens => skipped; a failure prefix or blank
    # text => failed; anything else => completed.
    # NOTE(review): `text.start_with?` runs before the `text.blank?` guard in
    # the elsif; if clamp_step_result_for_chain can return nil while token
    # counts are non-zero this would raise NoMethodError — confirm it always
    # returns a String.
    step_status = if text.blank? && in_t.zero? && out_t.zero?
                    'skipped'
                  elsif text.start_with?('Step failed:', 'STEP FAILED') || text.blank?
                    'failed'
                  else
                    'completed'
                  end
    persist_plan_step_metadata!(step_index: idx, result_text: text, status: step_status)

    step_results << { step: idx, description: step['description'].to_s, result: text }
    total_in += in_t
    total_out += out_t

    # Step cost is computed from input and output tokens only.
    step_cost = Assistant::CostCalculator.cost_for(
      chat_service_ivar(:@model_key),
      input_tokens: in_t,
      output_tokens: out_t
    )
    plan_cost += step_cost
    if step_guard.stats
      merged_tool_stats[:total_tool_calls] += step_guard.stats[:total_tool_calls]
      # Map the tool names this step used back to services, keeping only
      # services that are in the chat service's own tool-service list.
      step_used = Array(step_guard.stats[:unique_tool_names])
                  .filter_map { |n| Assistant::ChatToolBuilder.service_for_tool(n) }
                  .uniq & chat_service_ivar(:@tool_services).to_a
      services_used_so_far = (services_used_so_far + step_used).uniq
    end
    plan_tool_limit_reached ||= step_guard.limit_reached?

    # Cost cap: checked after the step has run. Later steps are added to
    # step_results as "NOT EXECUTED" and persisted as skipped; no
    # plan_step_complete event is emitted for the step that crossed the cap.
    if plan_cost > ChatService::MAX_PLAN_COST_USD
      chat_service_send(:emit_status, { event: :plan_cost_cap, estimated_usd: plan_cost.round(4) })
      remaining = steps[(idx + 1)..]
      remaining&.each_with_index do |skipped_step, offset|
        skipped_idx = idx + 1 + offset
        step_results << {
          step: skipped_idx,
          description: skipped_step['description'].to_s,
          result: 'NOT EXECUTED — cost limit reached before this step could run.'
        }
        persist_plan_step_metadata!(
          step_index: skipped_idx,
          result_text: 'Skipped (cost cap)',
          status: 'skipped'
        )
      end
      break
    end

    chat_service_send(:emit_status, { event: :plan_step_complete, step_index: idx })
  end

  # Assembly: reset the response buffer and assemble the final response from
  # the step results, this time with the caller's streamer.
  chat_service_send(:emit_status, { event: :plan_assembling })
  chat_service_send(:reset_full_response_for_plan!, +'')
  ass_in, ass_out, ass_cached = assemble_plan_response(
    goal: goal,
    step_results: step_results,
    streamer_proc: streamer_proc
  )
  total_in += ass_in
  total_out += ass_out
  # Within this method, cached tokens are only ever added from the assembly
  # call.
  total_cached += ass_cached

  @conversation.add_message(role: :assistant, content: chat_service_ivar(:@full_response))

  build_plan_execution_result(
    input_tokens: total_in,
    output_tokens: total_out,
    cached_tokens: total_cached,
    cache_creation_tokens: 0,
    tool_stats: merged_tool_stats,
    plan_tool_limit_reached: plan_tool_limit_reached
  )
rescue StandardError => e
  # Log, mark steps from the last recorded step index (0 if none was
  # recorded) as failed, then re-raise the error to the caller.
  Rails.logger.error("[Assistant::ChatService] Plan orchestration crashed: #{e.class}: #{e.message}")
  Rails.logger.error(e.backtrace&.first(8)&.join("\n"))
  mark_remaining_plan_steps_failed!(@current_plan_step_index || 0, e.message)
  raise
end