Class: Assistant::PlanOrchestrator
- Inherits: Object
- Inheritance chain: Object → Assistant::PlanOrchestrator
- Defined in:
- app/services/assistant/plan_orchestrator.rb
Overview
Runs multi-step execution plans after +declare_plan+ halts the main chat.
Lives alongside ChatService; mutates the same conversation and streams via
the parent service's callbacks.
Plan execution is one sequential state machine; splitting it up only to satisfy
code-metrics cops would scatter the persistence, streaming, and cost-cap logic.
rubocop:disable Metrics/ClassLength, Metrics/AbcSize, Metrics/BlockLength, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity, Metrics/ParameterLists
Instance Method Summary collapse
- #build_plan_error_result(message) ⇒ Object
  Used from ChatService for halt paths that never entered plan execution.
- #execute!(streamer_proc) ⇒ Object
- #initialize(conversation:, chat_service:) ⇒ PlanOrchestrator (constructor)
  A new instance of PlanOrchestrator.
Constructor Details
#initialize(conversation:, chat_service:) ⇒ PlanOrchestrator
Returns a new instance of PlanOrchestrator.
12 13 14 15 |
# File 'app/services/assistant/plan_orchestrator.rb', line 12
#
# Captures the two collaborators used by the other methods of this class.
#
# @param conversation [Object] stored as @conversation
# @param chat_service [Object] stored as @chat_service
def initialize(conversation:, chat_service:)
  @chat_service = chat_service
  @conversation = conversation
end
Instance Method Details
#build_plan_error_result(message) ⇒ Object
Used from ChatService for halt paths that never entered plan execution.
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'app/services/assistant/plan_orchestrator.rb', line 197
#
# Builds the result for a halt path that never entered plan execution.
# Hands +message+ to the chat service's +assign_full_response_if_blank!+,
# pushes the chat service's @full_response through its @streamer (when one is
# set), records that text on the conversation as an assistant turn, and
# returns a ChatService::Result with every token counter set to zero.
#
# @param message [String] text forwarded to +assign_full_response_if_blank!+
#   (the only caller visible here passes a String literal)
# @return [ChatService::Result] result with zero token usage, no thinking
#   text, +tool_limit_reached: false+ and empty tool stats
def build_plan_error_result(message)
  # Elapsed time since the chat service's @start_time, on the monotonic clock.
  response_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - chat_service_ivar(:@start_time)
  chat_service_send(:assign_full_response_if_blank!, message)
  streamer = chat_service_ivar(:@streamer)
  streamer&.call(chat_service_ivar(:@full_response))
  # NOTE(review): the method name between `@conversation.` and `(` is missing
  # from this listing (as written, `.()` invokes #call). Presumably a
  # message-creating method — confirm against plan_orchestrator.rb.
  @conversation.(role: :assistant, content: chat_service_ivar(:@full_response))
  ChatService::Result.new(
    content: chat_service_ivar(:@full_response),
    model: chat_service_ivar(:@model_key),
    input_tokens: 0,
    output_tokens: 0,
    cached_tokens: 0,
    cache_creation_tokens: 0,
    response_time: response_time,
    model_reason: chat_service_ivar(:@model_selection_reason),
    thinking_text: nil,
    tool_limit_reached: false,
    tool_stats: {}
  )
