diff --git a/app/models/ci/pipeline.rb b/app/models/ci/pipeline.rb
index 6ed7434397bc4937e9edb4cfdd25b174751d9f29..23c2164135dc59b2b2e099a0f69ae7f410229108 100644
--- a/app/models/ci/pipeline.rb
+++ b/app/models/ci/pipeline.rb
@@ -151,7 +151,7 @@ class Pipeline < Ci::ApplicationRecord
     accepts_nested_attributes_for :variables, reject_if: :persisted?
 
     delegate :full_path, to: :project, prefix: true
-    delegate :name, :auto_cancel_on_job_failure, to: :pipeline_metadata, allow_nil: true
+    delegate :name, to: :pipeline_metadata, allow_nil: true
 
     validates :sha, presence: { unless: :importing? }
     validates :ref, presence: { unless: :importing? }
@@ -1393,6 +1393,10 @@ def merge_request_diff
       merge_request.merge_request_diff_for(merge_request_diff_sha)
     end
 
+    def auto_cancel_on_job_failure
+      pipeline_metadata&.auto_cancel_on_job_failure || 'none'
+    end
+
     def auto_cancel_on_new_commit
       pipeline_metadata&.auto_cancel_on_new_commit || 'conservative'
     end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 76680071360d9fd4ffd1f8230b2085fef679ebac..56deedb0e35042b6a08caf65e21d5242fe10e4c0 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -2865,6 +2865,15 @@
   :weight: 1
   :idempotent: true
   :tags: []
+- :name: ci_user_cancel_pipeline
+  :worker_name: Ci::UserCancelPipelineWorker
+  :feature_category: :continuous_integration
+  :has_external_dependencies: false
+  :urgency: :high
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: click_house_audit_event_partition_sync
   :worker_name: ClickHouse::AuditEventPartitionSyncWorker
   :feature_category: :compliance_management
diff --git a/app/workers/ci/build_finished_worker.rb b/app/workers/ci/build_finished_worker.rb
index 2113f7ae07b06d763084abfb00df624e2f6724ec..31b42652584c48f35c1c9624b7ea7ad0539b3402 100644
--- a/app/workers/ci/build_finished_worker.rb
+++ b/app/workers/ci/build_finished_worker.rb
@@ -43,6 +43,22 @@ def process_build(build)
       build.remove_token!
 
       if build.failed? && !build.auto_retry_expected?
+        if Feature.enabled?(:auto_cancel_pipeline_on_job_failure, build.pipeline.project)
+          case build.pipeline.auto_cancel_on_job_failure
+          when 'none'
+            # no-op
+          when 'all'
+            ::Ci::UserCancelPipelineWorker.perform_async(
+              build.pipeline_id,
+              build.pipeline_id,
+              build.user.id
+            )
+          else
+            raise ArgumentError,
+              "Unknown auto_cancel_on_job_failure value: #{build.pipeline.auto_cancel_on_job_failure}"
+          end
+        end
+
         ::Ci::MergeRequests::AddTodoWhenBuildFailsWorker.perform_async(build.id)
       end
 
diff --git a/app/workers/ci/user_cancel_pipeline_worker.rb b/app/workers/ci/user_cancel_pipeline_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..2f1a06fa53a39dc5351008de9fae97cb037141d2
--- /dev/null
+++ b/app/workers/ci/user_cancel_pipeline_worker.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+module Ci
+  class UserCancelPipelineWorker
+    include ApplicationWorker
+
+    data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency -- Lots of updates to ci_builds
+    feature_category :continuous_integration
+    idempotent!
+    deduplicate :until_executed
+    urgency :high
+    loggable_arguments 1
+
+    def perform(pipeline_id, auto_canceled_by_pipeline_id, current_user_id, params = {}) # rubocop:disable Lint/UnusedMethodArgument -- Allowing for future expansion
+      ::Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
+        # cascade_to_children is false because we iterate through children
+        # we also cancel bridges prior to prevent more children
+        ::Ci::CancelPipelineService.new(
+          pipeline: pipeline,
+          current_user: User.find_by_id(current_user_id),
+          cascade_to_children: false,
+          auto_canceled_by_pipeline: ::Ci::Pipeline.find_by_id(auto_canceled_by_pipeline_id)
+        ).execute
+      end
+    end
+  end
+end
diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml
index a7e43aae9465adf2ffdf5b80d1d480090d7183f6..2e5fa7923b32e4cdba197f31240300862dbc3c3a 100644
--- a/config/sidekiq_queues.yml
+++ b/config/sidekiq_queues.yml
@@ -179,6 +179,8 @@
   - 1
 - - ci_upstream_projects_subscriptions_cleanup
   - 1
