Class: CopyVersionsToPartitionedWorker

Inherits:
Object
Includes:
Sidekiq::Job
Defined in:
app/workers/copy_versions_to_partitioned_worker.rb

Overview

Background worker for the zero-downtime versions table partitioning.

Copies rows from versions into versions_partitioned in batches of 50,000,
ordered by id. Self-re-enqueues after each batch until all rows are copied.
Runs in the :low queue to avoid impacting production workloads.
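
The batching scheme above amounts to keyset pagination on id: each job takes the next BATCH_SIZE ids above a cursor and hands the highest id it saw to the next job. A minimal in-memory sketch of that idea follows. It assumes a plain array in place of the versions table, and next_batch_bounds and plan_batches are hypothetical names used only for illustration.

```ruby
# Hypothetical in-memory model of the batching scheme; not the worker's code.
# Returns [min_id, max_id] for the next batch above the cursor, or nil when done.
def next_batch_bounds(ids, cursor, batch_size)
  # Sorting mirrors ORDER BY id; first(batch_size) mirrors LIMIT.
  batch = ids.sort.select { |id| id > cursor }.first(batch_size)
  return nil if batch.empty?

  [batch.first, batch.last]
end

# Walks the whole id set, collecting the bounds each job would copy.
def plan_batches(ids, batch_size)
  batches = []
  cursor = 0
  while (bounds = next_batch_bounds(ids, cursor, batch_size))
    batches << bounds
    cursor = bounds.last # the next job starts after the highest id copied
  end
  batches
end
```

Because the cursor is an id rather than an offset, gaps in the id sequence do not matter, and a job can be restarted from any cursor value.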

Enqueued by: db/versions_migrate/20260305020000_partition_versions_by_year_setup.rb
Final swap: db/versions_migrate/20260305030000_partition_versions_by_year_swap.rb

Monitor progress (run in a Rails console connected to the versions DB):
RecordVersionBase.connection.select_value("SELECT COUNT(*) FROM versions_partitioned")
RecordVersionBase.connection.select_value("SELECT COUNT(*) FROM versions")
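
The two counts can be combined into a rough completion percentage. This is a sketch only: copy_progress_percent is a hypothetical helper, not part of the worker, and it assumes both counts have already been fetched as integers. The figure is approximate, since versions may keep receiving rows while the copy runs.

```ruby
# Hypothetical helper: percentage of rows copied so far, rounded to 2 decimals.
def copy_progress_percent(copied, total)
  return 100.0 if total.zero? # nothing to copy counts as complete

  (copied.to_f / total * 100).round(2)
end
```

For example, pass the first count as copied and the second as total.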

Re-enqueue manually if the worker dies:
last_id = RecordVersionBase.connection.select_value("SELECT MAX(id) FROM versions_partitioned").to_i
CopyVersionsToPartitionedWorker.perform_async(last_id)

See: doc/tasks/20260305_PARTITION_LARGE_TABLES_BY_DATE.md

Constant Summary

BATCH_SIZE = 50_000

Instance Method Details

#perform(min_id = 0) ⇒ Object

Parameters:

  • min_id (Integer) (defaults to: 0)

    Copy rows with id > min_id. Pass 0 to start from the beginning.



# File 'app/workers/copy_versions_to_partitioned_worker.rb', line 29

def perform(min_id = 0)
  conn = RecordVersionBase.connection

  # Guard: do nothing if versions_partitioned doesn't exist yet
  unless conn.table_exists?(:versions_partitioned)
    Rails.logger.warn "CopyVersionsToPartitioned: versions_partitioned does not exist, aborting"
    return
  end

  # Find the id range for this batch
  batch_result = conn.select_one(<<~SQL)
    SELECT MIN(id) AS min_batch_id, MAX(id) AS max_batch_id
    FROM (
      SELECT id FROM versions
      WHERE id > #{min_id.to_i}
      ORDER BY id
      LIMIT #{BATCH_SIZE}
    ) t
  SQL

  max_batch_id = batch_result["max_batch_id"]

  # No more rows — copy is complete
  if max_batch_id.nil?
    Rails.logger.info "CopyVersionsToPartitioned: all rows copied (stopped at min_id=#{min_id})"
    return
  end

  max_batch_id = max_batch_id.to_i
  min_batch_id = batch_result["min_batch_id"].to_i

  # Copy this batch — ON CONFLICT DO NOTHING makes this idempotent
  rows_inserted = conn.execute(<<~SQL).cmd_tuples
    INSERT INTO versions_partitioned
      (id, item_type, item_id, event, whodunnit, created_at, ip, url, object_changes, reference_data)
    SELECT
      id, item_type, item_id, event, whodunnit, created_at, ip, url, object_changes, reference_data
    FROM versions
    WHERE id >= #{min_batch_id} AND id <= #{max_batch_id}
    ON CONFLICT (id, created_at) DO NOTHING
  SQL

  Rails.logger.info "CopyVersionsToPartitioned: copied #{rows_inserted} rows (ids #{min_batch_id}..#{max_batch_id})"

  # Enqueue the next batch
  self.class.perform_async(max_batch_id)
end
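
The ON CONFLICT (id, created_at) DO NOTHING clause is what makes it safe to re-run a batch: rows already present in the target are skipped, and only newly inserted rows are counted. A minimal in-memory sketch of that behaviour, assuming a Hash keyed by [id, created_at] in place of versions_partitioned; insert_ignoring_conflicts is a hypothetical name and not part of the worker.

```ruby
# Hypothetical model of INSERT ... ON CONFLICT (id, created_at) DO NOTHING.
# target is a Hash keyed by [id, created_at]; rows is an Array of Hashes.
# Returns the number of rows actually inserted.
def insert_ignoring_conflicts(target, rows)
  inserted = 0
  rows.each do |row|
    key = [row[:id], row[:created_at]]
    next if target.key?(key) # conflict: leave the existing row untouched

    target[key] = row
    inserted += 1
  end
  inserted
end

target = {}
batch = [
  { id: 1, created_at: "2025-01-01", event: "create" },
  { id: 2, created_at: "2025-01-02", event: "update" }
]
first_run  = insert_ignoring_conflicts(target, batch) # inserts both rows
second_run = insert_ignoring_conflicts(target, batch) # inserts nothing
```

Running the same batch twice leaves the target unchanged after the first run, which is why re-enqueueing from an earlier min_id does not duplicate rows.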