diff --git a/.rubocop_todo/database/multiple_databases.yml b/.rubocop_todo/database/multiple_databases.yml index a2e9a7e1e7e47366377a2443188c4e0e661063c4..e555c2f912b321b63d1fa580b6e9d405da381cac 100644 --- a/.rubocop_todo/database/multiple_databases.yml +++ b/.rubocop_todo/database/multiple_databases.yml @@ -10,7 +10,6 @@ Database/MultipleDatabases: - ee/lib/pseudonymizer/pager.rb - ee/lib/system_check/geo/geo_database_configured_check.rb - ee/spec/lib/pseudonymizer/dumper_spec.rb - - ee/spec/models/pg_replication_slot_spec.rb - ee/spec/services/ee/merge_requests/update_service_spec.rb - lib/backup/database.rb - lib/backup/manager.rb diff --git a/app/models/postgresql/replication_slot.rb b/app/models/postgresql/replication_slot.rb index 1a4d3bd57945a9897fde399fb9382ce8232c3b1c..1c38edcca617347955e549c71d719af1a4a1b6d1 100644 --- a/app/models/postgresql/replication_slot.rb +++ b/app/models/postgresql/replication_slot.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module Postgresql - class ReplicationSlot < ApplicationRecord + class ReplicationSlot < Gitlab::Database::SharedModel self.table_name = 'pg_replication_slots' # Returns true if there are any replication slots in use. diff --git a/app/workers/background_migration/single_database_worker.rb b/app/workers/background_migration/single_database_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..b6661d4fd14b1a188a4f2558b2fd8e46cdfc8b8f --- /dev/null +++ b/app/workers/background_migration/single_database_worker.rb @@ -0,0 +1,148 @@ +# frozen_string_literal: true + +module BackgroundMigration + module SingleDatabaseWorker + extend ActiveSupport::Concern + + include ApplicationWorker + + MAX_LEASE_ATTEMPTS = 5 + + included do + data_consistency :always + + sidekiq_options retry: 3 + + feature_category :database + urgency :throttled + loggable_arguments 0, 1 + end + + class_methods do + # The minimum amount of time between processing two jobs of the same migration + # class. + # + # This interval is set to 2 or 5 minutes so autovacuuming and other + # maintenance related tasks have plenty of time to clean up after a migration + # has been performed. + def minimum_interval + 2.minutes.to_i + end + + def tracking_database + raise NotImplementedError, "#{self.name} does not implement #{__method__}" + end + + def unhealthy_metric_name + raise NotImplementedError, "#{self.name} does not implement #{__method__}" + end + end + + # Performs the background migration. + # + # See Gitlab::BackgroundMigration.perform for more information. + # + # class_name - The class name of the background migration to run. + # arguments - The arguments to pass to the migration class. + # lease_attempts - The number of times we will try to obtain an exclusive + # lease on the class before giving up. See MR for more discussion. 
+ # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956 + def perform(class_name, arguments = [], lease_attempts = MAX_LEASE_ATTEMPTS) + job_coordinator.with_shared_connection do + perform_with_connection(class_name, arguments, lease_attempts) + end + end + + private + + def job_coordinator + @job_coordinator ||= Gitlab::BackgroundMigration.coordinator_for_database(self.class.tracking_database) + end + + def perform_with_connection(class_name, arguments, lease_attempts) + with_context(caller_id: class_name.to_s) do + retried = lease_attempts != MAX_LEASE_ATTEMPTS + attempts_left = lease_attempts - 1 + should_perform, ttl = perform_and_ttl(class_name, attempts_left, retried) + + break if should_perform.nil? + + if should_perform + job_coordinator.perform(class_name, arguments) + else + # If the lease could not be obtained this means either another process is + # running a migration of this class or we ran one recently. In this case + # we'll reschedule the job in such a way that it is picked up again around + # the time the lease expires. + self.class + .perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left) + end + end + end + + def perform_and_ttl(class_name, attempts_left, retried) + # In test environments `perform_in` will run right away. This can then + # lead to stack level errors in the above `#perform`. To work around this + # we'll just perform the migration right away in the test environment. + return [true, nil] if always_perform? + + lease = lease_for(class_name, retried) + lease_obtained = !!lease.try_obtain + healthy_db = healthy_database? + perform = lease_obtained && healthy_db + + database_unhealthy_counter.increment if lease_obtained && !healthy_db + + # When the DB is unhealthy or the lease can't be obtained after several tries, + # then give up on the job and log a warning. Otherwise we could end up in + # an infinite rescheduling loop. Jobs can be tracked in the database with the + # use of Gitlab::Database::BackgroundMigrationJob + if !perform && attempts_left < 0 + msg = if !lease_obtained + 'Job could not get an exclusive lease after several tries. Giving up.' + else + 'Database was unhealthy after several tries. Giving up.' + end + + Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid) + + return [nil, nil] + end + + [perform, lease.ttl] + end + + def lease_for(class_name, retried) + Gitlab::ExclusiveLease + .new(lease_key_for(class_name, retried), timeout: self.class.minimum_interval) + end + + def lease_key_for(class_name, retried) + key = "#{self.class.name}:#{class_name}" + # We use a different exclusive lock key for retried jobs to allow them running concurrently with the scheduled jobs. + # See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68763 for more information. + key += ":retried" if retried + key + end + + def always_perform? + Rails.env.test? + end + + # Returns true if the database is healthy enough to allow the migration to be + # performed. + # + # class_name - The name of the background migration that we might want to + # run. + def healthy_database? + !Postgresql::ReplicationSlot.lag_too_great? + end + + def database_unhealthy_counter + Gitlab::Metrics.counter( + self.class.unhealthy_metric_name, + 'The number of times a background migration is rescheduled because the database is unhealthy.' 
+ ) + end + end +end diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb index b771ab4d4e7be7fb69feabd0865ad333548f089b..6489aad3173e6b0b95fb2961ab56cd7de3be5554 100644 --- a/app/workers/background_migration_worker.rb +++ b/app/workers/background_migration_worker.rb @@ -1,120 +1,13 @@ # frozen_string_literal: true class BackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker + include BackgroundMigration::SingleDatabaseWorker - MAX_LEASE_ATTEMPTS = 5 - - data_consistency :always - - sidekiq_options retry: 3 - - feature_category :database - urgency :throttled - loggable_arguments 0, 1 - - # The minimum amount of time between processing two jobs of the same migration - # class. - # - # This interval is set to 2 or 5 minutes so autovacuuming and other - # maintenance related tasks have plenty of time to clean up after a migration - # has been performed. - def self.minimum_interval - 2.minutes.to_i - end - - # Performs the background migration. - # - # See Gitlab::BackgroundMigration.perform for more information. - # - # class_name - The class name of the background migration to run. - # arguments - The arguments to pass to the migration class. - # lease_attempts - The number of times we will try to obtain an exclusive - # lease on the class before giving up. See MR for more discussion. - # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956 - def perform(class_name, arguments = [], lease_attempts = MAX_LEASE_ATTEMPTS) - with_context(caller_id: class_name.to_s) do - retried = lease_attempts != MAX_LEASE_ATTEMPTS - attempts_left = lease_attempts - 1 - should_perform, ttl = perform_and_ttl(class_name, attempts_left, retried) - - break if should_perform.nil? - - if should_perform - Gitlab::BackgroundMigration.perform(class_name, arguments) - else - # If the lease could not be obtained this means either another process is - # running a migration of this class or we ran one recently. In this case - # we'll reschedule the job in such a way that it is picked up again around - # the time the lease expires. - self.class - .perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left) - end - end - end - - def perform_and_ttl(class_name, attempts_left, retried) - # In test environments `perform_in` will run right away. This can then - # lead to stack level errors in the above `#perform`. To work around this - # we'll just perform the migration right away in the test environment. - return [true, nil] if always_perform? - - lease = lease_for(class_name, retried) - lease_obtained = !!lease.try_obtain - healthy_db = healthy_database? - perform = lease_obtained && healthy_db - - database_unhealthy_counter.increment if lease_obtained && !healthy_db - - # When the DB is unhealthy or the lease can't be obtained after several tries, - # then give up on the job and log a warning. Otherwise we could end up in - # an infinite rescheduling loop. Jobs can be tracked in the database with the - # use of Gitlab::Database::BackgroundMigrationJob - if !perform && attempts_left < 0 - msg = if !lease_obtained - 'Job could not get an exclusive lease after several tries. Giving up.' - else - 'Database was unhealthy after several tries. Giving up.' 
- end - - Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid) - - return [nil, nil] - end - - [perform, lease.ttl] - end - - def lease_for(class_name, retried) - Gitlab::ExclusiveLease - .new(lease_key_for(class_name, retried), timeout: self.class.minimum_interval) - end - - def lease_key_for(class_name, retried) - key = "#{self.class.name}:#{class_name}" - # We use a different exclusive lock key for retried jobs to allow them running concurrently with the scheduled jobs. - # See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68763 for more information. - key += ":retried" if retried - key - end - - def always_perform? - Rails.env.test? - end - - # Returns true if the database is healthy enough to allow the migration to be - # performed. - # - # class_name - The name of the background migration that we might want to - # run. - def healthy_database? - !Postgresql::ReplicationSlot.lag_too_great? + def self.tracking_database + @tracking_database ||= Gitlab::Database::MAIN_DATABASE_NAME.to_sym end - def database_unhealthy_counter - Gitlab::Metrics.counter( - :background_migration_database_health_reschedules, - 'The number of times a background migration is rescheduled because the database is unhealthy.' - ) + def self.unhealthy_metric_name + @unhealthy_metric_name ||= :background_migration_database_health_reschedules end end diff --git a/spec/models/postgresql/replication_slot_spec.rb b/spec/models/postgresql/replication_slot_spec.rb index c3b67a2e7b870cf3a14381ee393bd521fcae1c34..63a19541ab5e18f5da91228cf22528b7cdddfe2c 100644 --- a/spec/models/postgresql/replication_slot_spec.rb +++ b/spec/models/postgresql/replication_slot_spec.rb @@ -3,6 +3,8 @@ require 'spec_helper' RSpec.describe Postgresql::ReplicationSlot do + it { is_expected.to be_a Gitlab::Database::SharedModel } + describe '.in_use?' 
do it 'returns true when replication slots are present' do expect(described_class).to receive(:exists?).and_return(true) @@ -73,28 +75,22 @@ before(:all) do skip('max_replication_slots too small') if skip_examples - @current_slot_count = ApplicationRecord + @current_slot_count = described_class .connection - .execute("SELECT COUNT(*) FROM pg_replication_slots;") - .first - .fetch('count') - .to_i + .select_value("SELECT COUNT(*) FROM pg_replication_slots") - @current_unused_count = ApplicationRecord + @current_unused_count = described_class .connection - .execute("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';") - .first - .fetch('count') - .to_i + .select_value("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';") - ApplicationRecord + described_class .connection .execute("SELECT * FROM pg_create_physical_replication_slot('test_slot');") end after(:all) do unless skip_examples - ApplicationRecord + described_class .connection .execute("SELECT pg_drop_replication_slot('test_slot');") end diff --git a/spec/support/shared_examples/workers/background_migration_worker_shared_examples.rb b/spec/support/shared_examples/workers/background_migration_worker_shared_examples.rb new file mode 100644 index 0000000000000000000000000000000000000000..ffde2e11c1d177f636c8ab99e6944b4debb86e15 --- /dev/null +++ b/spec/support/shared_examples/workers/background_migration_worker_shared_examples.rb @@ -0,0 +1,212 @@ +# frozen_string_literal: true + +RSpec.shared_examples 'it runs background migration jobs' do |tracking_database, metric_name| + describe 'defining the job attributes' do + it 'defines the data_consistency as always' do + expect(described_class.get_data_consistency).to eq(:always) + end + + it 'defines the retry count in sidekiq_options' do + expect(described_class.sidekiq_options['retry']).to eq(3) + end + + it 'defines the feature_category as database' do + expect(described_class.get_feature_category).to eq(:database) + end + + it 'defines the urgency as throttled' do + expect(described_class.get_urgency).to eq(:throttled) + end + + it 'defines the loggable_arguments' do + expect(described_class.loggable_arguments).to match_array([0, 1]) + end + end + + describe '.tracking_database' do + it 'does not raise an error' do + expect { described_class.tracking_database }.not_to raise_error + end + + it 'overrides the method to return the tracking database' do + expect(described_class.tracking_database).to eq(tracking_database) + end + end + + describe '.unhealthy_metric_name' do + it 'does not raise an error' do + expect { described_class.unhealthy_metric_name }.not_to raise_error + end + + it 'overrides the method to return the unhealthy metric name' do + expect(described_class.unhealthy_metric_name).to eq(metric_name) + end + end + + describe '.minimum_interval' do + it 'returns 2 minutes' do + expect(described_class.minimum_interval).to eq(2.minutes.to_i) + end + end + + describe '#perform' do + let(:worker) { described_class.new } + + before do + allow(worker).to receive(:jid).and_return(1) + allow(worker).to receive(:always_perform?).and_return(false) + + allow(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(false) + end + + it 'performs jobs using the coordinator for the correct database' do + expect_next_instance_of(Gitlab::BackgroundMigration::JobCoordinator) do |coordinator| + allow(coordinator).to receive(:with_shared_connection).and_yield + + expect(coordinator.database).to eq(tracking_database) + expect(coordinator).to 
receive(:perform).with('Foo', [10, 20]) + end + + worker.perform('Foo', [10, 20]) + end + + context 'when lease can be obtained' do + let(:coordinator) { double('job coordinator') } + + before do + allow(Gitlab::BackgroundMigration).to receive(:coordinator_for_database) + .with(tracking_database) + .and_return(coordinator) + + allow(coordinator).to receive(:with_shared_connection).and_yield + end + + it 'sets up the shared connection before checking replication' do + expect(coordinator).to receive(:with_shared_connection).and_yield.ordered + expect(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(false).ordered + + expect(coordinator).to receive(:perform).with('Foo', [10, 20]) + + worker.perform('Foo', [10, 20]) + end + + it 'performs a background migration' do + expect(coordinator).to receive(:perform).with('Foo', [10, 20]) + + worker.perform('Foo', [10, 20]) + end + + context 'when lease_attempts is 1' do + it 'performs a background migration' do + expect(coordinator).to receive(:perform).with('Foo', [10, 20]) + + worker.perform('Foo', [10, 20], 1) + end + end + + it 'can run scheduled job and retried job concurrently' do + expect(coordinator) + .to receive(:perform) + .with('Foo', [10, 20]) + .exactly(2).time + + worker.perform('Foo', [10, 20]) + worker.perform('Foo', [10, 20], described_class::MAX_LEASE_ATTEMPTS - 1) + end + + it 'sets the class that will be executed as the caller_id' do + expect(coordinator).to receive(:perform) do + expect(Gitlab::ApplicationContext.current).to include('meta.caller_id' => 'Foo') + end + + worker.perform('Foo', [10, 20]) + end + end + + context 'when lease not obtained (migration of same class was performed recently)' do + let(:timeout) { described_class.minimum_interval } + let(:lease_key) { "#{described_class.name}:Foo" } + let(:coordinator) { double('job coordinator') } + + before do + allow(Gitlab::BackgroundMigration).to receive(:coordinator_for_database) + .with(tracking_database) + .and_return(coordinator) + + allow(coordinator).to receive(:with_shared_connection).and_yield + + expect(coordinator).not_to receive(:perform) + + Gitlab::ExclusiveLease.new(lease_key, timeout: timeout).try_obtain + end + + it 'reschedules the migration and decrements the lease_attempts' do + expect(described_class) + .to receive(:perform_in) + .with(a_kind_of(Numeric), 'Foo', [10, 20], 4) + + worker.perform('Foo', [10, 20], 5) + end + + context 'when lease_attempts is 1' do + let(:lease_key) { "#{described_class.name}:Foo:retried" } + + it 'reschedules the migration and decrements the lease_attempts' do + expect(described_class) + .to receive(:perform_in) + .with(a_kind_of(Numeric), 'Foo', [10, 20], 0) + + worker.perform('Foo', [10, 20], 1) + end + end + + context 'when lease_attempts is 0' do + let(:lease_key) { "#{described_class.name}:Foo:retried" } + + it 'gives up performing the migration' do + expect(described_class).not_to receive(:perform_in) + expect(Sidekiq.logger).to receive(:warn).with( + class: 'Foo', + message: 'Job could not get an exclusive lease after several tries. 
Giving up.', + job_id: 1) + + worker.perform('Foo', [10, 20], 0) + end + end + end + + context 'when database is not healthy' do + before do + expect(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(true) + end + + it 'reschedules a migration if the database is not healthy' do + expect(described_class) + .to receive(:perform_in) + .with(a_kind_of(Numeric), 'Foo', [10, 20], 4) + + worker.perform('Foo', [10, 20]) + end + + it 'increments the unhealthy counter' do + counter = Gitlab::Metrics.counter(metric_name, 'msg') + + expect(described_class).to receive(:perform_in) + + expect { worker.perform('Foo', [10, 20]) }.to change { counter.get }.by(1) + end + + context 'when lease_attempts is 0' do + it 'gives up performing the migration' do + expect(described_class).not_to receive(:perform_in) + expect(Sidekiq.logger).to receive(:warn).with( + class: 'Foo', + message: 'Database was unhealthy after several tries. Giving up.', + job_id: 1) + + worker.perform('Foo', [10, 20], 0) + end + end + end + end +end diff --git a/spec/workers/background_migration_worker_spec.rb b/spec/workers/background_migration_worker_spec.rb index 7892eb89e808bd57e9d8f2eafdc1bd83dbfa7161..06513861c0e1074d0558cfbdbd3b394a4695bcfe 100644 --- a/spec/workers/background_migration_worker_spec.rb +++ b/spec/workers/background_migration_worker_spec.rb @@ -3,148 +3,5 @@ require 'spec_helper' RSpec.describe BackgroundMigrationWorker, :clean_gitlab_redis_shared_state do - let(:worker) { described_class.new } - - describe '.minimum_interval' do - it 'returns 2 minutes' do - expect(described_class.minimum_interval).to eq(2.minutes.to_i) - end - end - - describe '#perform' do - before do - allow(worker).to receive(:jid).and_return(1) - allow(worker).to receive(:always_perform?).and_return(false) - end - - it 'can run scheduled job and retried job concurrently' do - expect(Gitlab::BackgroundMigration) - .to receive(:perform) - .with('Foo', [10, 20]) - .exactly(2).time - - worker.perform('Foo', [10, 20]) - worker.perform('Foo', [10, 20], described_class::MAX_LEASE_ATTEMPTS - 1) - end - - context 'when lease can be obtained' do - before do - expect(Gitlab::BackgroundMigration) - .to receive(:perform) - .with('Foo', [10, 20]) - end - - it 'performs a background migration' do - worker.perform('Foo', [10, 20]) - end - - context 'when lease_attempts is 1' do - it 'performs a background migration' do - worker.perform('Foo', [10, 20], 1) - end - end - end - - context 'when lease not obtained (migration of same class was performed recently)' do - before do - expect(Gitlab::BackgroundMigration).not_to receive(:perform) - - worker.lease_for('Foo', false).try_obtain - end - - it 'reschedules the migration and decrements the lease_attempts' do - expect(described_class) - .to receive(:perform_in) - .with(a_kind_of(Numeric), 'Foo', [10, 20], 4) - - worker.perform('Foo', [10, 20], 5) - end - - context 'when lease_attempts is 1' do - before do - worker.lease_for('Foo', true).try_obtain - end - - it 'reschedules the migration and decrements the lease_attempts' do - expect(described_class) - .to receive(:perform_in) - .with(a_kind_of(Numeric), 'Foo', [10, 20], 0) - - worker.perform('Foo', [10, 20], 1) - end - end - - context 'when lease_attempts is 0' do - before do - worker.lease_for('Foo', true).try_obtain - end - - it 'gives up performing the migration' do - expect(described_class).not_to receive(:perform_in) - expect(Sidekiq.logger).to receive(:warn).with( - class: 'Foo', - message: 'Job could not get an exclusive lease after 
several tries. Giving up.', - job_id: 1) - - worker.perform('Foo', [10, 20], 0) - end - end - end - - context 'when database is not healthy' do - before do - allow(worker).to receive(:healthy_database?).and_return(false) - end - - it 'reschedules a migration if the database is not healthy' do - expect(described_class) - .to receive(:perform_in) - .with(a_kind_of(Numeric), 'Foo', [10, 20], 4) - - worker.perform('Foo', [10, 20]) - end - - context 'when lease_attempts is 0' do - it 'gives up performing the migration' do - expect(described_class).not_to receive(:perform_in) - expect(Sidekiq.logger).to receive(:warn).with( - class: 'Foo', - message: 'Database was unhealthy after several tries. Giving up.', - job_id: 1) - - worker.perform('Foo', [10, 20], 0) - end - end - end - - it 'sets the class that will be executed as the caller_id' do - expect(Gitlab::BackgroundMigration).to receive(:perform) do - expect(Gitlab::ApplicationContext.current).to include('meta.caller_id' => 'Foo') - end - - worker.perform('Foo', [10, 20]) - end - end - - describe '#healthy_database?' do - context 'when replication lag is too great' do - it 'returns false' do - allow(Postgresql::ReplicationSlot) - .to receive(:lag_too_great?) - .and_return(true) - - expect(worker.healthy_database?).to eq(false) - end - - context 'when replication lag is small enough' do - it 'returns true' do - allow(Postgresql::ReplicationSlot) - .to receive(:lag_too_great?) - .and_return(false) - - expect(worker.healthy_database?).to eq(true) - end - end - end - end + it_behaves_like 'it runs background migration jobs', :main, :background_migration_database_health_reschedules end
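A minimal sketch of how the extracted concern and the new shared examples are intended to be reused by a worker that tracks background migrations on a second database. The class name BackgroundMigration::CiDatabaseWorker, the :ci database symbol, and the :background_migration_ci_database_health_reschedules metric name are illustrative assumptions and are not introduced by this diff; only the required tracking_database and unhealthy_metric_name overrides (the concern raises NotImplementedError without them) and the 'it runs background migration jobs' shared examples come from the changes above.

# frozen_string_literal: true

# Hypothetical follow-up worker; names below are assumptions, not part of this diff.
module BackgroundMigration
  class CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker
    include BackgroundMigration::SingleDatabaseWorker

    # Must name a database for which Gitlab::BackgroundMigration.coordinator_for_database
    # can build a job coordinator; :ci is assumed here for illustration.
    def self.tracking_database
      :ci
    end

    # A separate counter name so reschedules caused by this database's health
    # are reported independently of the main database's metric.
    def self.unhealthy_metric_name
      :background_migration_ci_database_health_reschedules
    end
  end
end

The corresponding spec would then reduce to wiring up the shared examples added in this diff, mirroring spec/workers/background_migration_worker_spec.rb:

# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BackgroundMigration::CiDatabaseWorker, :clean_gitlab_redis_shared_state do
  it_behaves_like 'it runs background migration jobs', :ci, :background_migration_ci_database_health_reschedules
end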