+- - ci_user_cancel_pipeline
+  - 1
 - - click_house_audit_event_partition_sync
   - 1
 - - click_house_ci_finished_builds_sync
diff --git a/spec/models/ci/pipeline_spec.rb b/spec/models/ci/pipeline_spec.rb
index 60665f4b779b75216b1bbcd5d1ec301ec9eabe46..bac3ea5b0212a0653f32fa218f0ed67d897ced6e 100644
--- a/spec/models/ci/pipeline_spec.rb
+++ b/spec/models/ci/pipeline_spec.rb
@@ -5794,4 +5794,36 @@ def create_bridge(upstream:, downstream:, depends: false)
       end
     end
   end
+
+  describe '#auto_cancel_on_job_failure' do
+    let_it_be_with_reload(:pipeline) { create(:ci_pipeline, project: project) }
+
+    subject(:auto_cancel_on_job_failure) { pipeline.auto_cancel_on_job_failure }
+
+    context 'when pipeline_metadata is not present' do
+      it { is_expected.to eq('none') }
+    end
+
+    context 'when pipeline_metadata is present' do
+      before_all do
+        create(:ci_pipeline_metadata, project: pipeline.project, pipeline: pipeline)
+      end
+
+      context 'when auto_cancel_on_job_failure is nil' do
+        before do
+          pipeline.pipeline_metadata.auto_cancel_on_job_failure = nil
+        end
+
+        it { is_expected.to eq('none') }
+      end
+
+      context 'when auto_cancel_on_job_failure is a valid value' do
+        before do
+          pipeline.pipeline_metadata.auto_cancel_on_job_failure = 'all'
+        end
+
+        it { is_expected.to eq('all') }
+      end
+    end
+  end
 end
diff --git a/spec/workers/ci/build_finished_worker_spec.rb b/spec/workers/ci/build_finished_worker_spec.rb
index 6da30a86b54e6c9bf43089dbb44a11c82b17b975..9c550b1959870304d93c247a5b46202a36cbe986 100644
--- a/spec/workers/ci/build_finished_worker_spec.rb
+++ b/spec/workers/ci/build_finished_worker_spec.rb
@@ -9,7 +9,9 @@
 
   describe '#perform' do
     context 'when build exists' do
-      let_it_be(:build) { create(:ci_build, :success, pipeline: create(:ci_pipeline)) }
+      let_it_be(:build) do
+        create(:ci_build, :success, user: create(:user), pipeline: create(:ci_pipeline))
+      end
 
       before do
         expect(Ci::Build).to receive(:find_by).with({ id: build.id }).and_return(build)
@@ -38,6 +40,82 @@
           subject
         end
 