end
#execute!(streamer_proc) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'app/services/assistant/plan_orchestrator.rb', line 17
#
# Runs the declared execution plan step by step, then assembles the final
# response.
#
# Reloads the conversation, reads its 'execution_plan', and returns
# build_plan_error_result(...) when the plan is not a Hash with a non-empty
# 'steps' Array. Otherwise each step is executed in order with its own tool
# set and ToolLoopGuard; token counts, estimated cost and tool stats are
# accumulated; and the loop stops early on cancellation or once the cost cap
# is exceeded. Finally assemble_plan_response streams the combined answer and
# the result of build_plan_execution_result is returned.
#
# NOTE(review): several identifiers are missing from this listing (marked
# below). They look like extraction losses — confirm against the source file.
#
# @param streamer_proc [Proc] passed only to assemble_plan_response; the
#   individual steps run with a no-op streamer
# @return [Object] whatever build_plan_execution_result (or
#   build_plan_error_result) returns
# @raise [StandardError] any error raised during orchestration is logged,
#   passed to mark_remaining_plan_steps_failed!, and re-raised
def execute!(streamer_proc)
  @conversation.reload
  chat_service_send(:reset_full_response_for_plan!, +'')
  # NOTE(review): accessor name missing between `@conversation.` and `[` —
  # confirm against the source file.
  plan = @conversation.['execution_plan']
  return build_plan_error_result('Plan data was missing after declaration. Please try again.') unless plan.is_a?(Hash) && plan['steps'].is_a?(Array) && plan['steps'].any?

  goal = plan['goal'].to_s
  steps = plan['steps']
  total_in = 0
  total_out = 0
  # Only the assembly call at the end adds to total_cached; the step loop
  # never touches it.
  total_cached = 0
  plan_cost = 0.0
  step_results = []
  plan_tool_limit_reached = false
  merged_tool_stats = { total_tool_calls: 0, plan_steps: steps.size }
  # No-op streamer handed to every step executor.
  silent_step_streamer = proc { |_| }
  # Read by the rescue clause to know where the plan stopped.
  @current_plan_step_index = nil
  plan_sticky_keys = Assistant::ChatToolBuilder.sticky_service_keys_for_plan(
    steps,
    chat_service_ivar(:@tool_services),
    goal: goal
  )
  # One hash instance shared by every step's ToolLoopGuard.
  cross_step_cache = {}
  # Services whose tools were successfully invoked in earlier steps stay
  # available for later ones. Steps in a plan are usually correlated, and
  # a successful prior tool call is a far stronger signal that a service
  # is relevant than a regex match against the step description. Without
  # this, conversation 1657 lost blog_management on step 3 even though
  # steps 1 and 2 had just edited the same blog post with it.
  services_used_so_far = []
  steps.each_with_index do |step, idx|
    @current_plan_step_index = idx
    # Cancellation: mark this step and every later one as skipped, then stop.
    if chat_service_ivar(:@cancel_check)&.call
      remaining_steps = steps[idx..]
      remaining_steps.each_with_index do |_remaining_step, offset|
        # NOTE(review): method name missing before `(` — presumably the helper
        # that persists a step's status; confirm against the source file.
        (step_index: idx + offset, result_text: 'Cancelled by user', status: 'skipped')
      end
      break
    end
    # NOTE(review): method name missing before `(` (same helper as above?).
    (step_index: idx, result_text: nil, status: 'in_progress')
    chat_service_send(:emit_status, { event: :plan_step_start, step_index: idx, step_count: steps.size })
    # Sticky keys combine the plan-wide set with services used by earlier steps.
    service_keys = Assistant::ChatToolBuilder.tool_service_keys_for_step(
      step['description'],
      chat_service_ivar(:@tool_services),
      sticky_keys: (plan_sticky_keys + services_used_so_far).uniq
    )
    step_tools = Assistant::ChatToolBuilder.tools_for_services(
      service_keys,
      role: chat_service_ivar(:@user_role),
      allowed_objects: chat_service_ivar(:@allowed_objects),
      audit_context: {
        conversation_id: @conversation.id,
        user_id: @conversation.user_id,
        active_services: chat_service_ivar(:@tool_services)
      },
      account: chat_service_ivar(:@account),
      provider: chat_service_ivar(:@model_config)[:provider],
      include_plan_tools: false
    )
    # Fallback: if the step-specific selection yields no tools, build tools
    # for all of the chat service's @tool_services instead.
    if step_tools.empty?
      step_tools = Assistant::ChatToolBuilder.tools_for_services(
        chat_service_ivar(:@tool_services),
        role: chat_service_ivar(:@user_role),
        allowed_objects: chat_service_ivar(:@allowed_objects),
        audit_context: {
          conversation_id: @conversation.id,
          user_id: @conversation.user_id,
          active_services: chat_service_ivar(:@tool_services)
        },
        account: chat_service_ivar(:@account),
        provider: chat_service_ivar(:@model_config)[:provider],
        include_plan_tools: false
      )
    end
    step_guard = Assistant::ToolLoopGuard.new(
      role: chat_service_ivar(:@user_role),
      conversation_id: @conversation.id,
      plan_step_mode: true,
      cross_step_cache: cross_step_cache,
      supports_thinking: chat_service_ivar(:@model_config)&.dig(:supports_thinking) == true,
      cancel_check: chat_service_ivar(:@cancel_check)
    )
    step_guard.apply!(step_tools)
    # Earlier steps' results, reduced to the three keys passed to the executor.
    prior_payload = step_results.map do |sr|
      { step: sr[:step], description: sr[:description], result: sr[:result] }
    end
    text, in_t, out_t = run_plan_step_executor(
      step: step,
      step_index: idx,
      goal: goal,
      prior_results: prior_payload,
      step_tools: step_tools,
      streamer_proc: silent_step_streamer
    )
    text = clamp_step_result_for_chain(text)
    # No text and no token usage => 'skipped'; text starting with
    # 'Step failed:' or blank text => 'failed'; anything else => 'completed'.
    step_status = if text.blank? && in_t.zero? && out_t.zero?
                    'skipped'
                  elsif text.start_with?('Step failed:') || text.blank?
                    'failed'
                  else
                    'completed'
                  end
    # NOTE(review): method name missing before `(` (same helper as above?).
    (step_index: idx, result_text: text, status: step_status)
    step_results << { step: idx, description: step['description'].to_s, result: text }
    total_in += in_t
    total_out += out_t
    step_cost = Assistant::CostCalculator.cost_for(
      chat_service_ivar(:@model_key),
      input_tokens: in_t,
      output_tokens: out_t
    )
    plan_cost += step_cost
    if step_guard.stats
      merged_tool_stats[:total_tool_calls] += step_guard.stats[:total_tool_calls]
      # Map the tool names used in this step to their services, keeping only
      # services that are in the chat service's @tool_services.
      step_used = Array(step_guard.stats[:unique_tool_names])
                  .filter_map { |n| Assistant::ChatToolBuilder.service_for_tool(n) }
                  .uniq & chat_service_ivar(:@tool_services).to_a
      services_used_so_far = (services_used_so_far + step_used).uniq
    end
    plan_tool_limit_reached ||= step_guard.limit_reached?
    # Cost cap: once the accumulated cost exceeds the limit, record every
    # remaining step as not executed and stop. The step that crossed the cap
    # does not emit :plan_step_complete.
    if plan_cost > ChatService::MAX_PLAN_COST_USD
      chat_service_send(:emit_status, { event: :plan_cost_cap, estimated_usd: plan_cost.round(4) })
      remaining = steps[(idx + 1)..]
      remaining&.each_with_index do |skipped_step, offset|
        skipped_idx = idx + 1 + offset
        step_results << {
          step: skipped_idx,
          description: skipped_step['description'].to_s,
          result: 'NOT EXECUTED — cost limit reached before this step could run.'
        }
        # NOTE(review): method name missing before `(` (same helper as above?).
        (
          step_index: skipped_idx,
          result_text: 'Skipped (cost cap)',
          status: 'skipped'
        )
      end
      break
    end
    chat_service_send(:emit_status, { event: :plan_step_complete, step_index: idx })
  end
  chat_service_send(:emit_status, { event: :plan_assembling })
  chat_service_send(:reset_full_response_for_plan!, +'')
  # The assembly call is the only one that receives the caller's streamer.
  ass_in, ass_out, ass_cached = assemble_plan_response(
    goal: goal,
    step_results: step_results,
    streamer_proc: streamer_proc
  )
  total_in += ass_in
  total_out += ass_out
  total_cached += ass_cached
  # NOTE(review): method name missing between `@conversation.` and `(` (as
  # written, `.()` invokes #call) — confirm against the source file.
  @conversation.(role: :assistant, content: chat_service_ivar(:@full_response))
  build_plan_execution_result(
    input_tokens: total_in,
    output_tokens: total_out,
    cached_tokens: total_cached,
    cache_creation_tokens: 0,
    tool_stats: merged_tool_stats,
    plan_tool_limit_reached: plan_tool_limit_reached
  )
rescue StandardError => e
  # NOTE(review): the attribute after `e.` is missing here and two lines
  # below — presumably `e.message`; confirm against the source file.
  Rails.logger.error("[Assistant::ChatService] Plan orchestration crashed: #{e.class}: #{e.}")
  Rails.logger.error(e.backtrace&.first(8)&.join("\n"))
  # Falls back to index 0 when the crash happened before the step loop started.
  mark_remaining_plan_steps_failed!(@current_plan_step_index || 0, e.)
  raise
end