Class: Assistant::PlanOrchestrator

Inherits:
Object
  • Object
show all
Defined in:
app/services/assistant/plan_orchestrator.rb

Overview

Runs multi-step execution plans after +declare_plan+ halts the main chat.
Lives alongside ChatService; mutates the same conversation and streams via
the parent service's callbacks.

Plan execution is one sequential state machine; splitting only for metrics
would scatter persistence, streaming, and cost-cap logic.
rubocop:disable Metrics/ClassLength, Metrics/AbcSize, Metrics/BlockLength, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity, Metrics/ParameterLists

Instance Method Summary collapse

Constructor Details

#initialize(conversation:, chat_service:) ⇒ PlanOrchestrator

Returns a new instance of PlanOrchestrator.



12
13
14
15
# File 'app/services/assistant/plan_orchestrator.rb', line 12

# Binds the orchestrator to a conversation and the chat service driving it.
#
# @param conversation [Object] stored as +@conversation+
# @param chat_service [Object] stored as +@chat_service+
def initialize(conversation:, chat_service:)
  @chat_service = chat_service
  @conversation = conversation
end

Instance Method Details

#build_plan_error_result(message) ⇒ Object

Called by ChatService to build an error result for halt paths that never entered plan execution.



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'app/services/assistant/plan_orchestrator.rb', line 197

# Builds an error result for a plan that stopped before any step executed.
#
# Hands +message+ to the chat service's +assign_full_response_if_blank!+,
# sends the resulting full response through the streamer when one is set,
# records it on the conversation as an assistant message, and wraps it in a
# ChatService::Result.
#
# @param message [String] text passed to +assign_full_response_if_blank!+
# @return [ChatService::Result] result whose token counters are all zero,
#   with no thinking text, +tool_limit_reached+ false and empty tool stats
def build_plan_error_result(message)
  # Elapsed time is measured before the response is touched.
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - chat_service_ivar(:@start_time)
  chat_service_send(:assign_full_response_if_blank!, message)

  emit = chat_service_ivar(:@streamer)
  emit.call(chat_service_ivar(:@full_response)) unless emit.nil?
  @conversation.add_message(role: :assistant, content: chat_service_ivar(:@full_response))

  # Usage counters are reported as zero on this path.
  no_usage = { input_tokens: 0, output_tokens: 0, cached_tokens: 0, cache_creation_tokens: 0 }

  ChatService::Result.new(
    content: chat_service_ivar(:@full_response),
    model: chat_service_ivar(:@model_key),
    **no_usage,
    response_time: elapsed,
    model_reason: chat_service_ivar(:@model_selection_reason),
    thinking_text: nil,
    tool_limit_reached: false,
    tool_stats: {}
  )
end

#execute!(streamer_proc) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'app/services/assistant/plan_orchestrator.rb', line 17

