diff --git a/app/models/deployment.rb b/app/models/deployment.rb
index 0c679a3075b187eb498784abae0d4291b6a67fa0..5450357fe1b1a4f49aed6b7d068f55904008fc9f 100644
--- a/app/models/deployment.rb
+++ b/app/models/deployment.rb
@@ -39,6 +39,7 @@ class Deployment < ApplicationRecord
   scope :for_status, -> (status) { where(status: status) }
 
   scope :visible, -> { where(status: %i[running success failed canceled]) }
+  scope :stoppable, -> { where.not(on_stop: nil).where.not(deployable_id: nil).success }
 
   state_machine :status, initial: :created do
     event :run do
diff --git a/app/models/environment.rb b/app/models/environment.rb
index 973f1243e6b9d391039b1ad8321d878cd897b2f1..4635b05fcc7557c208341f920c7062734952acc3 100644
--- a/app/models/environment.rb
+++ b/app/models/environment.rb
@@ -61,6 +61,7 @@ class Environment < ApplicationRecord
   scope :in_review_folder, -> { where(environment_type: "review") }
   scope :for_name, -> (name) { where(name: name) }
   scope :preload_cluster, -> { preload(last_deployment: :cluster) }
+  scope :auto_stoppable, -> (limit) { available.where('auto_stop_at < ?', Time.zone.now).limit(limit) }
 
   ##
   # Search environments which have names like the given query.
@@ -107,6 +108,44 @@ def self.find_or_create_by_name(name)
     find_or_create_by(name: name)
   end
 
+  class << self
+    ##
+    # This method returns stop actions (jobs) for multiple environments within one
+    # query. It's useful to avoid N+1 problem.
+    #
+    # NOTE: The count of environments should be small~medium (e.g. < 5000)
+    def stop_actions
+      cte = cte_for_deployments_with_stop_action
+      ci_builds = Ci::Build.arel_table
+
+      inner_join_stop_actions = ci_builds.join(cte.table).on(
+        ci_builds[:project_id].eq(cte.table[:project_id])
+          .and(ci_builds[:ref].eq(cte.table[:ref]))
+          .and(ci_builds[:name].eq(cte.table[:on_stop]))
+      ).join_sources
+
+      pipeline_ids = ci_builds.join(cte.table).on(
+        ci_builds[:id].eq(cte.table[:deployable_id])
+      ).project(:commit_id)
+
+      Ci::Build.joins(inner_join_stop_actions)
+               .with(cte.to_arel)
+               .where(ci_builds[:commit_id].in(pipeline_ids))
+               .where(status: HasStatus::BLOCKED_STATUS)
+               .preload_project_and_pipeline_project
+               .preload(:user, :metadata, :deployment)
+    end
+
+    private
+
+    def cte_for_deployments_with_stop_action
+      Gitlab::SQL::CTE.new(:deployments_with_stop_action,
+        Deployment.where(environment_id: select(:id))
+          .distinct_on_environment
+          .stoppable)
+    end
+  end
+
   def clear_prometheus_reactive_cache!(query_name)
     cluster_prometheus_adapter&.clear_prometheus_reactive_cache!(query_name, self)
   end
diff --git a/app/services/ci/stop_environments_service.rb b/app/services/ci/stop_environments_service.rb
index d9a800791f2a305311ec436d2abef9b53cdaebff..14ef744ada175f4625dfcdf38c88aab3025cebca 100644
--- a/app/services/ci/stop_environments_service.rb
+++ b/app/services/ci/stop_environments_service.rb
@@ -16,6 +16,22 @@ def execute_for_merge_request(merge_request)
       merge_request.environments.each { |environment| stop(environment) }
     end
 
+    ##
+    # This method is for stopping multiple environments in a batch style.
+    # The maximum acceptable count of environments is roughly 5000. Please
+    # apply acceptable `LIMIT` clause to the `environments` relation.
+    def self.execute_in_batch(environments)
+      stop_actions = environments.stop_actions.load
+
+      environments.update_all(auto_stop_at: nil, state: 'stopped')
+
+      stop_actions.each do |stop_action|
+        stop_action.play(stop_action.user)
+      rescue => e
+        Gitlab::ErrorTracking.track_error(e, deployable_id: stop_action.id)
+      end
+    end
+
     private
 
     def environments