+        context 'when auto_cancel_on_job_failure is set to an invalid value' do
+          before do
+            allow(build.pipeline)
+              .to receive(:auto_cancel_on_job_failure)
+              .and_return('invalid value')
+          end
+
+          it 'raises an exception' do
+            expect { subject }.to raise_error(
+              ArgumentError, 'Unknown auto_cancel_on_job_failure value: invalid value')
+          end
+
+          context 'when auto_cancel_pipeline_on_job_failure feature flag is disabled' do
+            before do
+              stub_feature_flags(auto_cancel_pipeline_on_job_failure: false)
+            end
+
+            it 'does not raise an exception' do
+              expect { subject }.not_to raise_error
+            end
+          end
+        end
+
+        context 'when auto_cancel_on_job_failure is set to all' do
+          before do
+            build.pipeline.create_pipeline_metadata!(
+              project: build.pipeline.project, auto_cancel_on_job_failure: 'all'
+            )
+          end
+
+          it 'cancels the pipeline' do
+            expect(::Ci::UserCancelPipelineWorker).to receive(:perform_async)
+              .with(build.pipeline.id, build.pipeline.id, build.user.id)
+
+            subject
+          end
+
+          context 'when auto_cancel_pipeline_on_job_failure feature flag is disabled' do
+            before do
+              stub_feature_flags(auto_cancel_pipeline_on_job_failure: false)
+            end
+
+            it 'does not cancel the pipeline' do
+              expect(::Ci::UserCancelPipelineWorker).not_to receive(:perform_async)
+
+              subject
+            end
+          end
+        end
+
+        context 'when auto_cancel_on_job_failure is set to none' do
+          before do
+            build.pipeline.create_pipeline_metadata!(
+              project: build.pipeline.project, auto_cancel_on_job_failure: 'none'
+            )
+          end
+
+          it 'does not cancel the pipeline' do
+            expect(::Ci::UserCancelPipelineWorker).not_to receive(:perform_async)
+
+            subject
+          end
+
+          context 'when auto_cancel_pipeline_on_job_failure feature flag is disabled' do
+            before do
+              stub_feature_flags(auto_cancel_pipeline_on_job_failure: false)
+            end
+
+            it 'does not cancel the pipeline' do
+              expect(::Ci::UserCancelPipelineWorker).not_to receive(:perform_async)
+
+              subject
+            end
+          end
+        end
+
         context 'when a build can be auto-retried' do
           before do
             allow(build)
@@ -51,6 +129,20 @@
 
             subject
           end
+
+          context 'when auto_cancel_on_job_failure is set to all' do
+            before do
+              build.pipeline.create_pipeline_metadata!(
+                project: build.pipeline.project, auto_cancel_on_job_failure: 'all'
+              )
+            end
+
+            it 'does not cancel the pipeline' do
+              expect(::Ci::UserCancelPipelineWorker).not_to receive(:perform_async)
+
+              subject
+            end
+          end
         end
       end
 
