diff --git a/lib/gitlab/cleanup/orphan_job_artifact_final_objects/process_list.rb b/lib/gitlab/cleanup/orphan_job_artifact_final_objects/process_list.rb index 49ef74c8e2049ce74a1196643a2c6b58c26634ad..ffc46b38f51f86474d0e4377bbfe9c2d7f57108c 100644 --- a/lib/gitlab/cleanup/orphan_job_artifact_final_objects/process_list.rb +++ b/lib/gitlab/cleanup/orphan_job_artifact_final_objects/process_list.rb @@ -6,13 +6,14 @@ module OrphanJobArtifactFinalObjects class ProcessList BATCH_SIZE = Rails.env.development? ? 5 : 1000 DELETED_LIST_FILENAME_PREFIX = 'deleted_from--' - CURSOR_TRACKER_REDIS_KEY = 'orphan-job-artifact-objects-cleanup-cursor-tracker' + CURSOR_TRACKER_REDIS_KEY_PREFIX = 'orphan-job-artifact-objects-cleanup-cursor-tracker--' def initialize(filename: nil, force_restart: false, logger: Gitlab::AppLogger) @force_restart = force_restart @logger = logger @orphan_list_filename = filename || GenerateList::DEFAULT_FILENAME @deleted_list_filename = build_deleted_list_filename + @cursor_tracker_key = build_cursor_tracker_key end def run! @@ -36,7 +37,7 @@ def run! attr_reader :orphan_list_file, :orphan_list_filename, :deleted_list_file, :deleted_list_filename, - :force_restart, :logger + :cursor_tracker_key, :force_restart, :logger def build_deleted_list_filename dirname = File.dirname(orphan_list_filename) @@ -50,6 +51,10 @@ def build_deleted_list_filename ) end + def build_cursor_tracker_key + "#{CURSOR_TRACKER_REDIS_KEY_PREFIX}#{File.basename(orphan_list_filename)}" + end + def initialize_files @orphan_list_file = File.open(orphan_list_filename, 'r') @@ -88,10 +93,10 @@ def resume_from_last_cursor_position def get_last_cursor_position Gitlab::Redis::SharedState.with do |redis| - position = redis.get(CURSOR_TRACKER_REDIS_KEY) + position = redis.get(cursor_tracker_key) if position - log_info("Resuming from last cursor position: #{position}") + log_info("Resuming from last cursor position tracked in #{cursor_tracker_key}: #{position}") else log_info("No last cursor position found, starting from beginning.") end @@ -103,14 +108,14 @@ def get_last_cursor_position def save_current_cursor_position(position) Gitlab::Redis::SharedState.with do |redis| # Set TTL to 1 week (86400 * 7 seconds) - redis.set(CURSOR_TRACKER_REDIS_KEY, position, ex: 604_800) + redis.set(cursor_tracker_key, position, ex: 604_800) log_info("Saved current cursor position: #{position}") end end def clear_last_cursor_position Gitlab::Redis::SharedState.with do |redis| - redis.del(CURSOR_TRACKER_REDIS_KEY) + redis.del(cursor_tracker_key) end end diff --git a/lib/gitlab/cleanup/orphan_job_artifact_final_objects/rollback_deleted_objects.rb b/lib/gitlab/cleanup/orphan_job_artifact_final_objects/rollback_deleted_objects.rb index ca47c5a545b56f9603ccd63d6bc545f0fb9000a4..098d2c9e931079cef4fa5bce0cf20aee6a09bef9 100644 --- a/lib/gitlab/cleanup/orphan_job_artifact_final_objects/rollback_deleted_objects.rb +++ b/lib/gitlab/cleanup/orphan_job_artifact_final_objects/rollback_deleted_objects.rb @@ -15,12 +15,13 @@ class RollbackDeletedObjects GenerateList::DEFAULT_FILENAME ].join.freeze - CURSOR_TRACKER_REDIS_KEY = 'orphan-job-artifact-objects-cleanup-rollback-cursor-tracker' + CURSOR_TRACKER_REDIS_KEY_PREFIX = 'orphan-job-artifact-objects-cleanup-rollback-cursor-tracker--' def initialize(filename: nil, force_restart: false, logger: Gitlab::AppLogger) @force_restart = force_restart @logger = logger @filename = filename || DEFAULT_DELETED_LIST_FILENAME + @cursor_tracker_key = build_cursor_tracker_key end def run! @@ -41,7 +42,11 @@ def run! private - attr_reader :file, :filename, :force_restart, :logger + attr_reader :file, :filename, :force_restart, :logger, :cursor_tracker_key + + def build_cursor_tracker_key + "#{CURSOR_TRACKER_REDIS_KEY_PREFIX}#{File.basename(filename)}" + end def ensure_supported_provider! return if configuration.connection.provider.downcase == GOOGLE_PROVIDER @@ -103,10 +108,10 @@ def resume_from_last_cursor_position def get_last_cursor_position Gitlab::Redis::SharedState.with do |redis| - position = redis.get(CURSOR_TRACKER_REDIS_KEY) + position = redis.get(cursor_tracker_key) if position - log_info("Resuming from last cursor position: #{position}") + log_info("Resuming from last cursor position tracked in #{cursor_tracker_key}: #{position}") else log_info("No last cursor position found, starting from beginning.") end @@ -118,14 +123,14 @@ def get_last_cursor_position def save_current_cursor_position(position) Gitlab::Redis::SharedState.with do |redis| # Set TTL to 1 week (86400 * 7 seconds) - redis.set(CURSOR_TRACKER_REDIS_KEY, position, ex: 604_800) + redis.set(cursor_tracker_key, position, ex: 604_800) log_info("Saved current cursor position: #{position}") end end def clear_last_cursor_position Gitlab::Redis::SharedState.with do |redis| - redis.del(CURSOR_TRACKER_REDIS_KEY) + redis.del(cursor_tracker_key) end end diff --git a/spec/lib/gitlab/cleanup/orphan_job_artifact_final_objects/process_list_spec.rb b/spec/lib/gitlab/cleanup/orphan_job_artifact_final_objects/process_list_spec.rb index b145555ef366f2bee48d91cff210620bbdede55e..e6c3270b86c644bbc0d6b139cef787cdc8006cb4 100644 --- a/spec/lib/gitlab/cleanup/orphan_job_artifact_final_objects/process_list_spec.rb +++ b/spec/lib/gitlab/cleanup/orphan_job_artifact_final_objects/process_list_spec.rb @@ -176,7 +176,7 @@ orphan_final_object_2 ]) - saved_cursor_position = fetch_saved_cursor_position + saved_cursor_position = fetch_saved_cursor_position(orphan_list_filename) new_processor = described_class.new( force_restart: false, @@ -185,7 +185,7 @@ new_processor.run! - expect_resuming_from_cursor_position_log_message(saved_cursor_position) + expect_resuming_from_cursor_position_log_message(orphan_list_filename, saved_cursor_position) expect_deleted_object_log_message(orphan_final_object_3) expect_deleted_object_log_message(orphan_final_object_4) expect_skipping_object_with_job_artifact_record_log_message(non_orphan_final_object) @@ -234,6 +234,127 @@ ]) end end + + context 'and there are multiple processes with different orphan list files interrupted at the same time' do + let(:orphan_list_filename_2) { 'orphan_objects_2.csv' } + let(:deleted_list_filename_2) { "#{described_class::DELETED_LIST_FILENAME_PREFIX}#{orphan_list_filename_2}" } + + let(:processor_2) do + described_class.new( + force_restart: false, + filename: orphan_list_filename_2 + ) + end + + let(:orphan_final_object_5) { create_fog_file } + let(:orphan_final_object_6) { create_fog_file } + let(:orphan_final_object_7) { create_fog_file } + let(:orphan_final_object_8) { create_fog_file } + + before do + File.open(orphan_list_filename_2, 'a') do |file| + file.puts([orphan_final_object_5.key, orphan_final_object_5.content_length].join(',')) + file.puts([orphan_final_object_6.key, orphan_final_object_6.content_length].join(',')) + file.puts([orphan_final_object_7.key, orphan_final_object_7.content_length].join(',')) + file.puts([orphan_final_object_8.key, orphan_final_object_8.content_length].join(',')) + end + + loop_counter = 0 + + allow(processor_2).to receive(:orphans_from_batch).and_wrap_original do |m, *args| + raise dummy_error if loop_counter == 1 + + loop_counter += 1 + m.call(*args) + end + end + + after do + File.delete(orphan_list_filename_2) if File.file?(orphan_list_filename_2) + File.delete(deleted_list_filename_2) if File.file?(deleted_list_filename_2) + end + + it 'resumes from last known cursor position on the next run for each file', :aggregate_failures do + expect { processor.run! }.to raise_error(dummy_error) + + # we have a batch size of 2 here, so we expect only the first 2 lines + # from the CSV has been processed before it got interrupted + expect_deleted_object_log_message(orphan_final_object_1) + expect_object_to_be_deleted(orphan_final_object_1) + expect_deleted_object_log_message(orphan_final_object_2) + expect_object_to_be_deleted(orphan_final_object_2) + expect_object_to_exist(orphan_final_object_3) + expect_object_to_exist(orphan_final_object_4) + + expect_deleted_list_to_contain_exactly(deleted_list_filename, [ + orphan_final_object_1, + orphan_final_object_2 + ]) + + expect { processor_2.run! }.to raise_error(dummy_error) + + # we have a batch size of 2 here, so we expect only the first 2 lines + # from the CSV has been processed before it got interrupted + expect_deleted_object_log_message(orphan_final_object_5) + expect_object_to_be_deleted(orphan_final_object_5) + expect_deleted_object_log_message(orphan_final_object_6) + expect_object_to_be_deleted(orphan_final_object_6) + expect_object_to_exist(orphan_final_object_7) + expect_object_to_exist(orphan_final_object_8) + + expect_deleted_list_to_contain_exactly(deleted_list_filename_2, [ + orphan_final_object_5, + orphan_final_object_6 + ]) + + saved_cursor_position = fetch_saved_cursor_position(orphan_list_filename) + + new_processor = described_class.new( + force_restart: false, + filename: orphan_list_filename + ) + + new_processor.run! + + expect_resuming_from_cursor_position_log_message(orphan_list_filename, saved_cursor_position) + expect_deleted_object_log_message(orphan_final_object_3) + expect_deleted_object_log_message(orphan_final_object_4) + expect_skipping_object_with_job_artifact_record_log_message(non_orphan_final_object) + + expect_object_to_be_deleted(orphan_final_object_3) + expect_object_to_be_deleted(orphan_final_object_4) + expect_object_to_exist(non_orphan_final_object) + + expect_deleted_list_to_contain_exactly(deleted_list_filename, [ + orphan_final_object_1, + orphan_final_object_2, + orphan_final_object_3, + orphan_final_object_4 + ]) + + saved_cursor_position_2 = fetch_saved_cursor_position(orphan_list_filename_2) + + new_processor_2 = described_class.new( + force_restart: false, + filename: orphan_list_filename_2 + ) + + new_processor_2.run! + + expect_resuming_from_cursor_position_log_message(orphan_list_filename_2, saved_cursor_position_2) + expect_deleted_object_log_message(orphan_final_object_7) + expect_deleted_object_log_message(orphan_final_object_8) + expect_object_to_be_deleted(orphan_final_object_7) + expect_object_to_be_deleted(orphan_final_object_8) + + expect_deleted_list_to_contain_exactly(deleted_list_filename_2, [ + orphan_final_object_5, + orphan_final_object_6, + orphan_final_object_7, + orphan_final_object_8 + ]) + end + end end end diff --git a/spec/lib/gitlab/cleanup/orphan_job_artifact_final_objects/rollback_deleted_objects_spec.rb b/spec/lib/gitlab/cleanup/orphan_job_artifact_final_objects/rollback_deleted_objects_spec.rb index e12fbb3d09bf57273b934aee8b6d9785ddc3ef04..137c3973003deedf963639fd5c764750965517e2 100644 --- a/spec/lib/gitlab/cleanup/orphan_job_artifact_final_objects/rollback_deleted_objects_spec.rb +++ b/spec/lib/gitlab/cleanup/orphan_job_artifact_final_objects/rollback_deleted_objects_spec.rb @@ -102,7 +102,7 @@ expect_rolled_back_deleted_object_log_message(deleted_object_1) - saved_cursor_position = fetch_saved_cursor_position + saved_cursor_position = fetch_saved_cursor_position(deleted_list_filename) new_processor = described_class.new( force_restart: false, @@ -121,7 +121,7 @@ new_processor.run! - expect_resuming_from_cursor_position_log_message(saved_cursor_position) + expect_resuming_from_cursor_position_log_message(deleted_list_filename, saved_cursor_position) expect_rolled_back_deleted_object_log_message(deleted_object_2) expect_rolled_back_deleted_object_log_message(deleted_object_3) expect_done_rolling_back_deletion_log_message(deleted_list_filename) diff --git a/spec/support/helpers/orphan_final_artifacts_cleanup_helpers.rb b/spec/support/helpers/orphan_final_artifacts_cleanup_helpers.rb index c2df359e02ebeec761e233541e72d131315eca84..ce67cf0e6b6d9cc72dc937b5f77ad815f49cf31d 100644 --- a/spec/support/helpers/orphan_final_artifacts_cleanup_helpers.rb +++ b/spec/support/helpers/orphan_final_artifacts_cleanup_helpers.rb @@ -59,8 +59,11 @@ def expect_resuming_from_marker_log_message(marker) expect_log_message("Resuming from last page marker: #{marker}", times: 1) end - def expect_resuming_from_cursor_position_log_message(position) - expect_log_message("Resuming from last cursor position: #{position}", times: 1) + def expect_resuming_from_cursor_position_log_message(filename, position) + expect_log_message( + "Resuming from last cursor position tracked in #{cursor_tracker_redis_key(filename)}: #{position}", + times: 1 + ) end def expect_no_resuming_from_marker_log_message @@ -176,9 +179,13 @@ def fetch_saved_marker end end - def fetch_saved_cursor_position + def fetch_saved_cursor_position(filename) Gitlab::Redis::SharedState.with do |redis| - redis.get(described_class::CURSOR_TRACKER_REDIS_KEY) + redis.get(cursor_tracker_redis_key(filename)) end end + + def cursor_tracker_redis_key(filename) + "#{described_class::CURSOR_TRACKER_REDIS_KEY_PREFIX}#{File.basename(filename)}" + end end