diff --git a/app/services/environments/auto_stop_service.rb b/app/services/environments/auto_stop_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..6eef8138493b0368b331ddc6caf7af68580b048b
--- /dev/null
+++ b/app/services/environments/auto_stop_service.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+module Environments
+  class AutoStopService
+    include ::Gitlab::ExclusiveLeaseHelpers
+    include ::Gitlab::LoopHelpers
+
+    BATCH_SIZE = 100
+    LOOP_TIMEOUT = 45.minutes
+    LOOP_LIMIT = 1000
+    EXCLUSIVE_LOCK_KEY = 'environments:auto_stop:lock'
+    LOCK_TIMEOUT = 50.minutes
+
+    ##
+    # Stop expired environments on GitLab instance
+    #
+    # This auto stop process cannot run for more than 45 minutes. This is for
+    # preventing multiple `AutoStopCronWorker` CRON jobs run concurrently,
+    # which is scheduled at every hour.
+    def execute
+      in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do
+        loop_until(timeout: LOOP_TIMEOUT, limit: LOOP_LIMIT) do
+          stop_in_batch
+        end
+      end
+    end
+
+    private
+
+    def stop_in_batch
+      environments = Environment.auto_stoppable(BATCH_SIZE)
+
+      return false unless environments.exists? && Feature.enabled?(:auto_stop_environments)
+
+      Ci::StopEnvironmentsService.execute_in_batch(environments)
+    end
+  end
+end
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 3bf46675ad56626b2cd6d00c9128ee28586040f2..5ff1a331b09bb1ae4a5045ef88a412ed545235af 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -75,6 +75,12 @@
   :latency_sensitive: 
   :resource_boundary: :unknown
   :weight: 1
+- :name: cronjob:environments_auto_stop_cron
+  :feature_category: :continuous_delivery
+  :has_external_dependencies: 
+  :latency_sensitive: 
+  :resource_boundary: :unknown
+  :weight: 1
 - :name: cronjob:expire_build_artifacts
   :feature_category: :continuous_integration
   :has_external_dependencies: 
diff --git a/app/workers/environments/auto_stop_cron_worker.rb b/app/workers/environments/auto_stop_cron_worker.rb
new file mode 100644
index 0000000000000000000000000000000000000000..8fcda35b414d74b62da0c16ee7199c2a19bb621e
--- /dev/null
+++ b/app/workers/environments/auto_stop_cron_worker.rb
@@ -0,0 +1,16 @@
+# frozen_string_literal: true
+
+module Environments
+  class AutoStopCronWorker
+    include ApplicationWorker
+    include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+
+    feature_category :continuous_delivery
+
+    def perform
+      return unless Feature.enabled?(:auto_stop_environments)
+
+      AutoStopService.new.execute
+    end
+  end
+end
diff --git a/config/gitlab.yml.example b/config/gitlab.yml.example
index 550973e19f7b8eeeff45053e64ebb67ac1f17238..416f8f3867c0396b45e6b453fad16837ec50370c 100644
--- a/config/gitlab.yml.example
+++ b/config/gitlab.yml.example
@@ -373,6 +373,9 @@ production: &base
     # Remove expired build artifacts
     expire_build_artifacts_worker:
       cron: "50 * * * *"
+    # Stop expired environments
+    environments_auto_stop_cron_worker:
+      cron: "24 * * * *"
     # Periodically run 'git fsck' on all repositories. If started more than
     # once per hour you will have concurrent 'git fsck' jobs.
     repository_check_worker:
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index a6fbb8608b3a7a2e8d6c8cda45314f0fd4129127..f52272f3ca6521c3b89d6954301dea324b70020b 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -403,6 +403,9 @@
 Settings.cron_jobs['expire_build_artifacts_worker'] ||= Settingslogic.new({})
 Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '50 * * * *'
 Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker'
+Settings.cron_jobs['environments_auto_stop_cron_worker'] ||= Settingslogic.new({})
+Settings.cron_jobs['environments_auto_stop_cron_worker']['cron'] ||= '24 * * * *'
+Settings.cron_jobs['environments_auto_stop_cron_worker']['job_class'] = 'Environments::AutoStopCronWorker'
 Settings.cron_jobs['repository_check_worker'] ||= Settingslogic.new({})
 Settings.cron_jobs['repository_check_worker']['cron'] ||= '20 * * * *'
 Settings.cron_jobs['repository_check_worker']['job_class'] = 'RepositoryCheck::DispatchWorker'
