diff --git a/.rubocop_todo/rspec/missing_feature_category.yml b/.rubocop_todo/rspec/missing_feature_category.yml
index 28a193130bb9ae8108a208426209e92b97ae9098..eeda0475e195ace052c7153cb6d7a995f5eff4e8 100644
--- a/.rubocop_todo/rspec/missing_feature_category.yml
+++ b/.rubocop_todo/rspec/missing_feature_category.yml
@@ -2761,7 +2761,6 @@ RSpec/MissingFeatureCategory:
     - 'spec/lib/bulk_imports/projects/pipelines/external_pull_requests_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb'
-    - 'spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/project_attributes_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/project_feature_pipeline_spec.rb'
     - 'spec/lib/bulk_imports/projects/pipelines/protected_branches_pipeline_spec.rb'
diff --git a/config/feature_flags/development/bulk_import_idempotent_workers.yml b/config/feature_flags/development/bulk_import_idempotent_workers.yml
new file mode 100644
index 0000000000000000000000000000000000000000..83d5b7f65c706c001bad70ce4377679367a2adea
--- /dev/null
+++ b/config/feature_flags/development/bulk_import_idempotent_workers.yml
@@ -0,0 +1,8 @@
+---
+name: bulk_import_idempotent_workers
+introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/132702
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/426480
+milestone: '16.5'
+type: development
+group: group::import and integrate
+default_enabled: false
diff --git a/ee/spec/lib/bulk_imports/groups/pipelines/epics_pipeline_spec.rb b/ee/spec/lib/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
index 78c456be796b2cede59b44e64f1a71a880e5a778..c37ad97e06ac90e9014ead78d515717705611cd2 100644
--- a/ee/spec/lib/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
+++ b/ee/spec/lib/bulk_imports/groups/pipelines/epics_pipeline_spec.rb
@@ -31,7 +31,7 @@

   subject { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       allow(Dir).to receive(:mktmpdir).and_return(tmpdir)
       allow_next_instance_of(BulkImports::FileDownloadService) do |service|
diff --git a/lib/bulk_imports/common/pipelines/badges_pipeline.rb b/lib/bulk_imports/common/pipelines/badges_pipeline.rb
index 33a24e61a3ffc955ca2d2ac99ea6734bef40b7fa..f7dfb72124d2b6bea2e22714a8a23aa5f3134815 100644
--- a/lib/bulk_imports/common/pipelines/badges_pipeline.rb
+++ b/lib/bulk_imports/common/pipelines/badges_pipeline.rb
@@ -33,6 +33,15 @@ def load(context, data)
         end
       end

+      def already_processed?(data, _)
+        values = Gitlab::Cache::Import::Caching.values_from_set(cache_key)
+        values.include?(OpenSSL::Digest::SHA256.hexdigest(data.to_s))
+      end
+
+      def save_processed_entry(data, _)
+        Gitlab::Cache::Import::Caching.set_add(cache_key, OpenSSL::Digest::SHA256.hexdigest(data.to_s))
+      end
+
       private

       def group_badge?(data)
diff --git a/lib/bulk_imports/ndjson_pipeline.rb b/lib/bulk_imports/ndjson_pipeline.rb
index 3c392910c1f5b0a3e2389b54b17e7ad8eddef1aa..ad990c8be070daf5d05f8ea8731c7b361c6a39fa 100644
--- a/lib/bulk_imports/ndjson_pipeline.rb
+++ b/lib/bulk_imports/ndjson_pipeline.rb
@@ -128,6 +128,15 @@ def relation_definition
       import_export_config.top_relation_tree(relation)
     end

+    def already_processed?(_, index)
+      last_index = Gitlab::Cache::Import::Caching.read(cache_key)
+      last_index && last_index.to_i >= index
+    end
+
+    def save_processed_entry(_, index)
+      Gitlab::Cache::Import::Caching.write(cache_key, index)
+    end
+
    def capture_invalid_subrelations(invalid_subrelations)
      invalid_subrelations.each do |record|
        BulkImports::Failure.create(
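Note on the two pipeline changes above: BadgesPipeline and NdjsonPipeline plug different de-duplication strategies into the same already_processed?/save_processed_entry hooks. Badges arrive as an unordered collection, so each entry is fingerprinted with SHA256 and tracked as set membership; NDJSON relations are replayed in a deterministic order, so a single last-seen index suffices. A minimal stand-alone sketch of both strategies, using plain in-memory Hashes as stand-ins for Gitlab::Cache::Import::Caching (the stand-ins and free-standing method names are illustrative only):

require 'openssl'
require 'set'

# In-memory stand-ins for the Redis-backed import cache, for illustration.
SET_CACHE = Hash.new { |hash, key| hash[key] = Set.new }
INDEX_CACHE = {}

# Badges strategy: fingerprint each entry and track membership in a set.
# Suits collections with no stable ordering or index.
def badge_processed?(cache_key, data)
  SET_CACHE[cache_key].include?(OpenSSL::Digest::SHA256.hexdigest(data.to_s))
end

def save_badge(cache_key, data)
  SET_CACHE[cache_key] << OpenSSL::Digest::SHA256.hexdigest(data.to_s)
end

# NDJSON strategy: keep only the highest index seen so far. Valid because
# NDJSON relations replay in a fixed order, and cheaper than a growing set.
def ndjson_processed?(cache_key, index)
  INDEX_CACHE.key?(cache_key) && INDEX_CACHE[cache_key] >= index
end

def save_ndjson(cache_key, index)
  INDEX_CACHE[cache_key] = index
end

save_badge('badges/1/0', { 'name' => 'badge1' })
p badge_processed?('badges/1/0', { 'name' => 'badge1' }) # => true

save_ndjson('issues/1/0', 10)
p ndjson_processed?('issues/1/0', 9)  # => true
p ndjson_processed?('issues/1/0', 11) # => false

The watermark variant stores one value per cache key instead of a set that grows with the relation, which keeps cache size constant for large NDJSON imports.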
diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb
index 0b36c0682981faa43dbabbb0086460702ffd4e47..e4640db08735ea843698755a3af78425ac579f8a 100644
--- a/lib/bulk_imports/pipeline/extracted_data.rb
+++ b/lib/bulk_imports/pipeline/extracted_data.rb
@@ -5,6 +5,8 @@ module Pipeline
     class ExtractedData
       attr_reader :data

+      delegate :each, :each_with_index, to: :data
+
       def initialize(data: nil, page_info: {})
         @data = data.is_a?(Enumerator) ? data : Array.wrap(data)
         @page_info = page_info
@@ -20,10 +22,6 @@ def has_next_page?
       def next_page
         @page_info&.dig('next_page')
       end
-
-      def each(&block)
-        data.each(&block)
-      end
     end
   end
 end
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 1e2d91520473b47509a2acea36d634c02d25e480..e6005478bd90bfd1dd85c52f3e03de9b7c801d34 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -15,7 +15,10 @@ def run
         extracted_data = extracted_data_from

         if extracted_data
-          extracted_data.each do |entry|
+          extracted_data.each_with_index do |entry, index|
+            raw_entry = entry.dup
+            next if Feature.enabled?(:bulk_import_idempotent_workers) && already_processed?(raw_entry, index)
+
             transformers.each do |transformer|
               entry = run_pipeline_step(:transformer, transformer.class.name) do
                 transformer.transform(context, entry)
@@ -25,6 +28,8 @@ def run
             run_pipeline_step(:loader, loader.class.name) do
               loader.load(context, entry)
             end
+
+            save_processed_entry(raw_entry, index) if Feature.enabled?(:bulk_import_idempotent_workers)
           end

           tracker.update!(
@@ -73,6 +78,19 @@ def extracted_data_from
         end
       end

+      def cache_key
+        batch_number = context.extra[:batch_number] || 0
+
+        "#{self.class.name.underscore}/#{tracker.bulk_import_entity_id}/#{batch_number}"
+      end
+
+      # Overridden by child pipelines with different caching strategies
+      def already_processed?(*)
+        false
+      end
+
+      def save_processed_entry(*); end
+
       def after_run(extracted_data)
         run if extracted_data.has_next_page?
       end
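The runner change above is the heart of the MR: each entry is dup'ed first so the pre-transform payload can serve as a stable de-duplication key, the entry is skipped when the cache already knows it, and the cache is only written after the loader succeeds, so a worker that dies mid-entry retries that entry on the next run instead of silently skipping it. A condensed sketch of that control flow; the callable collaborators and the feature_enabled keyword are stand-ins for illustration, not GitLab APIs:

def run_entries(extracted, transformers, loader, cache, feature_enabled: true)
  extracted.each_with_index do |entry, index|
    raw_entry = entry.dup # the pre-transform payload is the stable dedup key

    next if feature_enabled && cache.processed?(raw_entry, index)

    transformers.each { |transformer| entry = transformer.call(entry) }
    loader.call(entry)

    # Marked only after the loader succeeds: a crash mid-entry means the
    # entry is retried on the next run rather than skipped.
    cache.save(raw_entry, index) if feature_enabled
  end
end

# Minimal index-watermark cache, mirroring the NDJSON strategy.
class WatermarkCache
  def initialize
    @last = -1
  end

  def processed?(_entry, index)
    index <= @last
  end

  def save(_entry, index)
    @last = index if index > @last
  end
end

cache = WatermarkCache.new
upcase = ->(s) { s.upcase }
loaded = []

run_entries(%w[a b], [upcase], ->(e) { loaded << e }, cache)
run_entries(%w[a b c], [upcase], ->(e) { loaded << e }, cache)
p loaded # => ["A", "B", "C"]: "a" and "b" were skipped on the second run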
diff --git a/spec/lib/bulk_imports/common/pipelines/badges_pipeline_spec.rb b/spec/lib/bulk_imports/common/pipelines/badges_pipeline_spec.rb
index 6c5465c8a66711a127637a308cdfee812f9ba6b9..a18d26bedf3dfb40ed2fdb1dd4406bb0ece9a8f1 100644
--- a/spec/lib/bulk_imports/common/pipelines/badges_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/common/pipelines/badges_pipeline_spec.rb
@@ -33,6 +33,24 @@
       expect(badge.image_url).to eq(badge_data['image_url'])
     end

+    it 'skips already imported records' do
+      expect { pipeline.run }.to change(Badge, :count).by(2)
+
+      expect { pipeline.run }.to not_change(Badge, :count)
+    end
+
+    context 'with FF bulk_import_idempotent_workers disabled' do
+      before do
+        stub_feature_flags(bulk_import_idempotent_workers: false)
+      end
+
+      it 'creates duplicated badges' do
+        expect { pipeline.run }.to change(Badge, :count).by(2)
+
+        expect { pipeline.run }.to change(Badge, :count)
+      end
+    end
+
     context 'when project entity' do
       let(:first_page) { extracted_data(has_next_page: true) }
       let(:last_page) { extracted_data(name: 'badge2', kind: 'project') }
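One subtlety behind the badges spec above: already_processed? fingerprints data.to_s, so the skip only triggers when a re-run serializes each extracted hash identically. Ruby hashes stringify in insertion order, which holds for identical extractor output but not for reordered keys. A quick demonstration (the sample hashes are made up):

require 'openssl'

same_a = { 'name' => 'badge1', 'kind' => 'group' }
same_b = { 'name' => 'badge1', 'kind' => 'group' }
reordered = { 'kind' => 'group', 'name' => 'badge1' }

digest = ->(data) { OpenSSL::Digest::SHA256.hexdigest(data.to_s) }

p digest.call(same_a) == digest.call(same_b)    # => true: re-runs skip cleanly
p digest.call(same_a) == digest.call(reordered) # => false: key order changes the digest

GitLab's spec suite enables feature flags by default, which is why the skipping behaviour is asserted without setup and only the legacy path needs an explicit stub_feature_flags call.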
diff --git a/spec/lib/bulk_imports/ndjson_pipeline_spec.rb b/spec/lib/bulk_imports/ndjson_pipeline_spec.rb
index 29f42ab3366150b536da568da79c6937dc75fdf8..5611879868d9e443d64a717484854c686ecdcccb 100644
--- a/spec/lib/bulk_imports/ndjson_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/ndjson_pipeline_spec.rb
@@ -6,6 +6,8 @@
   let_it_be(:group) { create(:group) }
   let_it_be(:project) { create(:project) }
   let_it_be(:user) { create(:user) }
+  let(:tracker) { instance_double(BulkImports::Tracker, bulk_import_entity_id: 1) }
+  let(:context) { instance_double(BulkImports::Pipeline::Context, tracker: tracker, extra: { batch_number: 1 }) }

   let(:klass) do
     Class.new do
@@ -13,11 +15,12 @@

       relation_name 'test'

-      attr_reader :portable, :current_user
+      attr_reader :portable, :current_user, :context

-      def initialize(portable, user)
+      def initialize(portable, user, context)
         @portable = portable
         @current_user = user
+        @context = context
       end
     end
   end
@@ -26,12 +29,29 @@ def initialize(portable, user)
     stub_const('NdjsonPipelineClass', klass)
   end

-  subject { NdjsonPipelineClass.new(group, user) }
+  subject { NdjsonPipelineClass.new(group, user, context) }

   it 'marks pipeline as ndjson' do
     expect(NdjsonPipelineClass.file_extraction_pipeline?).to eq(true)
   end

+  describe 'caching' do
+    it 'saves completed entry in cache' do
+      subject.save_processed_entry("entry", 10)
+
+      expected_cache_key = "ndjson_pipeline_class/1/1"
+      expect(Gitlab::Cache::Import::Caching.read(expected_cache_key)).to eq("10")
+    end
+
+    it 'identifies completed entries' do
+      subject.save_processed_entry("entry", 10)
+
+      expect(subject.already_processed?("entry", 11)).to be_falsy
+      expect(subject.already_processed?("entry", 10)).to be_truthy
+      expect(subject.already_processed?("entry", 9)).to be_truthy
+    end
+  end
+
   describe '#deep_transform_relation!' do
     it 'transforms relation hash' do
       transformed = subject.deep_transform_relation!({}, 'test', {}) do |key, hash|
@@ -238,7 +258,7 @@ def initialize(portable, user)
     end

     context 'when portable is project' do
-      subject { NdjsonPipelineClass.new(project, user) }
+      subject { NdjsonPipelineClass.new(project, user, context) }

       it 'returns group relation name override' do
         expect(subject.relation_key_override('labels')).to eq('project_labels')
@@ -254,7 +274,7 @@ def initialize(portable, user)
     end

     context 'when portable is project' do
-      subject { NdjsonPipelineClass.new(project, user) }
+      subject { NdjsonPipelineClass.new(project, user, context) }

       it 'returns project relation factory' do
         expect(subject.relation_factory).to eq(Gitlab::ImportExport::Project::RelationFactory)
diff --git a/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb b/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb
index 045908de5c45bb5f051e86e07cdc83014ced8299..108ce05cb66dc9cd13c968b8f62bdde9782c8de6 100644
--- a/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb
+++ b/spec/lib/bulk_imports/pipeline/extracted_data_spec.rb
@@ -50,4 +50,18 @@
       end
     end
   end
+
+  describe '#each_with_index' do
+    context 'when block is present' do
+      it 'yields each data item with index' do
+        expect { |b| subject.each_with_index(&b) }.to yield_control
+      end
+    end
+
+    context 'when block is not present' do
+      it 'returns enumerator' do
+        expect(subject.each_with_index).to be_instance_of(Enumerator)
+      end
+    end
+  end
 end
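The extracted_data_spec addition passes because delegating each_with_index to the underlying Array or Enumerator preserves Enumerable's contract of returning an Enumerator when no block is given. A stdlib-only approximation of the delegation; the class name is hypothetical, and the real class uses ActiveSupport's delegate and Array.wrap rather than Forwardable and Array():

require 'forwardable'

class ExtractedDataSketch
  extend Forwardable

  attr_reader :data

  # Mirrors `delegate :each, :each_with_index, to: :data` from the diff.
  def_delegators :data, :each, :each_with_index

  def initialize(data: nil)
    # Array() approximates Array.wrap for the simple cases sketched here.
    @data = data.is_a?(Enumerator) ? data : Array(data)
  end
end

extracted = ExtractedDataSketch.new(data: %w[badge1 badge2])
extracted.each_with_index { |entry, i| puts "#{i}: #{entry}" }
p extracted.each_with_index.class # => Enumerator, matching the spec above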
+
+    it 'saves entry in cache for de-duplication' do
+      expect_next_instance_of(BulkImports::Extractor) do |extractor|
+        expect(extractor)
+          .to receive(:extract)
+          .with(context)
+          .and_return(extracted_data)
+      end
+
+      expect_next_instance_of(BulkImports::Transformer) do |transformer|
+        expect(transformer)
+          .to receive(:transform)
+          .with(context, extracted_data.data.first)
+          .and_return(extracted_data.data.first)
+      end
+
+      expect_next_instance_of(BulkImports::MyPipeline) do |klass|
+        expect(klass).to receive(:save_processed_entry).with(extracted_data.data.first, anything)
+      end
+
+      subject.run
+    end
+
+    context 'with FF bulk_import_idempotent_workers disabled' do
+      before do
+        stub_feature_flags(bulk_import_idempotent_workers: false)
+      end
+
+      it 'does not touch the cache' do
+        expect_next_instance_of(BulkImports::Extractor) do |extractor|
+          expect(extractor)
+            .to receive(:extract)
+            .with(context)
+            .and_return(extracted_data)
+        end
+
+        expect_next_instance_of(BulkImports::Transformer) do |transformer|
+          expect(transformer)
+            .to receive(:transform)
+            .with(context, extracted_data.data.first)
+            .and_return(extracted_data.data.first)
+        end
+
+        expect_next_instance_of(BulkImports::MyPipeline) do |klass|
+          expect(klass).not_to receive(:save_processed_entry)
+        end
+
+        subject.run
+      end
+    end
+  end
+
+  context 'when the entry is already processed' do
+    before do
+      allow_next_instance_of(BulkImports::MyPipeline) do |klass|
+        allow(klass).to receive(:already_processed?).and_return true
+      end
+    end
+
+    it 'runs pipeline extractor, but not transformer or loader' do
+      expect_next_instance_of(BulkImports::Extractor) do |extractor|
+        expect(extractor)
+          .to receive(:extract)
+          .with(context)
+          .and_return(extracted_data)
+      end
+
+      allow_next_instance_of(BulkImports::Transformer) do |transformer|
+        expect(transformer)
+          .not_to receive(:transform)
+      end
+
+      allow_next_instance_of(BulkImports::Loader) do |loader|
+        expect(loader)
+          .not_to receive(:load)
+      end
+
+      subject.run
+    end
+
+    context 'with FF bulk_import_idempotent_workers disabled' do
+      before do
+        stub_feature_flags(bulk_import_idempotent_workers: false)
+      end
+
+      it 'calls extractor, transformer, and loader' do
+        expect_next_instance_of(BulkImports::Extractor) do |extractor|
+          expect(extractor)
+            .to receive(:extract)
+            .with(context)
+            .and_return(extracted_data)
+        end
+
+        expect_next_instance_of(BulkImports::Transformer) do |transformer|
+          expect(transformer)
+            .to receive(:transform)
+            .with(context, extracted_data.data.first)
+            .and_return(extracted_data.data.first)
+        end
+
+        expect_next_instance_of(BulkImports::Loader) do |loader|
+          expect(loader)
+            .to receive(:load)
+            .with(context, extracted_data.data.first)
+        end
+
+        subject.run
+      end
+    end
+  end

   context 'when entity is marked as failed' do
diff --git a/spec/lib/bulk_imports/projects/pipelines/ci_pipelines_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/ci_pipelines_pipeline_spec.rb
index 63e7cdf2e5af15647fceb5d12d6351371ba54454..0d32af27d4f7835ea5fd3dacfa45b2901de600d0 100644
--- a/spec/lib/bulk_imports/projects/pipelines/ci_pipelines_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/ci_pipelines_pipeline_spec.rb
@@ -43,7 +43,7 @@

   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
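For reference, the key asserted as "ndjson_pipeline_class/1/1" in the NDJSON spec earlier comes straight out of Runner#cache_key: underscored pipeline class name, then bulk import entity id, then batch number. A sketch with a minimal stand-in for ActiveSupport's String#underscore (the free-standing helpers are illustrative only):

def underscore(name)
  name.gsub(/::/, '/')
      .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
end

def cache_key(klass_name, entity_id, batch_number)
  "#{underscore(klass_name)}/#{entity_id}/#{batch_number}"
end

p cache_key('NdjsonPipelineClass', 1, 1) # => "ndjson_pipeline_class/1/1"

Scoping the key by entity and batch number means parallel batches of the same entity never share a watermark, while a retried batch lands on its own cached progress.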
diff --git a/spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb
index fd13c10d61e20e9ebfb227cb98345900bf0a449e..625078b1b2aaca9fd0283d50f31915c11bc029b0 100644
--- a/spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/issues_pipeline_spec.rb
@@ -36,7 +36,7 @@

   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       issue_with_index = [issue, 0]
diff --git a/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
index 5b85b3eee7916015c0bf53ffc3d408995dd575bd..3fb7e28036ee96196de89642f5db8bd2de70c04c 100644
--- a/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/merge_requests_pipeline_spec.rb
@@ -83,7 +83,7 @@

   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       group.add_maintainer(another_user)
diff --git a/spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb
index 0bfd9410808d02148cfc14a1e93060a7a17b5187..6ba555aa328d6a87caa831931ff86b9feeae87b9 100644
--- a/spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/pipeline_schedules_pipeline_spec.rb
@@ -2,7 +2,7 @@

 require 'spec_helper'

-RSpec.describe BulkImports::Projects::Pipelines::PipelineSchedulesPipeline do
+RSpec.describe BulkImports::Projects::Pipelines::PipelineSchedulesPipeline, :clean_gitlab_redis_cache, feature_category: :importers do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:project) { create(:project, group: group) }
diff --git a/spec/lib/bulk_imports/projects/pipelines/references_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/references_pipeline_spec.rb
index af8bce47c3dc15efa5f822f4bc6731de3aebda0c..e2b99fe4db4af4609325d27f849ae08ed0a8b7b8 100644
--- a/spec/lib/bulk_imports/projects/pipelines/references_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/references_pipeline_spec.rb
@@ -134,7 +134,7 @@ def create_username_project_data
     end
   end

-  describe '#transform' do
+  describe '#transform', :clean_gitlab_redis_cache do
     it 'updates matching urls and usernames with new ones' do
       transformed_mr = subject.transform(context, mr)
       transformed_note = subject.transform(context, mr_note)
@@ -154,7 +154,8 @@ def create_username_project_data
       expect(transformed_system_note.note).not_to include("@old_username")
       expect(transformed_username_system_note.note).not_to include("@source_username")

-      expect(transformed_issue.description).to eq('http://localhost:80/namespace1/project-1/-/issues/1')
+      expect(transformed_issue.description)
+        .to eq("http://localhost:80/#{transformed_issue.namespace.full_path}/-/issues/1")
       expect(transformed_mr.description).to eq("#{expected_url} @destination_username? @alice-gdk, @bob-gdk!")
       expect(transformed_note.note).to eq("#{expected_url} @same_username")
       expect(transformed_issue_note.note).to include("@newer_username, not_a@username, and @new_username.")
diff --git a/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
index 339ca727b57c7a371f16b14da9c8a4144432d61c..9e0b5af6bfea4760e7fbbaaebf00454a439a97a2 100644
--- a/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/releases_pipeline_spec.rb
@@ -38,7 +38,7 @@

   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       with_index = [release, 0]
diff --git a/spec/lib/bulk_imports/projects/pipelines/snippets_pipeline_spec.rb b/spec/lib/bulk_imports/projects/pipelines/snippets_pipeline_spec.rb
index 41b3ea378040dbb2ff5e72704d8c3e706da90ad3..1e3cfe20bf55f139bbd41e12fe6deca242f4fb75 100644
--- a/spec/lib/bulk_imports/projects/pipelines/snippets_pipeline_spec.rb
+++ b/spec/lib/bulk_imports/projects/pipelines/snippets_pipeline_spec.rb
@@ -40,7 +40,7 @@

   subject(:pipeline) { described_class.new(context) }

-  describe '#run' do
+  describe '#run', :clean_gitlab_redis_cache do
     before do
       group.add_owner(user)
       snippet_with_index = [exported_snippet.dup, 0]
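The remaining hunks all add the :clean_gitlab_redis_cache metadata tag (pipeline_schedules_pipeline_spec.rb also gains feature_category: :importers, matching its removal from the rubocop todo list at the top). The tag is needed because these pipelines now write de-duplication markers to the Redis-backed import cache on every #run: without a clean slate, a second example would see the first example's entries as already processed. An illustration of what such a tag amounts to; the tag name and cache object here are hypothetical stand-ins for GitLab's spec-support hook:

require 'rspec'
require 'set'

FAKE_IMPORT_CACHE = Set.new

RSpec.configure do |config|
  # Wipe cache state around every example tagged :clean_fake_cache, so
  # dedup markers written by one example cannot leak into the next.
  config.around(:each, :clean_fake_cache) do |example|
    FAKE_IMPORT_CACHE.clear
    example.run
    FAKE_IMPORT_CACHE.clear
  end
end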