Skip to content
代码片段 群组 项目
提交 694ed7a5 编辑于 作者: Luke Duncalfe's avatar Luke Duncalfe
浏览文件

Merge branch...

Merge branch '416777-increase-max_retries_after_interruption-for-github-import-stages' into 'master' 

Increase interruption retries for GitHub workers

See merge request https://gitlab.com/gitlab-org/gitlab/-/merge_requests/134949



Merged-by: default avatarLuke Duncalfe <lduncalfe@gitlab.com>
Approved-by: default avatarMadelein van Niekerk <mvanniekerk@gitlab.com>
Reviewed-by: default avatarGeorge Koltsov <gkoltsov@gitlab.com>
Co-authored-by: default avatarMadelein van Niekerk <mvanniekerk@gitlab.com>
Co-authored-by: default avatarLuke Duncalfe <lduncalfe@eml.cc>
No related branches found
No related tags found
无相关合并请求
显示
97 个添加4 个删除
...@@ -152,7 +152,7 @@ PATH ...@@ -152,7 +152,7 @@ PATH
PATH PATH
remote: vendor/gems/sidekiq-reliable-fetch remote: vendor/gems/sidekiq-reliable-fetch
specs: specs:
gitlab-sidekiq-fetcher (0.9.0) gitlab-sidekiq-fetcher (0.10.0)
json (>= 2.5) json (>= 2.5)
sidekiq (~> 6.1) sidekiq (~> 6.1)
......
...@@ -5,6 +5,8 @@ module GithubImport ...@@ -5,6 +5,8 @@ module GithubImport
module StageMethods module StageMethods
extend ActiveSupport::Concern extend ActiveSupport::Concern
MAX_RETRIES_AFTER_INTERRUPTION = 20
included do included do
include ApplicationWorker include ApplicationWorker
...@@ -18,6 +20,29 @@ module StageMethods ...@@ -18,6 +20,29 @@ module StageMethods
end end
end end
class_methods do
# We can increase the number of times a GitHubImport::Stage worker is retried
# after being interrupted if the importer it executes can restart exactly
# from where it left off.
#
# It is not safe to call this method if the importer loops over its data from
# the beginning when restarted, even if it skips data that is already imported
# inside the loop, as there is a possibility the importer will never reach
# the end of the loop.
#
# Examples of stage workers that call this method are ones that execute services that:
#
# - Continue paging an endpoint from where it left off:
# https://gitlab.com/gitlab-org/gitlab/-/blob/487521cc/lib/gitlab/github_import/parallel_scheduling.rb#L114-117
# - Continue their loop from where it left off:
# https://gitlab.com/gitlab-org/gitlab/-/blob/024235ec/lib/gitlab/github_import/importer/pull_requests/review_requests_importer.rb#L15
def resumes_work_when_interrupted!
return unless Feature.enabled?(:github_importer_raise_max_interruptions)
sidekiq_options max_retries_after_interruption: MAX_RETRIES_AFTER_INTERRUPTION
end
end
# project_id - The ID of the GitLab project to import the data into. # project_id - The ID of the GitLab project to import the data into.
def perform(project_id) def perform(project_id)
info(project_id, message: 'starting stage') info(project_id, message: 'starting stage')
......
...@@ -11,6 +11,8 @@ class ImportAttachmentsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -11,6 +11,8 @@ class ImportAttachmentsWorker # rubocop:disable Scalability/IdempotentWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
resumes_work_when_interrupted!
# client - An instance of Gitlab::GithubImport::Client. # client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project. # project - An instance of Project.
def import(client, project) def import(client, project)
......
...@@ -11,6 +11,8 @@ class ImportIssueEventsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -11,6 +11,8 @@ class ImportIssueEventsWorker # rubocop:disable Scalability/IdempotentWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
resumes_work_when_interrupted!
# client - An instance of Gitlab::GithubImport::Client. # client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project. # project - An instance of Project.
def import(client, project) def import(client, project)
......
...@@ -11,6 +11,8 @@ class ImportIssuesAndDiffNotesWorker # rubocop:disable Scalability/IdempotentWor ...@@ -11,6 +11,8 @@ class ImportIssuesAndDiffNotesWorker # rubocop:disable Scalability/IdempotentWor
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
resumes_work_when_interrupted!
# client - An instance of Gitlab::GithubImport::Client. # client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project. # project - An instance of Project.
def import(client, project) def import(client, project)
......
...@@ -11,6 +11,11 @@ class ImportLfsObjectsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -11,6 +11,11 @@ class ImportLfsObjectsWorker # rubocop:disable Scalability/IdempotentWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
# Importer::LfsObjectsImporter can resume work when interrupted as
# it uses Projects::LfsPointers::LfsObjectDownloadListService which excludes LFS objects that already exist.
# https://gitlab.com/gitlab-org/gitlab/-/blob/eabf0800/app/services/projects/lfs_pointers/lfs_object_download_list_service.rb#L69-71
resumes_work_when_interrupted!
def perform(project_id) def perform(project_id)
return unless (project = find_project(project_id)) return unless (project = find_project(project_id))
......
...@@ -11,6 +11,8 @@ class ImportNotesWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -11,6 +11,8 @@ class ImportNotesWorker # rubocop:disable Scalability/IdempotentWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
resumes_work_when_interrupted!
# client - An instance of Gitlab::GithubImport::Client. # client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project. # project - An instance of Project.
def import(client, project) def import(client, project)
......
...@@ -11,6 +11,8 @@ class ImportPullRequestsMergedByWorker # rubocop:disable Scalability/IdempotentW ...@@ -11,6 +11,8 @@ class ImportPullRequestsMergedByWorker # rubocop:disable Scalability/IdempotentW
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
resumes_work_when_interrupted!
# client - An instance of Gitlab::GithubImport::Client. # client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project. # project - An instance of Project.
def import(client, project) def import(client, project)
......
...@@ -11,6 +11,8 @@ class ImportPullRequestsReviewRequestsWorker # rubocop:disable Scalability/Idemp ...@@ -11,6 +11,8 @@ class ImportPullRequestsReviewRequestsWorker # rubocop:disable Scalability/Idemp
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
resumes_work_when_interrupted!
# client - An instance of Gitlab::GithubImport::Client. # client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project. # project - An instance of Project.
def import(client, project) def import(client, project)
......
...@@ -11,6 +11,8 @@ class ImportPullRequestsReviewsWorker # rubocop:disable Scalability/IdempotentWo ...@@ -11,6 +11,8 @@ class ImportPullRequestsReviewsWorker # rubocop:disable Scalability/IdempotentWo
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
resumes_work_when_interrupted!
# client - An instance of Gitlab::GithubImport::Client. # client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project. # project - An instance of Project.
def import(client, project) def import(client, project)
......
...@@ -11,6 +11,8 @@ class ImportPullRequestsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -11,6 +11,8 @@ class ImportPullRequestsWorker # rubocop:disable Scalability/IdempotentWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
resumes_work_when_interrupted!
# client - An instance of Gitlab::GithubImport::Client. # client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project. # project - An instance of Project.
def import(client, project) def import(client, project)
......
---
name: github_importer_raise_max_interruptions
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/134949
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/429306
milestone: '16.6'
type: development
group: group::import and integrate
default_enabled: false
...@@ -185,4 +185,30 @@ def self.name ...@@ -185,4 +185,30 @@ def self.name
expect(worker.find_project(-1)).to be_nil expect(worker.find_project(-1)).to be_nil
end end
end end
describe '.resumes_work_when_interrupted!' do
subject(:sidekiq_options) { worker.class.sidekiq_options }
it 'does not set the `max_retries_after_interruption` if not called' do
is_expected.not_to have_key('max_retries_after_interruption')
end
it 'sets the `max_retries_after_interruption`' do
worker.class.resumes_work_when_interrupted!
is_expected.to include('max_retries_after_interruption' => 20)
end
context 'when the flag is disabled' do
before do
stub_feature_flags(github_importer_raise_max_interruptions: false)
end
it 'does not set `max_retries_after_interruption`' do
worker.class.resumes_work_when_interrupted!
is_expected.not_to have_key('max_retries_after_interruption')
end
end
end
end end
PATH PATH
remote: . remote: .
specs: specs:
gitlab-sidekiq-fetcher (0.9.0) gitlab-sidekiq-fetcher (0.10.0)
json (>= 2.5) json (>= 2.5)
sidekiq (~> 6.1) sidekiq (~> 6.1)
......
Gem::Specification.new do |s| Gem::Specification.new do |s|
s.name = 'gitlab-sidekiq-fetcher' s.name = 'gitlab-sidekiq-fetcher'
s.version = '0.9.0' s.version = '0.10.0'
s.authors = ['TEA', 'GitLab'] s.authors = ['TEA', 'GitLab']
s.email = 'valery@gitlab.com' s.email = 'valery@gitlab.com'
s.license = 'LGPL-3.0' s.license = 'LGPL-3.0'
......
...@@ -230,7 +230,7 @@ def max_retries_after_interruption(worker_class) ...@@ -230,7 +230,7 @@ def max_retries_after_interruption(worker_class)
max_retries_after_interruption = nil max_retries_after_interruption = nil
max_retries_after_interruption ||= begin max_retries_after_interruption ||= begin
Object.const_get(worker_class).sidekiq_options[:max_retries_after_interruption] Object.const_get(worker_class).sidekiq_options['max_retries_after_interruption']
rescue NameError rescue NameError
end end
......
...@@ -76,6 +76,19 @@ ...@@ -76,6 +76,19 @@
expect(queue2.size).to eq 1 expect(queue2.size).to eq 1
expect(Sidekiq::InterruptedSet.new.size).to eq 0 expect(Sidekiq::InterruptedSet.new.size).to eq 0
end end
it 'does not put jobs into interrupted queue if it is disabled on the worker' do
stub_const('Bob', double(sidekiq_options: { 'max_retries_after_interruption' => -1 }))
uow = described_class::UnitOfWork
interrupted_job = Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo'], interrupted_count: 3)
jobs = [ uow.new('queue:foo', interrupted_job), uow.new('queue:foo', job), uow.new('queue:bar', job) ]
described_class.new(options).bulk_requeue(jobs, nil)
expect(queue1.size).to eq 2
expect(queue2.size).to eq 1
expect(Sidekiq::InterruptedSet.new.size).to eq 0
end
end end
it 'sets heartbeat' do it 'sets heartbeat' do
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册