Class: Assistant::ContextCompactor

Inherits: Object

Defined in: app/services/assistant/context_compactor.rb

Overview

Two-layer context compaction to keep AI assistant conversations within
token budgets and reduce cost.

Strategy 1 — Ephemeral Tool Result Compaction (post-exchange):
After a complete exchange, replace ALL large tool result messages with
compact summaries. Tool results (blog HTML, SQL output, API JSON) are
only needed verbatim during the exchange that consumed them — once the
response is streamed they are stale. Keeping them verbatim forces every
subsequent API call to re-send the same payload, bloating context rapidly.
This mirrors how Cursor treats file reads: ephemeral per-request injection,
not accumulated history.

Strategy 2 — Sliding Window + Summary (pre-exchange):
When estimated context exceeds a threshold, compress older messages
into a cached summary and keep only recent messages verbatim.

Usage:

After a response completes (in the worker):

Assistant::ContextCompactor.compact_tool_results!(conversation)

Before building the LLM payload (in to_llm):

Assistant::ContextCompactor.ensure_context_summary!(conversation)

Constant Summary

TOOL_RESULT_CHAR_THRESHOLD = 1_500

── Thresholds ────────────────────────────────────────────────────
Tool results shorter than this (chars) are left as-is.
CONTEXT_TOKEN_THRESHOLD = 50_000

When estimated context exceeds this many tokens, trigger sliding-window.
When actual input_tokens from the API are available (preferred over char
estimation), this threshold is applied directly against those real counts.
Kept conservatively low so that tool-definition overhead + mid-flight
tool-call additions don't push the next request over the 200k limit.
MAX_MESSAGES_BEFORE_SUMMARY = 40

Also trigger sliding-window when the raw message count crosses this threshold,
regardless of token estimates. Large-context models (Gemini 1M) never hit the
token ceiling naturally, so we need a count-based safety valve. 40 messages ≈
5-8 user turns with tool activity — enough history for coherent continuity.
MIN_RECENT_MESSAGES = 6

Keep at least this many messages verbatim in the recent window.
Ensures the LLM always sees enough context for coherent follow-ups.
6 messages ≈ 2 user/assistant exchanges — sufficient for continuity.
Combined with immediate tool-result compaction, these messages are short.
SUMMARIZER_MODEL = AiModelConstants.id(:summarization)
CHARS_PER_TOKEN = 4

Rough chars-per-token ratio. Conservative (real ratio is ~3.5 for
English) so we trigger compaction a bit early rather than too late.
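Since the estimate is plain integer arithmetic, it can be exercised outside Rails. A minimal sketch (the `estimated_context_tokens` helper is illustrative, not a method of this class; OVERHEAD_TOKENS is defined further down):

```ruby
# Char-based token estimate used when no real input_tokens count is available.
CHARS_PER_TOKEN = 4
OVERHEAD_TOKENS = 15_000

# message_contents: array of stored message body strings
def estimated_context_tokens(message_contents)
  content_tokens = message_contents.sum { |text| text.to_s.length / CHARS_PER_TOKEN }
  content_tokens + OVERHEAD_TOKENS
end

estimated_context_tokens(["a" * 8_000, "b" * 2_000]) # => 17_500
```

At 4 chars/token the estimate overshoots real English text (~3.5 chars/token), so compaction triggers slightly early rather than late.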
MAX_FORK_DEPTH = 3

Maximum number of parent→child forks before we refuse to fork again.
Prevents infinite cascade: conv → cont → cont → cont → …
OVERHEAD_TOKENS = 15_000

Estimated token overhead from system prompt and tool schemas that is NOT
reflected in stored message content. Conservative estimate based on typical
Sunny configurations: ~3K system prompt + ~2-15K per tool service.
DEDUP_STUB = '[Already retrieved earlier — omitted to reduce context]'

Collapse repeated identical tool calls across all turns.

When the same tool+args combination appears N times in history (e.g. the model
called get_blog_post(868) in 6 different turns), the context re-sends the same
data on every subsequent API call. This replaces the content of all but the
MOST RECENT result for each unique (tool_name, arguments) signature with a
lightweight stub, dramatically reducing context for chatty read-only tools.

Skips messages already stubbed (content starts with "[Already retrieved").
Safe: the latest result is always preserved so the model can reference it.
MIN_CALLS_TO_DEDUP = 2

Only dedup when ≥ this many identical calls exist.
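The dedup pass can be sketched over plain hashes; the `sig` and `content` keys and the `deduplicate_repeated_calls` name are illustrative stand-ins for the private implementation, which operates on AssistantMessage records:

```ruby
DEDUP_STUB = '[Already retrieved earlier — omitted to reduce context]'
MIN_CALLS_TO_DEDUP = 2

# results: chronological array of { sig: [tool_name, arguments], content: String }
def deduplicate_repeated_calls(results)
  results.group_by { |r| r[:sig] }.each_value do |calls|
    next if calls.size < MIN_CALLS_TO_DEDUP
    calls[0..-2].each { |older| older[:content] = DEDUP_STUB } # keep the most recent
  end
  results
end

history = [
  { sig: ['get_blog_post', { id: 868 }], content: '<html>long post</html>' },
  { sig: ['run_sql',       { q: 'x' }],  content: 'rows' },
  { sig: ['get_blog_post', { id: 868 }], content: '<html>long post</html>' }
]
deduplicate_repeated_calls(history)
history[0][:content] # => DEDUP_STUB (older duplicate stubbed)
history[2][:content] # => "<html>long post</html>" (latest preserved)
```

Because `group_by` preserves encounter order, `calls.last` is always the chronologically latest result for each signature.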
TRUNCATION_KEEP_CHARS = 800

Truncation threshold: keep this many leading chars from the tool result.
Enough to preserve key data points (IDs, titles, status, error messages)
without carrying full blog HTML or SQL result sets into future turns.
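A minimal sketch of head-truncation at this threshold; the `truncate_tool_result` helper and its `[truncated N chars]` suffix are illustrative, not the class's actual output format:

```ruby
TRUNCATION_KEEP_CHARS = 800

# Keep the leading chars (where IDs, titles, and statuses usually live)
# and note how much was dropped. Suffix format is an assumption.
def truncate_tool_result(content)
  return content if content.length <= TRUNCATION_KEEP_CHARS
  dropped = content.length - TRUNCATION_KEEP_CHARS
  "#{content[0, TRUNCATION_KEEP_CHARS]}\n[truncated #{dropped} chars]"
end
```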
CONTEXT_OVERFLOW_PATTERNS = [
  /prompt is too long/i,
  /maximum context length/i,
  /context_length_exceeded/i,
  /exceeds.*context.*window/i,
  /input.*too long/i,
  /too many tokens/i,
  /tokens.*exceed/i,
  /prompt.*exceeds.*limit/i,
  /reduce.*size.*message/i
].freeze

Regex patterns that identify a provider-level context-length error across
Anthropic, OpenAI, and Gemini. RubyLLM raises BadRequestError (400) for all.
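These patterns can be exercised directly against raw provider error messages; the sketch below uses a subset and an illustrative `overflow_error_message?` helper:

```ruby
# Subset of the overflow patterns, matched case-insensitively.
PATTERNS = [
  /prompt is too long/i,
  /maximum context length/i,
  /context_length_exceeded/i
].freeze

def overflow_error_message?(text)
  PATTERNS.any? { |pattern| text.match?(pattern) }
end

overflow_error_message?('prompt is too long: 210913 tokens > 200000 maximum') # => true
overflow_error_message?('Rate limit reached for requests')                    # => false
```

Matching on message text rather than error class is deliberate: all three providers surface overflow as the same generic 400, so the message is the only distinguishing signal.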


Class Method Details

.compact_tool_results!(conversation) ⇒ Object

Summarize large tool-result messages from the most recent exchange.
Called after finalize_response in the worker so the assistant has
already consumed the raw data.

Parameters:

  • conversation (AssistantConversation)

# File 'app/services/assistant/context_compactor.rb', line 68

def self.compact_tool_results!(conversation)
  tool_messages = large_tool_results(conversation)

  unless tool_messages.empty?
    Rails.logger.info do
      "[ContextCompactor] Compacting #{tool_messages.size} tool result(s) " \
        "for conversation #{conversation.id}"
    end
    tool_messages.each { |msg| summarize_tool_result!(msg, conversation: conversation) }
  end

  # Deduplicate: when the same tool+args has been called multiple times across
  # turns, stub out older results so the context doesn't keep re-sending the
  # same data. The most recent result is preserved — it reflects current state.
  deduplicate_repeated_tool_calls!(conversation)
rescue StandardError => error
  # Compaction is best-effort — never break the main flow.
  Rails.logger.warn("[ContextCompactor] Tool result compaction failed: #{error.message}")
end

.compaction_cutoff_id(conversation) ⇒ Integer?

Return the ID of the last message covered by the cached summary.
Used by to_llm to filter old messages.

Parameters:

  • conversation (AssistantConversation)

Returns:

  • (Integer, nil)


# File 'app/services/assistant/context_compactor.rb', line 207

def self.compaction_cutoff_id(conversation)
  conversation.compaction_through_message_id
end

.context_overflow?(error) ⇒ Boolean

Returns true when the error message matches a known context-overflow pattern.

Parameters:

  • error (StandardError)

Returns:

  • (Boolean)


# File 'app/services/assistant/context_compactor.rb', line 614

def self.context_overflow?(error)
  message_text = error.message.to_s
  CONTEXT_OVERFLOW_PATTERNS.any? { |pattern| message_text.match?(pattern) }
end

.emergency_compact!(conversation, level: 1) ⇒ Boolean

── Strategy 3: Emergency Compaction (context overflow recovery) ──

Called when the LLM rejects a request due to context length. Performs
progressively more aggressive compaction and returns true if the context
was reduced (caller should retry the LLM call).

Levels:

  1. Force-compact all tool results (including small ones from current turn)
    and regenerate the sliding-window summary with a tighter recent window.
  2. Nuclear option — summarize everything except the last 2 messages,
    compact ALL tool results regardless of size.

Parameters:

  • conversation (AssistantConversation)

  • level (Integer) (defaults to: 1)

Returns:

  • (Boolean)

    true if compaction was performed (caller should retry)



# File 'app/services/assistant/context_compactor.rb', line 369

def self.emergency_compact!(conversation, level: 1)
  case level
  when 1 then emergency_compact_level_one!(conversation)
  when 2 then emergency_compact_level_two!(conversation)
  else false
  end
rescue StandardError => error
  Rails.logger.warn("[ContextCompactor] Emergency compaction level #{level} failed: #{error.message}")
  false
end
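The intended caller-side escalation (compact, retry, escalate, give up) can be sketched with stand-ins; `FakeLLM`, `FakeCompactor`, and `chat_with_recovery` are illustrative stubs, not part of this codebase:

```ruby
# FakeLLM fails N times with an overflow-style message, then succeeds.
class FakeLLM
  def initialize(failures)
    @failures = failures
  end

  def call!
    if @failures > 0
      @failures -= 1
      raise StandardError, 'prompt is too long'
    end
    :ok
  end
end

# Mimics context_overflow? / emergency_compact! semantics: levels 1 and 2
# exist; beyond that nothing more can be compacted.
module FakeCompactor
  def self.context_overflow?(error)
    error.message.match?(/prompt is too long/i)
  end

  def self.emergency_compact!(_conversation, level:)
    level <= 2
  end
end

# Caller-side retry loop: escalate through levels, re-raise when exhausted.
def chat_with_recovery(llm, conversation)
  level = 1
  begin
    llm.call!
  rescue StandardError => error
    raise unless FakeCompactor.context_overflow?(error)
    raise unless FakeCompactor.emergency_compact!(conversation, level: level)
    level += 1
    retry
  end
end

chat_with_recovery(FakeLLM.new(2), nil) # => :ok after two compaction retries
```

Note that `emergency_compact!` returning false is the signal to stop retrying and surface the original provider error.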

.ensure_context_summary!(conversation) ⇒ String?

Check whether the conversation needs a sliding-window summary.
If context exceeds the threshold and the cached summary is stale
(or absent), generate a new one.

Returns the summary text (or nil if compaction is not needed).
The caller (to_llm) uses this to decide whether to inject the
summary and truncate old messages.

Parameters:

  • conversation (AssistantConversation)

Returns:

  • (String, nil)

    summary text, or nil if not needed


# File 'app/services/assistant/context_compactor.rb', line 105

def self.ensure_context_summary!(conversation)
  message_rows = conversation.assistant_messages
    .where.not(role: 'system')
    .order(:id)
    .pluck(:id, :role, :content)

  # Prefer the most recent REAL input_token count reported by the LLM provider
  # over the naive char-estimate below. The char estimate counts only stored
  # message text and misses tool-definition schemas, tool-call argument JSON
  # in assistant messages, and system prompt overhead.
  recent_actual_input = conversation.assistant_messages
    .where(role: 'assistant')
    .where.not(input_tokens: nil)
    .order(id: :desc)
    .limit(1)
    .pick(:input_tokens)

  estimated_tokens = if recent_actual_input
                       recent_actual_input
                     else
                       content_tokens = message_rows.sum { |_id, _role, content| (content.to_s.length / CHARS_PER_TOKEN) }
                       content_tokens + OVERHEAD_TOKENS
                     end

  # Trigger on message count OR token estimate — whichever fires first.
  # Large-context models (Gemini 1M) may never hit the token threshold, so
  # the count guard ensures we still summarise long-running conversations.
  return nil if estimated_tokens < CONTEXT_TOKEN_THRESHOLD && message_rows.size < MAX_MESSAGES_BEFORE_SUMMARY

  # Find the split point: keep roughly the last 40% of messages,
  # but at least MIN_RECENT_MESSAGES.
  recent_count = [MIN_RECENT_MESSAGES, (message_rows.size * 0.4).ceil].max
  split_index = [message_rows.size - recent_count, 0].max
  return nil if split_index <= 0 # not enough old messages to summarize

  # Snap the split to a safe boundary (don't split tool_call/tool_result pairs).
  split_index = find_safe_split(message_rows, split_index)
  return nil if split_index <= 0

  split_message_id = message_rows[split_index - 1][0] # last message included in "old"

  # Check cached summary validity.
  #
  # The split point shifts as new messages arrive (recent_count grows with
  # message count), so an exact match on cached_through == split_message_id
  # is sufficient to detect that new messages have crossed into the "old"
  # window and the summary needs regeneration. When the split is stable
  # (no boundary shift) the underlying old_messages slice is unchanged,
  # so re-summarizing would produce the same output for the same Gemini
  # spend — return the cache.
  #
  # Regression: an earlier `cache_is_stale = (size - split_index) > 4`
  # check measured the *recent window size* (always > 4), not "new
  # messages since cache". It permanently forced regeneration on every
  # to_llm call within a turn — observed in conversation 1233 as 23
  # identical summarizer calls in 6 minutes.
  cached_through = conversation.compaction_through_message_id

  if conversation.compaction_summary.present? && cached_through == split_message_id
    return conversation.compaction_summary
  end

  # Generate a new summary from old messages
  old_messages = message_rows[0...split_index]
  summary = generate_conversation_summary(old_messages, conversation: conversation)

  # Persist in metadata for future turns.
  conversation.update_columns(
    metadata: conversation.metadata.merge(
      'compaction_summary' => summary,
      'compaction_through_message_id' => split_message_id
    )
  )

  # Keep the in-memory record consistent so subsequent ensure_context_summary!
  # calls within the same worker turn hit the cache. MUST use the accessor
  # setters — jsonb_accessor backs these fields with virtual attributes that
  # are NOT refreshed by update_columns mutating the raw metadata hash.
  # Without these two assignments, the cached_through read at the top of this
  # method returns the stale in-memory nil and we regenerate the summary on
  # every to_llm call (observed in conversation 1233 as 23 identical
  # summarizer requests in 6 minutes).
  conversation.compaction_summary = summary
  conversation.compaction_through_message_id = split_message_id

  Rails.logger.info do
    "[ContextCompactor] Generated sliding-window summary for conversation #{conversation.id}: " \
      "summarized #{split_index} messages (through msg #{split_message_id}), " \
      "keeping #{message_rows.size - split_index} recent"
  end

  summary
rescue StandardError => error
  Rails.logger.warn("[ContextCompactor] Sliding window summary failed: #{error.message}")
  nil
end
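The split-point arithmetic in the method above is self-contained and can be checked in isolation (the `split_index` helper below is a sketch, not the class's API):

```ruby
MIN_RECENT_MESSAGES = 6

# Index of the first message kept verbatim; everything before it is
# summarized. (The real method then snaps this to a safe boundary so
# tool_call/tool_result pairs are never split.)
def split_index(message_count)
  recent_count = [MIN_RECENT_MESSAGES, (message_count * 0.4).ceil].max
  [message_count - recent_count, 0].max
end

split_index(50) # => 30, keep last 20 (40%), summarize first 30
split_index(10) # => 4,  40% of 10 is 4 < 6, so keep last 6
split_index(5)  # => 0,  too few messages, nothing to summarize
```

Because `recent_count` grows with the message count, the split boundary shifts as the conversation grows, which is exactly why the cache check compares `compaction_through_message_id` against the current split.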

.fork_continuation!(conversation, pending_user_message: nil, tool_services: [], summarize: true) ⇒ AssistantConversation?

── Strategy 4: Conversation Fork (manual / power-user) ───────

Creates a new "continuation" conversation with the same owner, carrying a
compact summary of the full prior history as injected context. Used by
manual "Continue with context" and "Branch from here" actions.

NOTE: This is NOT used for automatic context overflow recovery — that is
handled by emergency_compact! + retry. Forking is a deliberate user action.

Parameters:

  • conversation (AssistantConversation)

    The source conversation

  • pending_user_message (String, nil) (defaults to: nil)

    The message to carry forward

  • tool_services (Array<String>) (defaults to: [])

    Tool services to carry forward

  • summarize (Boolean) (defaults to: true)

    When true (default) generates the context summary synchronously.
    Pass false for a fast, redirect-friendly fork — the worker will generate the summary
    before its first LLM call via +generate_parent_summary!+.

Returns:

  • (AssistantConversation, nil)


# File 'app/services/assistant/context_compactor.rb', line 460

def self.fork_continuation!(conversation, pending_user_message: nil, tool_services: [], summarize: true)
  if fork_depth(conversation) >= MAX_FORK_DEPTH
    Rails.logger.warn(
      "[ContextCompactor] Fork depth limit (#{MAX_FORK_DEPTH}) reached for conversation #{conversation.id} — refusing to fork"
    )
    return nil
  end

  summary = fork_continuation_summary(conversation, summarize: summarize)
  title_base = conversation.title.to_s.truncate(44)

  continuation = AssistantConversation.new(
    user_id: conversation.user_id,
    parent_conversation_id: conversation.id,
    title: "#{title_base} (cont.)"
  )
  continuation.parent_conversation_summary = summary if summary.present?
  continuation.tool_services = Array(tool_services)
  continuation.save!

  Rails.logger.info do
    "[ContextCompactor] Forked conversation #{conversation.id} → #{continuation.id} " \
      "(summary: #{summary ? "#{summary.length} chars" : 'deferred'}, pending: #{pending_user_message.present?})"
  end

  continuation
rescue StandardError => error
  Rails.logger.error("[ContextCompactor] fork_continuation! failed: #{error.message}")
  nil
end
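The depth guard walks parent_conversation_id up the chain. A sketch with a Hash standing in for the database (the private fork_depth helper's real signature may differ):

```ruby
MAX_FORK_DEPTH = 3

# parents maps conversation id => parent_conversation_id (nil at the root)
def fork_depth(id, parents)
  depth = 0
  depth += 1 while (id = parents[id])
  depth
end

parents = { 4 => 3, 3 => 2, 2 => 1 } # conv 1 forked to 2, to 3, to 4
fork_depth(4, parents) # => 3, at MAX_FORK_DEPTH the next fork is refused
fork_depth(1, parents) # => 0
```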

.fork_from_message!(conversation, message_id:, summarize: true) ⇒ Array(AssistantConversation, String)?

Fork a new conversation branching from a specific user message.

Parameters:

  • conversation (AssistantConversation)

    Source conversation

  • message_id (Integer)

    ID of the AssistantMessage to branch from (must be role 'user')

  • summarize (Boolean) (defaults to: true)

    When true (default) generates the context summary synchronously.
    Pass false for a fast, redirect-friendly fork — the worker will generate the summary
    before its first LLM call via +generate_parent_summary!+.

Returns:

  • (Array(AssistantConversation, String), nil)


# File 'app/services/assistant/context_compactor.rb', line 507

def self.fork_from_message!(conversation, message_id:, summarize: true)
  fork_msg = conversation.assistant_messages
    .where(role: 'user')
    .find_by(id: message_id)
  return nil unless fork_msg

  summary = nil
  if summarize
    prior_msgs = conversation.assistant_messages
      .where.not(role: 'system')
      .where('id < ?', fork_msg.id)
      .order(:id)
      .pluck(:id, :role, :content)
    summary = prior_msgs.any? ? generate_conversation_summary(prior_msgs, conversation: conversation) : nil
  end

  title_base = conversation.title.to_s.truncate(44)

  fork_convo = AssistantConversation.new(
    user_id:               conversation.user_id,
    parent_conversation_id: conversation.id,
    title:                 "#{title_base} (fork)"
  )
  fork_convo.parent_conversation_summary = summary if summary.present?
  fork_convo.tool_services = Array(conversation.tool_services)
  # Store the fork-point message ID so the worker can summarise the right
  # slice of the parent conversation when summarize: false is used.
  fork_convo.metadata = fork_convo.metadata.merge('fork_message_id' => fork_msg.id)
  fork_convo.save!

  Rails.logger.info do
    "[ContextCompactor] Forked conversation #{conversation.id} at message #{fork_msg.id} " \
      "→ #{fork_convo.id} (prior_summary: #{summary ? "#{summary.length} chars" : 'deferred'})"
  end

  [fork_convo, fork_msg.content.to_s]
rescue StandardError => error
  Rails.logger.error("[ContextCompactor] fork_from_message! failed: #{error.message}")
  nil
end

.generate_parent_summary!(conversation) ⇒ String?

Generate and persist the context summary for a forked conversation whose summary
was deferred at fork time (fork_from_message! with summarize: false).

Reads prior messages from the parent conversation up to the stored fork_message_id,
generates the LLM summary, and persists it directly via a JSONB merge so concurrent
writes (e.g. the AssistantChatWorker token-count sync) cannot clobber it.

Safe to call multiple times — no-op if the summary already exists.

Parameters:

  • conversation (AssistantConversation)

Returns:

  • (String, nil)

    The generated summary, or nil if skipped / failed



# File 'app/services/assistant/context_compactor.rb', line 559

def self.generate_parent_summary!(conversation)
  return nil if conversation.parent_conversation_summary.present?

  parent = AssistantConversation.find_by(id: conversation.parent_conversation_id)
  return nil unless parent

  fork_message_id = conversation.metadata&.dig('fork_message_id')

  prior_msgs = parent.assistant_messages
    .where.not(role: 'system')
    .then { |scope| fork_message_id ? scope.where('id < ?', fork_message_id) : scope }
    .order(:id)
    .pluck(:id, :role, :content)

  return nil if prior_msgs.empty?

  summary = generate_conversation_summary(prior_msgs, conversation: parent)
  return nil if summary.blank?

  # Persist via JSONB merge to avoid racing with worker token-count updates.
  AssistantConversation.where(id: conversation.id)
    .update_all(["metadata = metadata || ?::jsonb", { parent_conversation_summary: summary }.to_json])

  # Keep the in-memory record consistent so the caller's system prompt uses the value.
  # MUST use the accessor setter — jsonb_accessor backs fields with virtual attributes,
  # so mutating the raw hash does not update the getter.
  conversation.parent_conversation_summary = summary

  Rails.logger.info do
    "[ContextCompactor] Generated deferred parent summary for conversation #{conversation.id} " \
      "(#{summary.length} chars)"
  end

  summary
rescue StandardError => error
  Rails.logger.warn("[ContextCompactor] generate_parent_summary! failed: #{error.message}")
  nil
end