diff --git a/ee/app/workers/elastic/migration_worker.rb b/ee/app/workers/elastic/migration_worker.rb index 984d997a0b550b2e39b04c0dfbb10d0cbebceaea..2bf4973d2ca8bc30ca8c1a973e412046094a5b38 100644 --- a/ee/app/workers/elastic/migration_worker.rb +++ b/ee/app/workers/elastic/migration_worker.rb @@ -31,12 +31,12 @@ def perform migration = Elastic::MigrationRecord.current_migration unless migration - logger.info(structured_payload(message: 'MigrationWorker: no migration available')) + log message: 'MigrationWorker: no migration available' break false end if migration.halted? - logger.error(structured_payload(message: "MigrationWorker: migration[#{migration.name}] has been halted. All future migrations will be halted because of that. Exiting")) + error message: "MigrationWorker: migration[#{migration.name}] has been halted. All future migrations will be halted because of that. Exiting" unpause_indexing!(migration) break false @@ -45,11 +45,11 @@ def perform if !migration.started? && migration.space_requirements? free_size_bytes = helper.cluster_free_size_bytes space_required_bytes = migration.space_required_bytes - logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] checking free space in cluster. Required space #{number_to_human_size(space_required_bytes)}. Free space #{number_to_human_size(free_size_bytes)}.")) + log message: "MigrationWorker: migration[#{migration.name}] checking free space in cluster. Required space #{number_to_human_size(space_required_bytes)}. Free space #{number_to_human_size(free_size_bytes)}." if free_size_bytes < space_required_bytes - logger.warn(structured_payload(message: "MigrationWorker: migration[#{migration.name}] You should have at least #{number_to_human_size(space_required_bytes)} of free space in the cluster to run this migration. Please increase the storage in your Elasticsearch cluster.")) - logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] updating with halted: true")) + warn message: "MigrationWorker: migration[#{migration.name}] You should have at least #{number_to_human_size(space_required_bytes)} of free space in the cluster to run this migration. Please increase the storage in your Elasticsearch cluster." + log message: "MigrationWorker: migration[#{migration.name}] updating with halted: true" migration.halt break false @@ -67,11 +67,23 @@ def perform enqueue_next_batch(migration) end rescue StandardError => e - logger.error(structured_payload(message: "#{self.class.name}: #{e.class} #{e.message}", backtrace: e.backtrace.join("\n"))) + error message: "#{self.class.name}: #{e.class} #{e.message}", backtrace: e.backtrace.join("\n") end private + def log(...) + logger.info(structured_payload(...)) + end + + def warn(...) + logger.warn(structured_payload(...)) + end + + def error(...) + logger.error(structured_payload(...)) + end + def preflight_check_successful? return false if Feature.disabled?(:elastic_migration_worker, type: :ops) return false unless Gitlab::CurrentSettings.elasticsearch_indexing? @@ -79,25 +91,30 @@ def preflight_check_successful? return false if Elastic::ReindexingTask.current if helper.unsupported_version? - logger.info(structured_payload(message: 'MigrationWorker: You are using an unsupported version of Elasticsearch. Indexing will be paused to prevent data loss')) + log message: 'MigrationWorker: You are using an unsupported version of Elasticsearch. Indexing will be paused to prevent data loss' Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true) return false end + unless Search::ClusterHealthCheck::Elastic.healthy? + error message: "Advanced search cluster is unhealthy. Execution is skipped." + return false + end + true end def execute_migration(migration) if migration.started? && !migration.batched? && !migration.retry_on_failure? - logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete")) + log message: "MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete" return end pause_indexing!(migration) - logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] executing migrate method")) + log message: "MigrationWorker: migration[#{migration.name}] executing migrate method" migration.migrate rescue StandardError => e retry_migration(migration, e) if migration.retry_on_failure? @@ -108,11 +125,11 @@ def execute_migration(migration) def retry_migration(migration, exception) if migration.current_attempt >= migration.max_attempts message = "MigrationWorker: migration has failed with #{exception.class}:#{exception.message}, no retries left" - logger.error(structured_payload(message: message, backtrace: exception.backtrace.join("\n"))) + error message: message, backtrace: exception.backtrace.join("\n") migration.fail(message: message) else - logger.info(structured_payload(message: "MigrationWorker: increasing previous_attempts to #{migration.current_attempt}")) + log message: "MigrationWorker: increasing previous_attempts to #{migration.current_attempt}" migration.save_state!(previous_attempts: migration.current_attempt) end end @@ -120,13 +137,13 @@ def retry_migration(migration, exception) def enqueue_next_batch(migration) return unless migration.batched? && !migration.completed? - logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] kicking off next migration batch")) + log message: "MigrationWorker: migration[#{migration.name}] kicking off next migration batch" Elastic::MigrationWorker.perform_in(migration.throttle_delay) end def check_and_save_completed_status(migration) migration.completed?.tap do |status| - logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] updating with completed: #{status}")) + log message: "MigrationWorker: migration[#{migration.name}] updating with completed: #{status}" migration.save!(completed: status) end end @@ -139,7 +156,7 @@ def pause_indexing!(migration) migration.save_state!(pause_indexing: pause_indexing) if pause_indexing - logger.info(structured_payload(message: 'MigrationWorker: Pausing indexing')) + log message: 'MigrationWorker: Pausing indexing' Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true) end end @@ -149,7 +166,7 @@ def unpause_indexing!(migration) return unless migration.load_state[:pause_indexing] return if migration.load_state[:halted_indexing_unpaused] - logger.info(structured_payload(message: 'MigrationWorker: unpausing indexing')) + log message: 'MigrationWorker: unpausing indexing' Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false) migration.save_state!(halted_indexing_unpaused: true) if migration.halted? @@ -158,7 +175,7 @@ def unpause_indexing!(migration) def create_migrations_index_if_not_exists return if helper.migrations_index_exists? - logger.info(structured_payload(message: 'MigrationWorker: creating migrations index')) + log message: 'MigrationWorker: creating migrations index' helper.create_migrations_index end diff --git a/ee/spec/workers/elastic/migration_worker_spec.rb b/ee/spec/workers/elastic/migration_worker_spec.rb index 7633c1eee1bbf52eb0a7f72fcf2c4f163f832835..58c76bcb5add9b3c106ae257ce90653eb4e442dd 100644 --- a/ee/spec/workers/elastic/migration_worker_spec.rb +++ b/ee/spec/workers/elastic/migration_worker_spec.rb @@ -47,6 +47,20 @@ end end + context 'cluster is unhealthy' do + before do + stub_ee_application_setting(elasticsearch_indexing: true) + allow(Search::ClusterHealthCheck::Elastic).to receive(:healthy?).and_return(false) + allow(::Gitlab::Elasticsearch::Logger).to receive(:build).and_return(logger) + end + + it 'raises an error and does not execute migration' do + expect(subject).not_to receive(:execute_migration) + expect(logger).to receive(:error) + expect(subject.perform).to be_falsey + end + end + context 'reindexing task is in progress' do let!(:task) { create(:elastic_reindexing_task) }