diff --git a/db/migrate/20200131140428_create_index_on_auto_stop_in.rb b/db/migrate/20200131140428_create_index_on_auto_stop_in.rb
new file mode 100644
index 0000000000000000000000000000000000000000..526e7dcfe3006f954b316344d129e1ed58655e8f
--- /dev/null
+++ b/db/migrate/20200131140428_create_index_on_auto_stop_in.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+class CreateIndexOnAutoStopIn < ActiveRecord::Migration[6.0]
+  include Gitlab::Database::MigrationHelpers
+
+  DOWNTIME = false
+
+  disable_ddl_transaction!
+
+  def up
+    add_concurrent_index :environments, %i[state auto_stop_at], where: "auto_stop_at IS NOT NULL AND state = 'available'"
+  end
+
+  def down
+    remove_concurrent_index :environments, %i[state auto_stop_at]
+  end
+end
diff --git a/db/schema.rb b/db/schema.rb
index 90f2bb5cf0f4247d49111e03e1bbeed7cc482945..cef0237fcb877544241135556f6cd025ba4a2691 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -1494,10 +1494,12 @@
     t.string "state", default: "available", null: false
     t.string "slug", null: false
     t.datetime_with_timezone "auto_stop_at"
+    t.index ["auto_stop_at"], name: "index_environments_on_auto_stop_at", where: "(auto_stop_at IS NOT NULL)"
     t.index ["name"], name: "index_environments_on_name_varchar_pattern_ops", opclass: :varchar_pattern_ops
     t.index ["project_id", "name"], name: "index_environments_on_project_id_and_name", unique: true
     t.index ["project_id", "slug"], name: "index_environments_on_project_id_and_slug", unique: true
     t.index ["project_id", "state", "environment_type"], name: "index_environments_on_project_id_state_environment_type"
+    t.index ["state", "auto_stop_at"], name: "index_environments_on_state_and_auto_stop_at", where: "((auto_stop_at IS NOT NULL) AND ((state)::text = 'available'::text))"
   end
 
   create_table "epic_issues", id: :serial, force: :cascade do |t|
diff --git a/spec/factories/environments.rb b/spec/factories/environments.rb
index 323ea2d478b6ebebf10bebf3ea9650213f730a9c..998672ebe7cf2fd2e230e8fd7aa46ed9e623d2af 100644
--- a/spec/factories/environments.rb
+++ b/spec/factories/environments.rb
@@ -45,7 +45,7 @@
       self.when { 'manual' }
     end
 
-    trait :auto_stopped do
+    trait :auto_stoppable do
       auto_stop_at { 1.day.ago }
     end
 
diff --git a/spec/models/deployment_spec.rb b/spec/models/deployment_spec.rb
index bdbe38afc56a4fae88d92b75488e47834a323ec1..89fb4eb3ff27592d4995e25d9c5a35d0de95a092 100644
--- a/spec/models/deployment_spec.rb
+++ b/spec/models/deployment_spec.rb
@@ -51,6 +51,22 @@
     end
   end
 
+  describe '.stoppable' do
+    subject { described_class.stoppable }
+
+    context 'when deployment is stoppable' do
+      let!(:deployment) { create(:deployment, :success, on_stop: 'stop-review') }
+
+      it { is_expected.to eq([deployment]) }
+    end
+
+    context 'when deployment is not stoppable' do
+      let!(:deployment) { create(:deployment, :failed) }
+
+      it { is_expected.to be_empty }
+    end
+  end
+
   describe '.success' do
     subject { described_class.success }
 
diff --git a/spec/models/environment_spec.rb b/spec/models/environment_spec.rb
index af7ab24d7d68ec4f71c740b0a3797875c4b5455d..72143d69fc8cf0d44d40c94f3b52767b30c22f01 100644
--- a/spec/models/environment_spec.rb
+++ b/spec/models/environment_spec.rb
@@ -7,6 +7,7 @@
   using RSpec::Parameterized::TableSyntax
   include RepoHelpers
   include StubENV
+  include CreateEnvironmentsHelpers
 
   let(:project) { create(:project, :repository) }
 
@@ -114,6 +115,72 @@
     end
   end
 