diff --git a/spec/workers/ci/user_cancel_pipeline_worker_spec.rb b/spec/workers/ci/user_cancel_pipeline_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..ec3af20ead0be04bf57d54cdc6ede5da0a8a4b21
--- /dev/null
+++ b/spec/workers/ci/user_cancel_pipeline_worker_spec.rb
@@ -0,0 +1,160 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe Ci::UserCancelPipelineWorker, :aggregate_failures, feature_category: :continuous_integration do
+  let_it_be(:pipeline) { create(:ci_pipeline, :running) }
+  let_it_be(:current_user) { create(:user) }
+  let(:current_user_id) { current_user.id }
+
+  describe '#perform' do
+    subject(:perform) { described_class.new.perform(pipeline.id, pipeline.id, current_user_id) }
+
+    let(:cancel_service) { instance_double(::Ci::CancelPipelineService) }
+
+    context 'when the user id is nil' do
+      let(:current_user_id) { nil }
+
+      it 'cancels the pipeline by calling force_execute' do
+        allow(::Ci::Pipeline).to receive(:find_by_id).twice.and_return(pipeline)
+        expect(::Ci::CancelPipelineService)
+          .to receive(:new)
+          .with(
+            pipeline: pipeline,
+            current_user: nil,
+            auto_canceled_by_pipeline: pipeline,
+            cascade_to_children: false)
+          .and_return(cancel_service)
+
+        expect(cancel_service).to receive(:execute)
+
+        perform
+      end
+    end
+
+    context 'when the current user id is provided' do
+      context 'when the user does not exist' do
+        let(:current_user_id) { non_existing_record_id }
+
+        it 'cancels the pipeline by calling force_execute' do
+          allow(::Ci::Pipeline).to receive(:find_by_id).twice.and_return(pipeline)
+          expect(::Ci::CancelPipelineService)
+            .to receive(:new)
+            .with(
+              pipeline: pipeline,
+              current_user: nil,
+              auto_canceled_by_pipeline: pipeline,
+              cascade_to_children: false)
+            .and_return(cancel_service)
+
+          expect(cancel_service).to receive(:execute)
+
+          perform
+        end
+      end
+
+      context 'when the user exists' do
+        it 'cancels the pipeline by calling execute' do
+          allow(::Ci::Pipeline).to receive(:find_by_id).twice.and_return(pipeline)
+          expect(::Ci::CancelPipelineService)
+            .to receive(:new)
+            .with(
+              pipeline: pipeline,
+              current_user: current_user,
+              auto_canceled_by_pipeline: pipeline,
+              cascade_to_children: false)
+            .and_return(cancel_service)
+
+          expect(cancel_service).to receive(:execute)
+
+          perform
+        end
+      end
+    end
+
+    context 'if pipeline is deleted' do
+      subject(:perform) { described_class.new.perform(non_existing_record_id, pipeline.id, current_user_id) }
+
+      it 'does not error' do
+        expect(::Ci::CancelPipelineService).not_to receive(:new)
+
+        perform
+      end
+    end
+
+    context 'when auto_canceled_by_pipeline is deleted' do
+      subject(:perform) { described_class.new.perform(pipeline.id, non_existing_record_id, current_user_id) }
+
+      it 'does not error' do
+        expect(::Ci::CancelPipelineService)
+          .to receive(:new)
+          .with(
+            pipeline: an_instance_of(::Ci::Pipeline),
+            current_user: current_user,
+            auto_canceled_by_pipeline: nil,
+            cascade_to_children: false)
+          .and_call_original
+
+        perform
+      end
+    end
+
+    describe 'with builds and state transition side effects', :sidekiq_inline do
+      let!(:build) { create(:ci_build, :running, pipeline: pipeline) }
+      let(:job_args) { [pipeline.id, pipeline.id, current_user_id] }
+
+      context 'when the user id is nil' do
+        let(:current_user_id) { nil }
+
+        it_behaves_like 'an idempotent worker', :sidekiq_inline do
+          it 'does not cancel the pipeline' do
+            perform
+
+            pipeline.reload
+
+            expect(pipeline).not_to be_canceled
+            expect(pipeline.builds.first).not_to be_canceled
+            expect(pipeline.builds.first.auto_canceled_by_id).to be_nil
+            expect(pipeline.auto_canceled_by_id).to be_nil
+          end
+        end
+      end
+
+      context 'when the user id exists' do
+        context 'when the user can cancel the pipeline' do
+          let_it_be(:project) { create(:project) }
+          let_it_be(:pipeline) { create(:ci_pipeline, :running, project: project) }
+          let_it_be(:current_user) { project.owner }
+
+          it_behaves_like 'an idempotent worker', :sidekiq_inline do
+            it 'cancels the pipeline' do
+              perform
+
+              pipeline.reload
+
+              expect(pipeline).to be_canceled
+              expect(pipeline.builds.first).to be_canceled
+              expect(pipeline.builds.first.auto_canceled_by_id).to eq pipeline.id
+              expect(pipeline.auto_canceled_by_id).to eq pipeline.id
+            end
+          end
+        end
+
+        context 'when the user cannot cancel the pipeline' do
+          it_behaves_like 'an idempotent worker', :sidekiq_inline do
+            it 'does not cancel the pipeline' do
+              perform
+
+              pipeline.reload
+
+              expect(pipeline).not_to be_canceled
+              expect(pipeline.builds.first).not_to be_canceled
+              expect(pipeline.builds.first.auto_canceled_by_id).to be_nil
+              expect(pipeline.auto_canceled_by_id).to be_nil
+            end
+          end
+        end
+      end
+    end
+  end
+end