Skip to content
代码片段 群组 项目
提交 61d81c3d 编辑于 作者: Terri Chu's avatar Terri Chu
浏览文件

Merge branch 'add-cluster-health-to-preflight' into 'master'

Add cluster healthcheck to MigrationWorker

See merge request https://gitlab.com/gitlab-org/gitlab/-/merge_requests/135238



Merged-by: Terri Chu <tchu@gitlab.com>
Approved-by: Siddharth Dungarwal <sdungarwal@gitlab.com>
Approved-by: Terri Chu <tchu@gitlab.com>
Reviewed-by: Terri Chu <tchu@gitlab.com>
Co-authored-by: Dmitry Gruzd <dgruzd@gitlab.com>
No related branches found
No related tags found
无相关合并请求
...@@ -31,12 +31,12 @@ def perform ...@@ -31,12 +31,12 @@ def perform
migration = Elastic::MigrationRecord.current_migration migration = Elastic::MigrationRecord.current_migration
unless migration unless migration
logger.info(structured_payload(message: 'MigrationWorker: no migration available')) log message: 'MigrationWorker: no migration available'
break false break false
end end
if migration.halted? if migration.halted?
logger.error(structured_payload(message: "MigrationWorker: migration[#{migration.name}] has been halted. All future migrations will be halted because of that. Exiting")) error message: "MigrationWorker: migration[#{migration.name}] has been halted. All future migrations will be halted because of that. Exiting"
unpause_indexing!(migration) unpause_indexing!(migration)
break false break false
...@@ -45,11 +45,11 @@ def perform ...@@ -45,11 +45,11 @@ def perform
if !migration.started? && migration.space_requirements? if !migration.started? && migration.space_requirements?
free_size_bytes = helper.cluster_free_size_bytes free_size_bytes = helper.cluster_free_size_bytes
space_required_bytes = migration.space_required_bytes space_required_bytes = migration.space_required_bytes
logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] checking free space in cluster. Required space #{number_to_human_size(space_required_bytes)}. Free space #{number_to_human_size(free_size_bytes)}.")) log message: "MigrationWorker: migration[#{migration.name}] checking free space in cluster. Required space #{number_to_human_size(space_required_bytes)}. Free space #{number_to_human_size(free_size_bytes)}."
if free_size_bytes < space_required_bytes if free_size_bytes < space_required_bytes
logger.warn(structured_payload(message: "MigrationWorker: migration[#{migration.name}] You should have at least #{number_to_human_size(space_required_bytes)} of free space in the cluster to run this migration. Please increase the storage in your Elasticsearch cluster.")) warn message: "MigrationWorker: migration[#{migration.name}] You should have at least #{number_to_human_size(space_required_bytes)} of free space in the cluster to run this migration. Please increase the storage in your Elasticsearch cluster."
logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] updating with halted: true")) log message: "MigrationWorker: migration[#{migration.name}] updating with halted: true"
migration.halt migration.halt
break false break false
...@@ -67,11 +67,23 @@ def perform ...@@ -67,11 +67,23 @@ def perform
enqueue_next_batch(migration) enqueue_next_batch(migration)
end end
rescue StandardError => e rescue StandardError => e
logger.error(structured_payload(message: "#{self.class.name}: #{e.class} #{e.message}", backtrace: e.backtrace.join("\n"))) error message: "#{self.class.name}: #{e.class} #{e.message}", backtrace: e.backtrace.join("\n")
end end
private private
# Writes an info-level entry to the worker's logger.
#
# Every argument is forwarded unchanged to +structured_payload+; whatever
# that returns is handed to +logger.info+. The return value is the one
# produced by +logger.info+.
def log(...)
  sink = logger
  sink.info(structured_payload(...))
end
# Writes a warn-level entry to the worker's logger.
#
# Every argument is forwarded unchanged to +structured_payload+; whatever
# that returns is handed to +logger.warn+.
#
# NOTE(review): inside this class the method takes precedence over
# Kernel#warn, so a bare `warn "text"` here goes to the structured logger
# rather than to $stderr.
def warn(...)
  sink = logger
  sink.warn(structured_payload(...))
end
# Writes an error-level entry to the worker's logger.
#
# Every argument is forwarded unchanged to +structured_payload+; whatever
# that returns is handed to +logger.error+. The return value is the one
# produced by +logger.error+.
def error(...)
  sink = logger
  sink.error(structured_payload(...))
end
def preflight_check_successful? def preflight_check_successful?
return false if Feature.disabled?(:elastic_migration_worker, type: :ops) return false if Feature.disabled?(:elastic_migration_worker, type: :ops)
return false unless Gitlab::CurrentSettings.elasticsearch_indexing? return false unless Gitlab::CurrentSettings.elasticsearch_indexing?
...@@ -79,25 +91,30 @@ def preflight_check_successful? ...@@ -79,25 +91,30 @@ def preflight_check_successful?
return false if Elastic::ReindexingTask.current return false if Elastic::ReindexingTask.current
if helper.unsupported_version? if helper.unsupported_version?
logger.info(structured_payload(message: 'MigrationWorker: You are using an unsupported version of Elasticsearch. Indexing will be paused to prevent data loss')) log message: 'MigrationWorker: You are using an unsupported version of Elasticsearch. Indexing will be paused to prevent data loss'
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true) Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true)
return false return false
end end
unless Search::ClusterHealthCheck::Elastic.healthy?
error message: "Advanced search cluster is unhealthy. Execution is skipped."
return false
end
true true
end end
def execute_migration(migration) def execute_migration(migration)
if migration.started? && !migration.batched? && !migration.retry_on_failure? if migration.started? && !migration.batched? && !migration.retry_on_failure?
logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete")) log message: "MigrationWorker: migration[#{migration.name}] did not execute migrate method since it was already executed. Waiting for migration to complete"
return return
end end
pause_indexing!(migration) pause_indexing!(migration)
logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] executing migrate method")) log message: "MigrationWorker: migration[#{migration.name}] executing migrate method"
migration.migrate migration.migrate
rescue StandardError => e rescue StandardError => e
retry_migration(migration, e) if migration.retry_on_failure? retry_migration(migration, e) if migration.retry_on_failure?
...@@ -108,11 +125,11 @@ def execute_migration(migration) ...@@ -108,11 +125,11 @@ def execute_migration(migration)
def retry_migration(migration, exception) def retry_migration(migration, exception)
if migration.current_attempt >= migration.max_attempts if migration.current_attempt >= migration.max_attempts
message = "MigrationWorker: migration has failed with #{exception.class}:#{exception.message}, no retries left" message = "MigrationWorker: migration has failed with #{exception.class}:#{exception.message}, no retries left"
logger.error(structured_payload(message: message, backtrace: exception.backtrace.join("\n"))) error message: message, backtrace: exception.backtrace.join("\n")
migration.fail(message: message) migration.fail(message: message)
else else
logger.info(structured_payload(message: "MigrationWorker: increasing previous_attempts to #{migration.current_attempt}")) log message: "MigrationWorker: increasing previous_attempts to #{migration.current_attempt}"
migration.save_state!(previous_attempts: migration.current_attempt) migration.save_state!(previous_attempts: migration.current_attempt)
end end
end end
...@@ -120,13 +137,13 @@ def retry_migration(migration, exception) ...@@ -120,13 +137,13 @@ def retry_migration(migration, exception)
def enqueue_next_batch(migration) def enqueue_next_batch(migration)
return unless migration.batched? && !migration.completed? return unless migration.batched? && !migration.completed?
logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] kicking off next migration batch")) log message: "MigrationWorker: migration[#{migration.name}] kicking off next migration batch"
Elastic::MigrationWorker.perform_in(migration.throttle_delay) Elastic::MigrationWorker.perform_in(migration.throttle_delay)
end end
def check_and_save_completed_status(migration) def check_and_save_completed_status(migration)
migration.completed?.tap do |status| migration.completed?.tap do |status|
logger.info(structured_payload(message: "MigrationWorker: migration[#{migration.name}] updating with completed: #{status}")) log message: "MigrationWorker: migration[#{migration.name}] updating with completed: #{status}"
migration.save!(completed: status) migration.save!(completed: status)
end end
end end
...@@ -139,7 +156,7 @@ def pause_indexing!(migration) ...@@ -139,7 +156,7 @@ def pause_indexing!(migration)
migration.save_state!(pause_indexing: pause_indexing) migration.save_state!(pause_indexing: pause_indexing)
if pause_indexing if pause_indexing
logger.info(structured_payload(message: 'MigrationWorker: Pausing indexing')) log message: 'MigrationWorker: Pausing indexing'
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true) Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true)
end end
end end
...@@ -149,7 +166,7 @@ def unpause_indexing!(migration) ...@@ -149,7 +166,7 @@ def unpause_indexing!(migration)
return unless migration.load_state[:pause_indexing] return unless migration.load_state[:pause_indexing]
return if migration.load_state[:halted_indexing_unpaused] return if migration.load_state[:halted_indexing_unpaused]
logger.info(structured_payload(message: 'MigrationWorker: unpausing indexing')) log message: 'MigrationWorker: unpausing indexing'
Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false) Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false)
migration.save_state!(halted_indexing_unpaused: true) if migration.halted? migration.save_state!(halted_indexing_unpaused: true) if migration.halted?
...@@ -158,7 +175,7 @@ def unpause_indexing!(migration) ...@@ -158,7 +175,7 @@ def unpause_indexing!(migration)
def create_migrations_index_if_not_exists def create_migrations_index_if_not_exists
return if helper.migrations_index_exists? return if helper.migrations_index_exists?
logger.info(structured_payload(message: 'MigrationWorker: creating migrations index')) log message: 'MigrationWorker: creating migrations index'
helper.create_migrations_index helper.create_migrations_index
end end
......
...@@ -47,6 +47,20 @@ ...@@ -47,6 +47,20 @@
end end
end end
context 'cluster is unhealthy' do
  before do
    stub_ee_application_setting(elasticsearch_indexing: true)
    # Force the health check to report an unhealthy cluster.
    allow(Search::ClusterHealthCheck::Elastic).to receive(:healthy?).and_return(false)
    # Route the worker's logging through the spec's `logger` double so the
    # error call below can be observed.
    allow(::Gitlab::Elasticsearch::Logger).to receive(:build).and_return(logger)
  end

  # The example checks that the condition is logged at error level and that
  # `perform` returns a falsey value; it does not expect an exception, so
  # the description says "logs" rather than "raises".
  it 'logs an error and does not execute migration' do
    expect(subject).not_to receive(:execute_migration)
    expect(logger).to receive(:error)

    expect(subject.perform).to be_falsey
  end
end
context 'reindexing task is in progress' do context 'reindexing task is in progress' do
let!(:task) { create(:elastic_reindexing_task) } let!(:task) { create(:elastic_reindexing_task) }
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册