+  describe '.auto_stoppable' do
+    subject { described_class.auto_stoppable(limit) }
+
+    let(:limit) { 100 }
+
+    context 'when environment is auto-stoppable' do
+      let!(:environment) { create(:environment, :auto_stoppable) }
+
+      it { is_expected.to eq([environment]) }
+    end
+
+    context 'when environment is not auto-stoppable' do
+      let!(:environment) { create(:environment) }
+
+      it { is_expected.to be_empty }
+    end
+  end
+
+  describe '.stop_actions' do
+    subject { environments.stop_actions }
+
+    let_it_be(:project) { create(:project, :repository) }
+    let_it_be(:user) { create(:user) }
+    let(:environments) { Environment.all }
+
+    before_all do
+      project.add_developer(user)
+      project.repository.add_branch(user, 'review/feature-1', 'master')
+      project.repository.add_branch(user, 'review/feature-2', 'master')
+    end
+
+    shared_examples_for 'correct filtering' do
+      it 'returns stop actions for available environments only' do
+        expect(subject.count).to eq(1)
+        expect(subject.first.name).to eq('stop_review_app')
+        expect(subject.first.ref).to eq('review/feature-1')
+      end
+    end
+
+    before do
+      create_review_app(user, project, 'review/feature-1')
+      create_review_app(user, project, 'review/feature-2')
+    end
+
+    it 'returns stop actions for environments' do
+      expect(subject.count).to eq(2)
+      expect(subject).to match_array(Ci::Build.where(name: 'stop_review_app'))
+    end
+
+    context 'when one of the stop actions has already been executed' do
+      before do
+        Ci::Build.where(ref: 'review/feature-2').find_by_name('stop_review_app').enqueue!
+      end
+
+      it_behaves_like 'correct filtering'
+    end
+
+    context 'when one of the deployments does not have stop action' do
+      before do
+        Deployment.where(ref: 'review/feature-2').update_all(on_stop: nil)
+      end
+
+      it_behaves_like 'correct filtering'
+    end
+  end
+
   describe '.pluck_names' do
     subject { described_class.pluck_names }
 
@@ -449,7 +516,7 @@
   describe '#reset_auto_stop' do
     subject { environment.reset_auto_stop }
 
-    let(:environment) { create(:environment, :auto_stopped) }
+    let(:environment) { create(:environment, :auto_stoppable) }
 
     it 'nullifies the auto_stop_at' do
       expect { subject }.to change(environment, :auto_stop_at).from(Time).to(nil)
diff --git a/spec/services/ci/stop_environments_service_spec.rb b/spec/services/ci/stop_environments_service_spec.rb
index ed92625a2cce42a10c6634fcab108fa36c8f3f1d..88f2f5618c270c87299d82a14bbabbcf7a5f8a73 100644
--- a/spec/services/ci/stop_environments_service_spec.rb
+++ b/spec/services/ci/stop_environments_service_spec.rb
@@ -3,6 +3,8 @@
 require 'spec_helper'
 
 describe Ci::StopEnvironmentsService do
+  include CreateEnvironmentsHelpers
+
   let(:project) { create(:project, :private, :repository) }
   let(:user) { create(:user) }
 
@@ -181,6 +183,55 @@
     end
   end
 
+  describe '.execute_in_batch' do
+    subject { described_class.execute_in_batch(environments) }
+
+    let_it_be(:project) { create(:project, :repository) }
+    let_it_be(:user) { create(:user) }
+    let(:environments) { Environment.available }
+
+    before_all do
+      project.add_developer(user)
+      project.repository.add_branch(user, 'review/feature-1', 'master')
+      project.repository.add_branch(user, 'review/feature-2', 'master')
+    end
+
+    before do
+      create_review_app(user, project, 'review/feature-1')
+      create_review_app(user, project, 'review/feature-2')
+    end
+
+    it 'stops environments' do
+      expect { subject }
+        .to change { project.environments.all.map(&:state).uniq }
+        .from(['available']).to(['stopped'])
+
+      expect(project.environments.all.map(&:auto_stop_at).uniq).to eq([nil])
+    end
+
+    it 'plays stop actions' do
+      expect { subject }
+        .to change { Ci::Build.where(name: 'stop_review_app').map(&:status).uniq }
+        .from(['manual']).to(['pending'])
+    end
+
+    context 'when user does not have a permission to play the stop action' do
+      before do
+        Ci::Build.find_by_ref('review/feature-2').update_column(:user_id, nil)
+      end
+
+      it 'tracks the exception' do
+        deployable = Ci::Build.find_by_ref('review/feature-2')
+
+        expect(Gitlab::ErrorTracking)
+          .to receive(:track_error)
+          .with(Gitlab::Access::AccessDeniedError, deployable_id: deployable.id).once
+
+        subject
+      end
+    end
+  end
+
   def expect_environment_stopped_on(branch)
     expect_any_instance_of(Environment)
       .to receive(:stop!)
