Skip to content
代码片段 群组 项目
未验证 提交 a7fa0e32 编辑于 作者: Erick Bajao's avatar Erick Bajao 提交者: GitLab
浏览文件

Make cursor tracker redis key include filename

To support running multiple rake tasks in parallel, each working from its
own list file, the cursor tracker Redis key now includes the filename of
the list being processed. This lets each rake task resume from the correct
position if it gets interrupted.
上级 18a797e1
No related branches found
No related tags found
无相关合并请求
...@@ -6,13 +6,14 @@ module OrphanJobArtifactFinalObjects ...@@ -6,13 +6,14 @@ module OrphanJobArtifactFinalObjects
class ProcessList class ProcessList
BATCH_SIZE = Rails.env.development? ? 5 : 1000 BATCH_SIZE = Rails.env.development? ? 5 : 1000
DELETED_LIST_FILENAME_PREFIX = 'deleted_from--' DELETED_LIST_FILENAME_PREFIX = 'deleted_from--'
CURSOR_TRACKER_REDIS_KEY = 'orphan-job-artifact-objects-cleanup-cursor-tracker' CURSOR_TRACKER_REDIS_KEY_PREFIX = 'orphan-job-artifact-objects-cleanup-cursor-tracker--'
# Sets up a processor for one orphan list file.
# (Side-by-side diff: left column is the previous revision, right column the new one.)
#
# filename:      path of the orphan list; falls back to GenerateList::DEFAULT_FILENAME when nil.
# force_restart: stored as-is; how it is used is not visible in this hunk.
# logger:        defaults to Gitlab::AppLogger.
def initialize(filename: nil, force_restart: false, logger: Gitlab::AppLogger) def initialize(filename: nil, force_restart: false, logger: Gitlab::AppLogger)
@force_restart = force_restart @force_restart = force_restart
@logger = logger @logger = logger
@orphan_list_filename = filename || GenerateList::DEFAULT_FILENAME @orphan_list_filename = filename || GenerateList::DEFAULT_FILENAME
@deleted_list_filename = build_deleted_list_filename @deleted_list_filename = build_deleted_list_filename
# Added in the new revision only. Must run after @orphan_list_filename is
# assigned, because build_cursor_tracker_key reads orphan_list_filename.
@cursor_tracker_key = build_cursor_tracker_key
end end
def run! def run!
...@@ -36,7 +37,7 @@ def run! ...@@ -36,7 +37,7 @@ def run!
attr_reader :orphan_list_file, :orphan_list_filename, attr_reader :orphan_list_file, :orphan_list_filename,
:deleted_list_file, :deleted_list_filename, :deleted_list_file, :deleted_list_filename,
:force_restart, :logger :cursor_tracker_key, :force_restart, :logger
def build_deleted_list_filename def build_deleted_list_filename
dirname = File.dirname(orphan_list_filename) dirname = File.dirname(orphan_list_filename)
...@@ -50,6 +51,10 @@ def build_deleted_list_filename ...@@ -50,6 +51,10 @@ def build_deleted_list_filename
) )
end end
# Returns the Redis key used to track the cursor for this processor's list:
# CURSOR_TRACKER_REDIS_KEY_PREFIX followed by the basename (directory part
# dropped) of the orphan list file.
def build_cursor_tracker_key
  list_basename = File.basename(orphan_list_filename)

  "#{CURSOR_TRACKER_REDIS_KEY_PREFIX}#{list_basename}"