# Runs every step of the conversation's declared execution plan in order,
# then assembles a single final response from the step results.
#
# Each step is persisted as it changes state ('in_progress', then
# 'completed' / 'failed' / 'skipped'), and status events are emitted through
# the chat service. The loop stops early when the cancel check fires or when
# the accumulated plan cost exceeds ChatService::MAX_PLAN_COST_USD.
#
# @param streamer_proc [Proc] passed only to +assemble_plan_response+; the
#   individual steps run with a no-op streamer
# @return [Object] the value of +build_plan_execution_result+, or of
#   +build_plan_error_result+ when the plan is missing or has no steps
# @raise [StandardError] any error raised during orchestration is logged,
#   the remaining steps are marked failed, and the error is re-raised
def execute!(streamer_proc)
  @conversation.reload
  chat_service_send(:reset_full_response_for_plan!, +'')
  # NOTE(review): the accessor name between `.` and `['execution_plan']`
  # appears to be missing from this listing (the line does not parse as
  # written) — restore it from the real source file.
  plan = @conversation.['execution_plan']
  return build_plan_error_result('Plan data was missing after declaration. Please try again.') unless plan.is_a?(Hash) && plan['steps'].is_a?(Array) && plan['steps'].any?

  goal = plan['goal'].to_s
  steps = plan['steps']
  # Running totals across all steps plus the final assembly call.
  total_in = 0
  total_out = 0
  total_cached = 0
  plan_cost = 0.0
  step_results = []
  plan_tool_limit_reached = false
  merged_tool_stats = { total_tool_calls: 0, plan_steps: steps.size }
  # Steps stream into a no-op proc; only the assembly receives streamer_proc.
  silent_step_streamer = proc { |_| }
  # Tracked so the rescue below knows from which step to mark failures.
  @current_plan_step_index = nil

  plan_sticky_keys = Assistant::ChatToolBuilder.sticky_service_keys_for_plan(
    steps, chat_service_ivar(:@tool_services), goal: goal
  )
  # One cache hash shared by every step's ToolLoopGuard.
  cross_step_cache = {}
  # Services whose tools were successfully invoked in earlier steps stay
  # available for later ones. Steps in a plan are usually correlated, and
  # a successful prior tool call is a far stronger signal that a service
  # is relevant than a regex match against the step description. Without
  # this, conversation 1657 lost blog_management on step 3 even though
  # steps 1 and 2 had just edited the same blog post with it.
  services_used_so_far = []

  steps.each_with_index do |step, idx|
    @current_plan_step_index = idx
    # On cancellation, the current step and every later one are persisted
    # as skipped (steps[idx..] includes the current step), then the loop ends.
    if chat_service_ivar(:@cancel_check)&.call
      remaining_steps = steps[idx..]
      remaining_steps.each_with_index do |_remaining_step, offset|
        persist_plan_step_metadata!(step_index: idx + offset, result_text: 'Cancelled by user', status: 'skipped')
      end
      break
    end

    persist_plan_step_metadata!(step_index: idx, result_text: nil, status: 'in_progress')
    chat_service_send(:emit_status, { event: :plan_step_start, step_index: idx, step_count: steps.size })

    # Pick the services for this step; plan-level sticky keys and services
    # used by earlier steps are always passed along as sticky keys.
    service_keys = Assistant::ChatToolBuilder.tool_service_keys_for_step(
      step['description'],
      chat_service_ivar(:@tool_services),
      sticky_keys: (plan_sticky_keys + services_used_so_far).uniq
    )
    step_tools = Assistant::ChatToolBuilder.tools_for_services(
      service_keys,
      role: chat_service_ivar(:@user_role),
      allowed_objects: chat_service_ivar(:@allowed_objects),
      audit_context: {
        conversation_id: @conversation.id,
        user_id: @conversation.user_id,
        active_services: chat_service_ivar(:@tool_services)
      },
      account: chat_service_ivar(:@account),
      provider: chat_service_ivar(:@model_config)[:provider],
      include_plan_tools: false
    )
    # Fallback: if the selected services yielded no tools, rebuild the tool
    # list from the chat service's full set of tool services.
    if step_tools.empty?
      step_tools = Assistant::ChatToolBuilder.tools_for_services(
        chat_service_ivar(:@tool_services),
        role: chat_service_ivar(:@user_role),
        allowed_objects: chat_service_ivar(:@allowed_objects),
        audit_context: {
          conversation_id: @conversation.id,
          user_id: @conversation.user_id,
          active_services: chat_service_ivar(:@tool_services)
        },
        account: chat_service_ivar(:@account),
        provider: chat_service_ivar(:@model_config)[:provider],
        include_plan_tools: false
      )
    end

    # A fresh guard per step, sharing the cross-step cache and cancel check.
    step_guard = Assistant::ToolLoopGuard.new(
      role: chat_service_ivar(:@user_role),
      conversation_id: @conversation.id,
      plan_step_mode: true,
      cross_step_cache: cross_step_cache,
      supports_thinking: chat_service_ivar(:@model_config)&.dig(:supports_thinking) == true,
      cancel_check: chat_service_ivar(:@cancel_check)
    )
    step_guard.apply!(step_tools)

    # Earlier step results are handed to the executor as prior context.
    prior_payload = step_results.map do |sr|
      { step: sr[:step], description: sr[:description], result: sr[:result] }
    end

    # The three return values are used below as result text, input token
    # count and output token count.
    text, in_t, out_t = run_plan_step_executor(
      step: step,
      step_index: idx,
      goal: goal,
      prior_results: prior_payload,
      step_tools: step_tools,
      streamer_proc: silent_step_streamer
    )
    text = clamp_step_result_for_chain(text)

    # Blank text with zero tokens counts as skipped; a 'Step failed:' prefix
    # or blank text with token usage counts as failed; anything else completed.
    # NOTE(review): `text.start_with?` is evaluated before the second
    # `text.blank?`, so a nil text with non-zero tokens would raise here
    # unless clamp_step_result_for_chain always returns a String — confirm.
    step_status = if text.blank? && in_t.zero? && out_t.zero?
                    'skipped'
                  elsif text.start_with?('Step failed:') || text.blank?
                    'failed'
                  else
                    'completed'
                  end
    persist_plan_step_metadata!(step_index: idx, result_text: text, status: step_status)

    step_results << { step: idx, description: step['description'].to_s, result: text }
    total_in += in_t
    total_out += out_t

    step_cost = Assistant::CostCalculator.cost_for(
      chat_service_ivar(:@model_key),
      input_tokens: in_t,
      output_tokens: out_t
    )
    plan_cost += step_cost
    if step_guard.stats
      merged_tool_stats[:total_tool_calls] += step_guard.stats[:total_tool_calls]
      # Map the tool names used in this step back to their services, keeping
      # only services that are among the chat service's tool services.
      step_used = Array(step_guard.stats[:unique_tool_names])
                  .filter_map { |n| Assistant::ChatToolBuilder.service_for_tool(n) }
                  .uniq & chat_service_ivar(:@tool_services).to_a
      services_used_so_far = (services_used_so_far + step_used).uniq
    end
    plan_tool_limit_reached ||= step_guard.limit_reached?

    # The cost cap is checked after a step has run. When it trips, every
    # later step is added to step_results as not executed and persisted as
    # skipped; the break also means no :plan_step_complete event is emitted
    # for the step that crossed the cap.
    if plan_cost > ChatService::MAX_PLAN_COST_USD
      chat_service_send(:emit_status, { event: :plan_cost_cap, estimated_usd: plan_cost.round(4) })
      remaining = steps[(idx + 1)..]
      remaining&.each_with_index do |skipped_step, offset|
        skipped_idx = idx + 1 + offset
        step_results << {
          step: skipped_idx,
          description: skipped_step['description'].to_s,
          result: 'NOT EXECUTED — cost limit reached before this step could run.'
        }
        persist_plan_step_metadata!(
          step_index: skipped_idx,
          result_text: 'Skipped (cost cap)',
          status: 'skipped'
        )
      end
      break
    end

    chat_service_send(:emit_status, { event: :plan_step_complete, step_index: idx })
  end

  # Final assembly: reset the full response again, then build the answer
  # from the collected step results, streaming through streamer_proc.
  chat_service_send(:emit_status, { event: :plan_assembling })
  chat_service_send(:reset_full_response_for_plan!, +'')
  ass_in, ass_out, ass_cached = assemble_plan_response(
    goal: goal,
    step_results: step_results,
    streamer_proc: streamer_proc
  )
  total_in += ass_in
  total_out += ass_out
  # Only the assembly call contributes to the cached-token total.
  total_cached += ass_cached

  @conversation.add_message(role: :assistant, content: chat_service_ivar(:@full_response))

  build_plan_execution_result(
    input_tokens: total_in,
    output_tokens: total_out,
    cached_tokens: total_cached,
    cache_creation_tokens: 0,
    tool_stats: merged_tool_stats,
    plan_tool_limit_reached: plan_tool_limit_reached
  )
rescue StandardError => e
  # NOTE(review): the log tag names ChatService although this method lives in
  # PlanOrchestrator — presumably kept for log continuity; confirm.
  Rails.logger.error("[Assistant::ChatService] Plan orchestration crashed: #{e.class}: #{e.message}")
  Rails.logger.error(e.backtrace&.first(8)&.join("\n"))
  # Mark steps failed starting at the step that was running (or step 0 if
  # the crash happened before the loop started), then re-raise.
  mark_remaining_plan_steps_failed!(@current_plan_step_index || 0, e.message)
  raise
end