Class: CopyVersionsToPartitionedWorker
- Inherits:
-
Object
- Object
- CopyVersionsToPartitionedWorker
- Includes:
- Sidekiq::Job
- Defined in:
- app/workers/copy_versions_to_partitioned_worker.rb
Overview
Background worker for the zero-downtime versions table partitioning.
Copies rows from versions into versions_partitioned in batches of 50,000,
ordered by id. Self-re-enqueues after each batch until all rows are copied.
Runs in the :low queue to avoid impacting production workloads.
Enqueued by: db/versions_migrate/20260305020000_partition_versions_by_year_setup.rb
Final swap: db/versions_migrate/20260305030000_partition_versions_by_year_swap.rb
Monitor progress (run in rails console with versions DB):
RecordVersionBase.connection.select_value("SELECT COUNT() FROM versions_partitioned")
RecordVersionBase.connection.select_value("SELECT COUNT() FROM versions")
Re-enqueue manually if worker dies:
last_id = RecordVersionBase.connection.select_value("SELECT MAX(id) FROM versions_partitioned").to_i
CopyVersionsToPartitionedWorker.perform_async(last_id)
See: doc/tasks/20260305_PARTITION_LARGE_TABLES_BY_DATE.md
Constant Summary collapse
- BATCH_SIZE =
50_000
Instance Method Summary collapse
Instance Method Details
#perform(min_id = 0) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'app/workers/copy_versions_to_partitioned_worker.rb', line 29 def perform(min_id = 0) conn = RecordVersionBase.connection # Guard: do nothing if versions_partitioned doesn't exist yet unless conn.table_exists?(:versions_partitioned) Rails.logger.warn "CopyVersionsToPartitioned: versions_partitioned does not exist, aborting" return end # Find the id range for this batch batch_result = conn.select_one(<<~SQL) SELECT MIN(id) AS min_batch_id, MAX(id) AS max_batch_id FROM ( SELECT id FROM versions WHERE id > #{min_id.to_i} ORDER BY id LIMIT #{BATCH_SIZE} ) t SQL max_batch_id = batch_result["max_batch_id"] # No more rows — copy is complete if max_batch_id.nil? Rails.logger.info "CopyVersionsToPartitioned: all rows copied (stopped at min_id=#{min_id})" return end max_batch_id = max_batch_id.to_i min_batch_id = batch_result["min_batch_id"].to_i # Copy this batch — ON CONFLICT DO NOTHING makes this idempotent rows_inserted = conn.execute(<<~SQL).cmd_tuples INSERT INTO versions_partitioned (id, item_type, item_id, event, whodunnit, created_at, ip, url, object_changes, reference_data) SELECT id, item_type, item_id, event, whodunnit, created_at, ip, url, object_changes, reference_data FROM versions WHERE id >= #{min_batch_id} AND id <= #{max_batch_id} ON CONFLICT (id, created_at) DO NOTHING SQL Rails.logger.info "CopyVersionsToPartitioned: copied #{rows_inserted} rows (ids #{min_batch_id}–#{max_batch_id})" # Enqueue the next batch self.class.perform_async(max_batch_id) end |