end
def initialize_files def initialize_files
@orphan_list_file = File.open(orphan_list_filename, 'r') @orphan_list_file = File.open(orphan_list_filename, 'r')
...@@ -88,10 +93,10 @@ def resume_from_last_cursor_position ...@@ -88,10 +93,10 @@ def resume_from_last_cursor_position
def get_last_cursor_position def get_last_cursor_position
Gitlab::Redis::SharedState.with do |redis| Gitlab::Redis::SharedState.with do |redis|
position = redis.get(CURSOR_TRACKER_REDIS_KEY) position = redis.get(cursor_tracker_key)
if position if position
log_info("Resuming from last cursor position: #{position}") log_info("Resuming from last cursor position tracked in #{cursor_tracker_key}: #{position}")
else else
log_info("No last cursor position found, starting from beginning.") log_info("No last cursor position found, starting from beginning.")
end end
...@@ -103,14 +108,14 @@ def get_last_cursor_position ...@@ -103,14 +108,14 @@ def get_last_cursor_position
# Writes `position` to Redis (Gitlab::Redis::SharedState) with a one-week
# expiry, then logs the saved position.
# The two diff columns differ only in the key: the left (old) side writes the
# fixed CURSOR_TRACKER_REDIS_KEY, the right (new) side writes the per-file
# cursor_tracker_key.
def save_current_cursor_position(position) def save_current_cursor_position(position)
Gitlab::Redis::SharedState.with do |redis| Gitlab::Redis::SharedState.with do |redis|
# Set TTL to 1 week (86400 * 7 seconds) # Set TTL to 1 week (86400 * 7 seconds)
redis.set(CURSOR_TRACKER_REDIS_KEY, position, ex: 604_800) redis.set(cursor_tracker_key, position, ex: 604_800)
log_info("Saved current cursor position: #{position}") log_info("Saved current cursor position: #{position}")
end end
end end
# Deletes the stored cursor position from Redis (Gitlab::Redis::SharedState).
# Left (old) column deletes the fixed CURSOR_TRACKER_REDIS_KEY; right (new)
# column deletes only this processor's cursor_tracker_key.
def clear_last_cursor_position def clear_last_cursor_position
Gitlab::Redis::SharedState.with do |redis| Gitlab::Redis::SharedState.with do |redis|
redis.del(CURSOR_TRACKER_REDIS_KEY) redis.del(cursor_tracker_key)
end end
end end
......
...@@ -15,12 +15,13 @@ class RollbackDeletedObjects ...@@ -15,12 +15,13 @@ class RollbackDeletedObjects
GenerateList::DEFAULT_FILENAME GenerateList::DEFAULT_FILENAME
].join.freeze ].join.freeze
CURSOR_TRACKER_REDIS_KEY = 'orphan-job-artifact-objects-cleanup-rollback-cursor-tracker' CURSOR_TRACKER_REDIS_KEY_PREFIX = 'orphan-job-artifact-objects-cleanup-rollback-cursor-tracker--'
# Sets up a rollback processor for one deleted-objects list file.
# (Side-by-side diff: left column is the previous revision, right column the new one.)
#
# filename:      path of the deleted list; falls back to DEFAULT_DELETED_LIST_FILENAME when nil.
# force_restart: stored as-is; how it is used is not visible in this hunk.
# logger:        defaults to Gitlab::AppLogger.
def initialize(filename: nil, force_restart: false, logger: Gitlab::AppLogger) def initialize(filename: nil, force_restart: false, logger: Gitlab::AppLogger)
@force_restart = force_restart @force_restart = force_restart
@logger = logger @logger = logger
@filename = filename || DEFAULT_DELETED_LIST_FILENAME @filename = filename || DEFAULT_DELETED_LIST_FILENAME
# Added in the new revision only. Must run after @filename is assigned,
# because build_cursor_tracker_key reads filename.
@cursor_tracker_key = build_cursor_tracker_key
end end
def run! def run!
...@@ -41,7 +42,11 @@ def run! ...@@ -41,7 +42,11 @@ def run!
private private
attr_reader :file, :filename, :force_restart, :logger attr_reader :file, :filename, :force_restart, :logger, :cursor_tracker_key
# Returns the Redis key used to track the rollback cursor for this list:
# CURSOR_TRACKER_REDIS_KEY_PREFIX followed by the basename (directory part
# dropped) of the deleted list file.
def build_cursor_tracker_key
  list_basename = File.basename(filename)

  "#{CURSOR_TRACKER_REDIS_KEY_PREFIX}#{list_basename}"