diff --git a/spec/services/environments/auto_stop_service_spec.rb b/spec/services/environments/auto_stop_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..3620bf8fe87ec86025c298a996c478d2731ff6b6
--- /dev/null
+++ b/spec/services/environments/auto_stop_service_spec.rb
@@ -0,0 +1,94 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Environments::AutoStopService, :clean_gitlab_redis_shared_state do
+  include CreateEnvironmentsHelpers
+  include ExclusiveLeaseHelpers
+
+  let_it_be(:project) { create(:project, :repository) }
+  let_it_be(:user) { create(:user) }
+  let(:service) { described_class.new }
+
+  before_all do
+    project.add_developer(user)
+  end
+
+  describe '#execute' do
+    subject { service.execute }
+
+    let_it_be(:project) { create(:project, :repository) }
+    let_it_be(:user) { create(:user) }
+    let(:environments) { Environment.all }
+
+    before_all do
+      project.add_developer(user)
+      project.repository.add_branch(user, 'review/feature-1', 'master')
+      project.repository.add_branch(user, 'review/feature-2', 'master')
+    end
+
+    before do
+      create_review_app(user, project, 'review/feature-1')
+      create_review_app(user, project, 'review/feature-2')
+    end
+
+    it 'stops environments and play stop jobs' do
+      expect { subject }
+        .to change { Environment.all.map(&:state).uniq }
+        .from(['available']).to(['stopped'])
+
+      expect(Ci::Build.where(name: 'stop_review_app').map(&:status).uniq).to eq(['pending'])
+    end
+
+    context 'when auto_stop_environments feature flag is disabled' do
+      before do
+        stub_feature_flags(auto_stop_environments: false)
+      end
+
+      it 'does not execute Ci::StopEnvironmentsService' do
+        expect(Ci::StopEnvironmentsService).not_to receive(:execute_in_batch)
+
+        subject
+      end
+    end
+
+    context 'when the other sidekiq worker has already been running' do
+      before do
+        stub_exclusive_lease_taken(described_class::EXCLUSIVE_LOCK_KEY)
+      end
+
+      it 'does not execute stop_in_batch' do
+        expect_next_instance_of(described_class) do |service|
+          expect(service).not_to receive(:stop_in_batch)
+        end
+
+        expect { subject }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
+      end
+    end
+
+    context 'when loop reached timeout' do
+      before do
+        stub_const("#{described_class}::LOOP_TIMEOUT", 0.seconds)
+        stub_const("#{described_class}::LOOP_LIMIT", 100_000)
+        allow_next_instance_of(described_class) do |service|
+          allow(service).to receive(:stop_in_batch) { true }
+        end
+      end
+
+      it 'returns false and does not continue the process' do
+        is_expected.to eq(false)
+      end
+    end
+
+    context 'when loop reached loop limit' do
+      before do
+        stub_const("#{described_class}::LOOP_LIMIT", 1)
+        stub_const("#{described_class}::BATCH_SIZE", 1)
+      end
+
+      it 'stops only one available environment' do
+        expect { subject }.to change { Environment.available.count }.by(-1)
+      end
+    end
+  end
+end
diff --git a/spec/support/helpers/create_environments_helpers.rb b/spec/support/helpers/create_environments_helpers.rb
new file mode 100644
index 0000000000000000000000000000000000000000..be105f5862b4e7121e723f0d186aa034b54d27cd
--- /dev/null
+++ b/spec/support/helpers/create_environments_helpers.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module CreateEnvironmentsHelpers
+  def create_review_app(user, project, ref)
+    common = { project: project, ref: ref, user: user }
+    pipeline = create(:ci_pipeline, **common)
+    start_review = create(:ci_build, :start_review_app, :success, **common, pipeline: pipeline)
+    stop_review = create(:ci_build, :stop_review_app, :manual, **common, pipeline: pipeline)
+    environment = create(:environment, :auto_stoppable, project: project, name: ref)
+    create(:deployment, :success, **common, on_stop: stop_review.name,
+      deployable: start_review, environment: environment)
+  end
+end
diff --git a/spec/workers/environments/auto_stop_cron_worker_spec.rb b/spec/workers/environments/auto_stop_cron_worker_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..6773637d4a77996543584f793ec69380b3dc2c84
--- /dev/null
+++ b/spec/workers/environments/auto_stop_cron_worker_spec.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Environments::AutoStopCronWorker do
+  subject { worker.perform }
+
+  let(:worker) { described_class.new }
+
+  it 'executes Environments::AutoStopService' do
+    expect_next_instance_of(Environments::AutoStopService) do |service|
+      expect(service).to receive(:execute)
+    end
+
+    subject
+  end
+end