diff --git a/app/services/ci/pipeline_processing/atomic_processing_service.rb b/app/services/ci/pipeline_processing/atomic_processing_service.rb index 84e5089b0d5384f2ba3db279da94ae56ce8cd8bc..13bc4fdadf6ba2ffc01e591973d2103278891893 100644 --- a/app/services/ci/pipeline_processing/atomic_processing_service.rb +++ b/app/services/ci/pipeline_processing/atomic_processing_service.rb @@ -55,24 +55,62 @@ def update_stages! end def update_stage!(stage) - # Update jobs for a given stage in bulk/slices - @collection - .created_job_ids_in_stage(stage.position) - .in_groups_of(BATCH_SIZE, false) { |ids| update_jobs!(ids) } + if Feature.enabled?(:ci_atomic_processing_ordered_update_stage, project) + sorted_update_stage!(stage) + else + legacy_update_stage!(stage) + end status = @collection.status_of_stage(stage.position) stage.set_status(status) end - def update_jobs!(ids) - created_jobs = pipeline + def sorted_update_stage!(stage) + ordered_jobs(stage).each { |job| update_job!(job) } + end + + def ordered_jobs(stage) + jobs = load_jobs_in_batches(stage) + sorted_job_names = sort_jobs(jobs).each_with_index.to_h + jobs.sort_by { |job| sorted_job_names.fetch(job.name) } + end + + def load_jobs_in_batches(stage) + @collection + .created_job_ids_in_stage(stage.position) + .in_groups_of(BATCH_SIZE, false) + .each_with_object([]) do |ids, jobs| + jobs.concat(load_jobs(ids)) + end + end + + def load_jobs(ids) + pipeline .current_processable_jobs .id_in(ids) .with_project_preload .created .ordered_by_stage .select_with_aggregated_needs(project) + end + + def sort_jobs(jobs) + Gitlab::Ci::YamlProcessor::Dag.order( # rubocop: disable CodeReuse/ActiveRecord -- this is not ActiveRecord + jobs.to_h do |job| + [job.name, job.aggregated_needs_names.to_a] + end + ) + end + def legacy_update_stage!(stage) + # Update jobs for a given stage in bulk/slices + @collection + .created_job_ids_in_stage(stage.position) + .in_groups_of(BATCH_SIZE, false) { |ids| update_jobs!(ids) } + end + + def update_jobs!(ids) + created_jobs = load_jobs(ids) created_jobs.each { |job| update_job!(job) } end diff --git a/config/feature_flags/gitlab_com_derisk/ci_atomic_processing_ordered_update_stage.yml b/config/feature_flags/gitlab_com_derisk/ci_atomic_processing_ordered_update_stage.yml new file mode 100644 index 0000000000000000000000000000000000000000..b8d9daeb21416af572b27cbba3eebd1e9d2805e9 --- /dev/null +++ b/config/feature_flags/gitlab_com_derisk/ci_atomic_processing_ordered_update_stage.yml @@ -0,0 +1,9 @@ +--- +name: ci_atomic_processing_ordered_update_stage +feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/450395 +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/147875 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/451992 +milestone: '16.11' +group: group::pipeline authoring +type: gitlab_com_derisk +default_enabled: false diff --git a/spec/services/ci/pipeline_processing/atomic_processing_service_spec.rb b/spec/services/ci/pipeline_processing/atomic_processing_service_spec.rb index 6e263e82432eb3dc8c7211fcf63a7d665edfcfbe..d4a966cc474653009650f635a17da73bbef44e7b 100644 --- a/spec/services/ci/pipeline_processing/atomic_processing_service_spec.rb +++ b/spec/services/ci/pipeline_processing/atomic_processing_service_spec.rb @@ -928,6 +928,89 @@ def event_on_pipeline(event) end end + context 'when dependent jobs are listed after job needs in the same stage' do + let(:feature_flag_state) { true } + let(:config) do + <<-YAML + test1: + stage: test + needs: [manual1] + script: exit 0 + + test2: + stage: test + script: exit 0 + + manual1: + stage: test + when: manual + script: exit 0 + YAML + end + + let(:pipeline) do + Ci::CreatePipelineService.new(project, user, { ref: 'master' }).execute(:push).payload + end + + before do + stub_feature_flags(ci_atomic_processing_ordered_update_stage: feature_flag_state) + stub_ci_pipeline_yaml_file(config) + process_pipeline + end + + context 'when ci_atomic_processing_ordered_update_stage is enabled' do + let(:statuses) do + { 'manual1': 'manual', 'test1': 'skipped', 'test2': 'pending' } + end + + it 'test1 is in skipped state' do + expect(all_builds_names_and_statuses).to eq(statuses) + expect(stages).to eq(['pending']) + end + + context 'with multiple batches' do + before do + stub_const("#{described_class}::BATCH_SIZE", 2) + end + + it 'test1 is in skipped state' do + expect(all_builds_names_and_statuses).to eq(statuses) + expect(stages).to eq(['pending']) + end + end + end + + context 'when ci_atomic_processing_ordered_update_stage is disabled' do + let(:feature_flag_state) { false } + let(:manual1) { all_builds.find_by(name: 'manual1') } + let(:test1) { all_builds.find_by(name: 'test1') } + let(:test2) { all_builds.find_by(name: 'test2') } + + shared_examples 'stage jobs played in any order' do + it 'test1 is in created or skipped state' do + expect(manual1.status).to eq('manual') + # With this feature flag disabled, whether manual1 or test1 is run first + # is not deterministic: + # - test1 in created state if it is run first + # - test1 in skipped state if manual1 is run first + expect(test1.status).to eq('created').or eq('skipped') + expect(test2.status).to eq('pending') + expect(stages).to eq(['pending']) + end + end + + it_behaves_like 'stage jobs played in any order' + + context 'with multiple batches' do + before do + stub_const("#{described_class}::BATCH_SIZE", 2) + end + + it_behaves_like 'stage jobs played in any order' + end + end + end + context 'when jobs change from stopped to alive status during pipeline processing' do around do |example| Sidekiq::Testing.fake! { example.run }