Commit 346522e9, authored by Rodrigo Tomonari

Merge branch '424954-make-workers-idempotent' into 'master'

Showing 223 additions and 20 deletions
@@ -2761,7 +2761,6 @@ RSpec/MissingFeatureCategory:
     - 'spec/lib/bulk_imports/projects/pipelines/external_pull_requests_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb'
-    - 'spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/project_attributes_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/project_feature_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/protected_branches_pipeline_spec.rb'
---
name: bulk_import_idempotent_workers
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/132702
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/426480
milestone: '16.5'
type: development
group: group::import and integrate
default_enabled: false
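
This is a standard development-type flag definition, disabled by default. For local verification it can be toggled from a Rails console with the usual GitLab feature-flag API (not part of this commit):

  Feature.enable(:bulk_import_idempotent_workers)   # turn the idempotent path on
  Feature.enabled?(:bulk_import_idempotent_workers) # => true
  Feature.disable(:bulk_import_idempotent_workers)  # back to the legacy behaviour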
@@ -31,7 +31,7 @@
   subject { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       allow(Dir).to receive(:mktmpdir).and_return(tmpdir)
       allow_next_instance_of(BulkImports::FileDownloadService) do |service|
@@ -33,6 +33,15 @@ def load(context, data)
       end
     end

+    def already_processed?(data, _)
+      values = Gitlab::Cache::Import::Caching.values_from_set(cache_key)
+      values.include?(OpenSSL::Digest::SHA256.hexdigest(data.to_s))
+    end
+
+    def save_processed_entry(data, _)
+      Gitlab::Cache::Import::Caching.set_add(cache_key, OpenSSL::Digest::SHA256.hexdigest(data.to_s))
+    end
+
     private

     def group_badge?(data)
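
Badge entries carry no stable ordering, so this pipeline de-duplicates by content: each raw entry is hashed with SHA-256 and the digest is recorded in a Redis-backed set via Gitlab::Cache::Import::Caching. A self-contained sketch of the same idea, with an in-memory Set standing in for Redis:

  require 'set'
  require 'openssl'

  processed = Set.new # stand-in for the Redis set behind Gitlab::Cache::Import::Caching

  entries = [{ 'name' => 'badge1' }, { 'name' => 'badge2' }, { 'name' => 'badge1' }]

  entries.each do |entry|
    digest = OpenSSL::Digest::SHA256.hexdigest(entry.to_s)
    next if processed.include?(digest) # already imported on an earlier (re)run

    processed.add(digest)
    puts "importing #{entry['name']}"
  end
  # Prints badge1 and badge2 once; the repeated badge1 entry is skipped.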
@@ -128,6 +128,15 @@ def relation_definition
       import_export_config.top_relation_tree(relation)
     end

+    def already_processed?(_, index)
+      last_index = Gitlab::Cache::Import::Caching.read(cache_key)
+      last_index && last_index.to_i >= index
+    end
+
+    def save_processed_entry(_, index)
+      Gitlab::Cache::Import::Caching.write(cache_key, index)
+    end
+
     def capture_invalid_subrelations(invalid_subrelations)
       invalid_subrelations.each do |record|
         BulkImports::Failure.create(
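
NDJSON relations are replayed in a fixed order, so instead of a set of digests this pipeline keeps a single high-water mark: the index of the last entry it finished loading. Anything at or below that index is skipped on retry. A runnable sketch of the same >= comparison, with a local variable standing in for the Redis read/write:

  last_index = nil # stand-in for Gitlab::Cache::Import::Caching.read(cache_key)

  rows = %w[label_0 label_1 label_2]

  2.times do |attempt| # simulate the worker being retried after a crash
    rows.each_with_index do |row, index|
      next if last_index && last_index.to_i >= index # already_processed?

      puts "attempt #{attempt}: loading #{row}"
      last_index = index # save_processed_entry
    end
  end
  # The second attempt loads nothing: every index is at or below the saved mark.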
@@ -5,6 +5,8 @@ module Pipeline
     class ExtractedData
       attr_reader :data

+      delegate :each, :each_with_index, to: :data
+
       def initialize(data: nil, page_info: {})
         @data = data.is_a?(Enumerator) ? data : Array.wrap(data)
         @page_info = page_info
@@ -20,10 +22,6 @@ def has_next_page?
       def next_page
         @page_info&.dig('next_page')
       end
-
-      def each(&block)
-        data.each(&block)
-      end
     end
   end
 end
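
The hand-written each method gives way to ActiveSupport's delegate, which forwards both each and the newly required each_with_index straight to the underlying collection; with no block given, delegation preserves the usual Enumerator return. A minimal sketch of the pattern outside GitLab:

  require 'active_support/core_ext/module/delegation'

  class Wrapper
    attr_reader :data

    delegate :each, :each_with_index, to: :data

    def initialize(data)
      @data = Array(data)
    end
  end

  Wrapper.new(%w[a b]).each_with_index { |item, index| puts "#{index}: #{item}" }
  Wrapper.new(%w[a b]).each_with_index # => an Enumerator, as the spec below asserts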
@@ -15,7 +15,10 @@ def run
       extracted_data = extracted_data_from

       if extracted_data
-        extracted_data.each do |entry|
+        extracted_data.each_with_index do |entry, index|
+          raw_entry = entry.dup
+          next if Feature.enabled?(:bulk_import_idempotent_workers) && already_processed?(raw_entry, index)
+
           transformers.each do |transformer|
             entry = run_pipeline_step(:transformer, transformer.class.name) do
               transformer.transform(context, entry)
@@ -25,6 +28,8 @@
           run_pipeline_step(:loader, loader.class.name) do
             loader.load(context, entry)
           end
+
+          save_processed_entry(raw_entry, index) if Feature.enabled?(:bulk_import_idempotent_workers)
         end

         tracker.update!(
@@ -73,6 +78,19 @@ def extracted_data_from
         end
       end

+      def cache_key
+        batch_number = context.extra[:batch_number] || 0
+
+        "#{self.class.name.underscore}/#{tracker.bulk_import_entity_id}/#{batch_number}"
+      end
+
+      # Overridden by child pipelines with different caching strategies
+      def already_processed?(*)
+        false
+      end
+
+      def save_processed_entry(*); end
+
       def after_run(extracted_data)
         run if extracted_data.has_next_page?
       end
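
The runner now frames every entry with two template-method hooks: already_processed? is consulted before the transformer chain, and save_processed_entry runs after a successful load. The entry is dup'ed first, so both hooks see the raw extracted data rather than the transformed record, which keeps fingerprints stable across reruns; the cache key is scoped by pipeline class, entity id, and batch number. A condensed sketch of the control flow (feature-flag gating and the real pipeline steps elided):

  class BasePipeline
    def run(entries)
      entries.each_with_index do |entry, index|
        raw_entry = entry.dup # hooks see pre-transform data, stable across reruns
        next if already_processed?(raw_entry, index)

        load_entry(transform(entry))
        save_processed_entry(raw_entry, index)
      end
    end

    private

    # No-op defaults: pipelines that don't override these stay non-idempotent.
    def already_processed?(*)
      false
    end

    def save_processed_entry(*); end

    def transform(entry)
      entry
    end

    def load_entry(entry)
      puts "loaded #{entry}"
    end
  end

  BasePipeline.new.run(%w[a b])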
@@ -33,6 +33,24 @@
         expect(badge.image_url).to eq(badge_data['image_url'])
       end

+      it 'skips already imported records' do
+        expect { pipeline.run }.to change(Badge, :count).by(2)
+
+        expect { pipeline.run }.to not_change(Badge, :count)
+      end
+
+      context 'with FF bulk_import_idempotent_workers disabled' do
+        before do
+          stub_feature_flags(bulk_import_idempotent_workers: false)
+        end
+
+        it 'creates duplicated badges' do
+          expect { pipeline.run }.to change(Badge, :count).by(2)
+
+          expect { pipeline.run }.to change(Badge, :count)
+        end
+      end
+
       context 'when project entity' do
         let(:first_page) { extracted_data(has_next_page: true) }
         let(:last_page) { extracted_data(name: 'badge2', kind: 'project') }
@@ -6,6 +6,8 @@
   let_it_be(:group) { create(:group) }
   let_it_be(:project) { create(:project) }
   let_it_be(:user) { create(:user) }
+  let(:tracker) { instance_double(BulkImports::Tracker, bulk_import_entity_id: 1) }
+  let(:context) { instance_double(BulkImports::Pipeline::Context, tracker: tracker, extra: { batch_number: 1 }) }

   let(:klass) do
     Class.new do
@@ -13,11 +15,12 @@
       relation_name 'test'

-      attr_reader :portable, :current_user
+      attr_reader :portable, :current_user, :context

-      def initialize(portable, user)
+      def initialize(portable, user, context)
         @portable = portable
         @current_user = user
+        @context = context
       end
     end
   end
@@ -26,12 +29,29 @@ def initialize(portable, user)
     stub_const('NdjsonPipelineClass', klass)
   end

-  subject { NdjsonPipelineClass.new(group, user) }
+  subject { NdjsonPipelineClass.new(group, user, context) }

   it 'marks pipeline as ndjson' do
     expect(NdjsonPipelineClass.file_extraction_pipeline?).to eq(true)
   end

+  describe 'caching' do
+    it 'saves completed entry in cache' do
+      subject.save_processed_entry("entry", 10)
+
+      expected_cache_key = "ndjson_pipeline_class/1/1"
+      expect(Gitlab::Cache::Import::Caching.read(expected_cache_key)).to eq("10")
+    end
+
+    it 'identifies completed entries' do
+      subject.save_processed_entry("entry", 10)
+
+      expect(subject.already_processed?("entry", 11)).to be_falsy
+      expect(subject.already_processed?("entry", 10)).to be_truthy
+      expect(subject.already_processed?("entry", 9)).to be_truthy
+    end
+  end
+
   describe '#deep_transform_relation!' do
     it 'transforms relation hash' do
       transformed = subject.deep_transform_relation!({}, 'test', {}) do |key, hash|
@@ -238,7 +258,7 @@ def initialize(portable, user)
     end

     context 'when portable is project' do
-      subject { NdjsonPipelineClass.new(project, user) }
+      subject { NdjsonPipelineClass.new(project, user, context) }

       it 'returns group relation name override' do
         expect(subject.relation_key_override('labels')).to eq('project_labels')
@@ -254,7 +274,7 @@ def initialize(portable, user)
     end

     context 'when portable is project' do
-      subject { NdjsonPipelineClass.new(project, user) }
+      subject { NdjsonPipelineClass.new(project, user, context) }

       it 'returns project relation factory' do
         expect(subject.relation_factory).to eq(Gitlab::ImportExport::Project::RelationFactory)
@@ -50,4 +50,18 @@
       end
     end
   end
+
+  describe '#each_with_index' do
+    context 'when block is present' do
+      it 'yields each data item with index' do
+        expect { |b| subject.each_with_index(&b) }.to yield_control
+      end
+    end
+
+    context 'when block is not present' do
+      it 'returns enumerator' do
+        expect(subject.each_with_index).to be_instance_of(Enumerator)
+      end
+    end
+  end
 end
@@ -277,6 +277,115 @@ def load(context, data); end
       it_behaves_like 'failed pipeline', 'StandardError', 'Error!'
     end

+    it 'saves entry in cache for de-duplication' do
+      expect_next_instance_of(BulkImports::Extractor) do |extractor|
+        expect(extractor)
+          .to receive(:extract)
+          .with(context)
+          .and_return(extracted_data)
+      end
+
+      expect_next_instance_of(BulkImports::Transformer) do |transformer|
+        expect(transformer)
+          .to receive(:transform)
+          .with(context, extracted_data.data.first)
+          .and_return(extracted_data.data.first)
+      end
+
+      expect_next_instance_of(BulkImports::MyPipeline) do |klass|
+        expect(klass).to receive(:save_processed_entry).with(extracted_data.data.first, anything)
+      end
+
+      subject.run
+    end
+
+    context 'with FF bulk_import_idempotent_workers disabled' do
+      before do
+        stub_feature_flags(bulk_import_idempotent_workers: false)
+      end
+
+      it 'does not touch the cache' do
+        expect_next_instance_of(BulkImports::Extractor) do |extractor|
+          expect(extractor)
+            .to receive(:extract)
+            .with(context)
+            .and_return(extracted_data)
+        end
+
+        expect_next_instance_of(BulkImports::Transformer) do |transformer|
+          expect(transformer)
+            .to receive(:transform)
+            .with(context, extracted_data.data.first)
+            .and_return(extracted_data.data.first)
+        end
+
+        expect_next_instance_of(BulkImports::MyPipeline) do |klass|
+          expect(klass).not_to receive(:save_processed_entry)
+        end
+
+        subject.run
+      end
+    end
+  end
+
+  context 'when the entry is already processed' do
+    before do
+      allow_next_instance_of(BulkImports::MyPipeline) do |klass|
+        allow(klass).to receive(:already_processed?).and_return true
+      end
+    end
+
+    it 'runs pipeline extractor, but not transformer or loader' do
+      expect_next_instance_of(BulkImports::Extractor) do |extractor|
+        expect(extractor)
+          .to receive(:extract)
+          .with(context)
+          .and_return(extracted_data)
+      end
+
+      allow_next_instance_of(BulkImports::Transformer) do |transformer|
+        expect(transformer)
+          .not_to receive(:transform)
+      end
+
+      allow_next_instance_of(BulkImports::Loader) do |loader|
+        expect(loader)
+          .not_to receive(:load)
+      end
+
+      subject.run
+    end
+
+    context 'with FF bulk_import_idempotent_workers disabled' do
+      before do
+        stub_feature_flags(bulk_import_idempotent_workers: false)
+      end
+
+      it 'calls extractor, transformer, and loader' do
+        expect_next_instance_of(BulkImports::Extractor) do |extractor|
+          expect(extractor)
+            .to receive(:extract)
+            .with(context)
+            .and_return(extracted_data)
+        end
+
+        expect_next_instance_of(BulkImports::Transformer) do |transformer|
+          expect(transformer)
+            .to receive(:transform)
+            .with(context, extracted_data.data.first)
+            .and_return(extracted_data.data.first)
+        end
+
+        expect_next_instance_of(BulkImports::Loader) do |loader|
+          expect(loader)
+            .to receive(:load)
+            .with(context, extracted_data.data.first)
+        end
+
+        subject.run
+      end
+    end
   end

   context 'when entity is marked as failed' do
@@ -43,7 +43,7 @@
   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
@@ -36,7 +36,7 @@
   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)

       issue_with_index = [issue, 0]
@@ -83,7 +83,7 @@
   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       group.add_maintainer(another_user)
@@ -2,7 +2,7 @@
 require 'spec_helper'

-RSpec.describe BulkImports::Projects::Pipelines::PipelineSchedulesPipeline do
+RSpec.describe BulkImports::Projects::Pipelines::PipelineSchedulesPipeline, :clean_gitlab_redis_cache, feature_category: :importers do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:project) { create(:project, group: group) }
@@ -134,7 +134,7 @@ def create_username_project_data
     end
   end

-  describe '#transform' do
+  describe '#transform', :clean_gitlab_redis_cache do
     it 'updates matching urls and usernames with new ones' do
       transformed_mr = subject.transform(context, mr)
       transformed_note = subject.transform(context, mr_note)
@@ -154,7 +154,8 @@ def create_username_project_data
       expect(transformed_system_note.note).not_to include("@old_username")
       expect(transformed_username_system_note.note).not_to include("@source_username")

-      expect(transformed_issue.description).to eq('http://localhost:80/namespace1/project-1/-/issues/1')
+      expect(transformed_issue.description)
+        .to eq("http://localhost:80/#{transformed_issue.namespace.full_path}/-/issues/1")
       expect(transformed_mr.description).to eq("#{expected_url} @destination_username? @alice-gdk, @bob-gdk!")
       expect(transformed_note.note).to eq("#{expected_url} @same_username")
       expect(transformed_issue_note.note).to include("@newer_username, not_a@username, and @new_username.")
@@ -38,7 +38,7 @@
   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)

       with_index = [release, 0]
@@ -40,7 +40,7 @@
   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)

       snippet_with_index = [exported_snippet.dup, 0]