end
def ensure_supported_provider! def ensure_supported_provider!
return if configuration.connection.provider.downcase == GOOGLE_PROVIDER return if configuration.connection.provider.downcase == GOOGLE_PROVIDER
...@@ -103,10 +108,10 @@ def resume_from_last_cursor_position ...@@ -103,10 +108,10 @@ def resume_from_last_cursor_position
def get_last_cursor_position def get_last_cursor_position
Gitlab::Redis::SharedState.with do |redis| Gitlab::Redis::SharedState.with do |redis|
position = redis.get(CURSOR_TRACKER_REDIS_KEY) position = redis.get(cursor_tracker_key)
if position if position
log_info("Resuming from last cursor position: #{position}") log_info("Resuming from last cursor position tracked in #{cursor_tracker_key}: #{position}")
else else
log_info("No last cursor position found, starting from beginning.") log_info("No last cursor position found, starting from beginning.")
end end
...@@ -118,14 +123,14 @@ def get_last_cursor_position ...@@ -118,14 +123,14 @@ def get_last_cursor_position
# Writes `position` to Redis (Gitlab::Redis::SharedState) with a one-week
# expiry, then logs the saved position.
# The two diff columns differ only in the key: the left (old) side writes the
# fixed CURSOR_TRACKER_REDIS_KEY, the right (new) side writes the per-file
# cursor_tracker_key.
def save_current_cursor_position(position) def save_current_cursor_position(position)
Gitlab::Redis::SharedState.with do |redis| Gitlab::Redis::SharedState.with do |redis|
# Set TTL to 1 week (86400 * 7 seconds) # Set TTL to 1 week (86400 * 7 seconds)
redis.set(CURSOR_TRACKER_REDIS_KEY, position, ex: 604_800) redis.set(cursor_tracker_key, position, ex: 604_800)
log_info("Saved current cursor position: #{position}") log_info("Saved current cursor position: #{position}")
end end
end end
# Deletes the stored rollback cursor position from Redis (Gitlab::Redis::SharedState).
# Left (old) column deletes the fixed CURSOR_TRACKER_REDIS_KEY; right (new)
# column deletes only this processor's cursor_tracker_key.
def clear_last_cursor_position def clear_last_cursor_position
Gitlab::Redis::SharedState.with do |redis| Gitlab::Redis::SharedState.with do |redis|
redis.del(CURSOR_TRACKER_REDIS_KEY) redis.del(cursor_tracker_key)
end end
end end
......
...@@ -176,7 +176,7 @@ ...@@ -176,7 +176,7 @@
orphan_final_object_2 orphan_final_object_2
]) ])
saved_cursor_position = fetch_saved_cursor_position saved_cursor_position = fetch_saved_cursor_position(orphan_list_filename)
new_processor = described_class.new( new_processor = described_class.new(
force_restart: false, force_restart: false,
...@@ -185,7 +185,7 @@ ...@@ -185,7 +185,7 @@
new_processor.run! new_processor.run!
expect_resuming_from_cursor_position_log_message(saved_cursor_position) expect_resuming_from_cursor_position_log_message(orphan_list_filename, saved_cursor_position)
expect_deleted_object_log_message(orphan_final_object_3) expect_deleted_object_log_message(orphan_final_object_3)
expect_deleted_object_log_message(orphan_final_object_4) expect_deleted_object_log_message(orphan_final_object_4)
expect_skipping_object_with_job_artifact_record_log_message(non_orphan_final_object) expect_skipping_object_with_job_artifact_record_log_message(non_orphan_final_object)
...@@ -234,6 +234,127 @@ ...@@ -234,6 +234,127 @@
]) ])
end end
end end
# Scenario: two processors, each working on its own orphan list file, are both
# interrupted part-way through. Each must later resume from the cursor saved
# for its own file, which only works if the Redis key is per-file.
# `processor`, `orphan_list_filename`, `deleted_list_filename`, `dummy_error`
# and orphan_final_object_1..4 are defined outside this hunk.
context 'and there are multiple processes with different orphan list files interrupted at the same time' do
let(:orphan_list_filename_2) { 'orphan_objects_2.csv' }
let(:deleted_list_filename_2) { "#{described_class::DELETED_LIST_FILENAME_PREFIX}#{orphan_list_filename_2}" }
let(:processor_2) do
described_class.new(
force_restart: false,
filename: orphan_list_filename_2
)
end
let(:orphan_final_object_5) { create_fog_file }
let(:orphan_final_object_6) { create_fog_file }
let(:orphan_final_object_7) { create_fog_file }
let(:orphan_final_object_8) { create_fog_file }
before do
# Write the second orphan list: one "key,content_length" line per object.
File.open(orphan_list_filename_2, 'a') do |file|
file.puts([orphan_final_object_5.key, orphan_final_object_5.content_length].join(','))
file.puts([orphan_final_object_6.key, orphan_final_object_6.content_length].join(','))
file.puts([orphan_final_object_7.key, orphan_final_object_7.content_length].join(','))
file.puts([orphan_final_object_8.key, orphan_final_object_8.content_length].join(','))
end
# Simulate an interruption of processor_2: the first call to
# orphans_from_batch goes through to the real method, the second raises.
loop_counter = 0
allow(processor_2).to receive(:orphans_from_batch).and_wrap_original do |m, *args|
raise dummy_error if loop_counter == 1
loop_counter += 1
m.call(*args)
end
end
after do
# Remove the files this context created so they do not leak into other examples.
File.delete(orphan_list_filename_2) if File.file?(orphan_list_filename_2)
File.delete(deleted_list_filename_2) if File.file?(deleted_list_filename_2)
end
it 'resumes from last known cursor position on the next run for each file', :aggregate_failures do
# Step 1: the first processor is interrupted.
expect { processor.run! }.to raise_error(dummy_error)
# We have a batch size of 2 here, so we expect only the first 2 lines
# of the CSV to have been processed before the run was interrupted.
expect_deleted_object_log_message(orphan_final_object_1)
expect_object_to_be_deleted(orphan_final_object_1)
expect_deleted_object_log_message(orphan_final_object_2)
expect_object_to_be_deleted(orphan_final_object_2)
expect_object_to_exist(orphan_final_object_3)
expect_object_to_exist(orphan_final_object_4)
expect_deleted_list_to_contain_exactly(deleted_list_filename, [
orphan_final_object_1,
orphan_final_object_2
])
# Step 2: the second processor, on a different list file, is interrupted too.
expect { processor_2.run! }.to raise_error(dummy_error)
# We have a batch size of 2 here, so we expect only the first 2 lines
# of the CSV to have been processed before the run was interrupted.
expect_deleted_object_log_message(orphan_final_object_5)
expect_object_to_be_deleted(orphan_final_object_5)
expect_deleted_object_log_message(orphan_final_object_6)
expect_object_to_be_deleted(orphan_final_object_6)
expect_object_to_exist(orphan_final_object_7)
expect_object_to_exist(orphan_final_object_8)
expect_deleted_list_to_contain_exactly(deleted_list_filename_2, [
orphan_final_object_5,
orphan_final_object_6
])
# Step 3: a fresh processor for the first file resumes from that file's
# own saved cursor and finishes the remaining entries.
saved_cursor_position = fetch_saved_cursor_position(orphan_list_filename)
new_processor = described_class.new(
force_restart: false,
filename: orphan_list_filename
)
new_processor.run!
expect_resuming_from_cursor_position_log_message(orphan_list_filename, saved_cursor_position)
expect_deleted_object_log_message(orphan_final_object_3)
expect_deleted_object_log_message(orphan_final_object_4)
expect_skipping_object_with_job_artifact_record_log_message(non_orphan_final_object)
expect_object_to_be_deleted(orphan_final_object_3)
expect_object_to_be_deleted(orphan_final_object_4)
expect_object_to_exist(non_orphan_final_object)
expect_deleted_list_to_contain_exactly(deleted_list_filename, [
orphan_final_object_1,
orphan_final_object_2,
orphan_final_object_3,
orphan_final_object_4
])
# Step 4: a fresh processor for the second file resumes from the second
# file's saved cursor, read after the first file's run has completed.
saved_cursor_position_2 = fetch_saved_cursor_position(orphan_list_filename_2)
new_processor_2 = described_class.new(
force_restart: false,
filename: orphan_list_filename_2
)
new_processor_2.run!
expect_resuming_from_cursor_position_log_message(orphan_list_filename_2, saved_cursor_position_2)
expect_deleted_object_log_message(orphan_final_object_7)
expect_deleted_object_log_message(orphan_final_object_8)
expect_object_to_be_deleted(orphan_final_object_7)
expect_object_to_be_deleted(orphan_final_object_8)
expect_deleted_list_to_contain_exactly(deleted_list_filename_2, [
orphan_final_object_5,
orphan_final_object_6,
orphan_final_object_7,
orphan_final_object_8
])
end
end
end end
end end
......
...@@ -102,7 +102,7 @@ ...@@ -102,7 +102,7 @@
expect_rolled_back_deleted_object_log_message(deleted_object_1) expect_rolled_back_deleted_object_log_message(deleted_object_1)
saved_cursor_position = fetch_saved_cursor_position saved_cursor_position = fetch_saved_cursor_position(deleted_list_filename)
new_processor = described_class.new( new_processor = described_class.new(
force_restart: false, force_restart: false,
...@@ -121,7 +121,7 @@ ...@@ -121,7 +121,7 @@
new_processor.run! new_processor.run!
expect_resuming_from_cursor_position_log_message(saved_cursor_position) expect_resuming_from_cursor_position_log_message(deleted_list_filename, saved_cursor_position)
expect_rolled_back_deleted_object_log_message(deleted_object_2) expect_rolled_back_deleted_object_log_message(deleted_object_2)
expect_rolled_back_deleted_object_log_message(deleted_object_3) expect_rolled_back_deleted_object_log_message(deleted_object_3)
expect_done_rolling_back_deletion_log_message(deleted_list_filename) expect_done_rolling_back_deletion_log_message(deleted_list_filename)
......
...@@ -59,8 +59,11 @@ def expect_resuming_from_marker_log_message(marker) ...@@ -59,8 +59,11 @@ def expect_resuming_from_marker_log_message(marker)
expect_log_message("Resuming from last page marker: #{marker}", times: 1) expect_log_message("Resuming from last page marker: #{marker}", times: 1)
end end
# Expects the "Resuming from last cursor position" log message, passing
# `times: 1` to expect_log_message.
# Left (old) column: takes only `position`; the message does not mention a key.
# Right (new) column: also takes `filename`; the expected message names the
# Redis key returned by cursor_tracker_redis_key(filename).
def expect_resuming_from_cursor_position_log_message(position) def expect_resuming_from_cursor_position_log_message(filename, position)
expect_log_message("Resuming from last cursor position: #{position}", times: 1) expect_log_message(
"Resuming from last cursor position tracked in #{cursor_tracker_redis_key(filename)}: #{position}",
times: 1
)
end end
def expect_no_resuming_from_marker_log_message def expect_no_resuming_from_marker_log_message
...@@ -176,9 +179,13 @@ def fetch_saved_marker ...@@ -176,9 +179,13 @@ def fetch_saved_marker
end end
end end
# Reads the saved cursor position from Redis (Gitlab::Redis::SharedState).
# Left (old) column: no arguments; reads described_class::CURSOR_TRACKER_REDIS_KEY.
# Right (new) column: takes `filename` and reads the key returned by
# cursor_tracker_redis_key(filename).
def fetch_saved_cursor_position def fetch_saved_cursor_position(filename)
Gitlab::Redis::SharedState.with do |redis| Gitlab::Redis::SharedState.with do |redis|
redis.get(described_class::CURSOR_TRACKER_REDIS_KEY) redis.get(cursor_tracker_redis_key(filename))
end end
end end
# Builds the cursor tracker Redis key expected for the list file `filename`:
# described_class's CURSOR_TRACKER_REDIS_KEY_PREFIX followed by the basename
# (directory part dropped) of `filename`.
def cursor_tracker_redis_key(filename)
  key_prefix = described_class::CURSOR_TRACKER_REDIS_KEY_PREFIX
  list_basename = File.basename(filename)

  "#{key_prefix}#{list_basename}"
end
end end
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册