diff --git a/config/gitlab.yml.example b/config/gitlab.yml.example index 4be6b2127e1194ede3131693b385dad4f21dbb87..c6b4bcc36d80316a201ff0729d98314c0994e2ff 100644 --- a/config/gitlab.yml.example +++ b/config/gitlab.yml.example @@ -521,6 +521,11 @@ production: &base elastic_index_initial_bulk_cron_worker: cron: "*/1 * * * *" + # Elasticsearch reindexing worker + # NOTE: This will only take effect if elasticsearch is enabled. + elastic_cluster_reindexing_cron_worker: + cron: "*/10 * * * *" + registry: # enabled: true # host: registry.example.com diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 53ca39a6f39d26e83cf6a572a773c222042b72f4..495c0702e704d7889f5004e83a8bf41d368ad0e5 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -567,6 +567,9 @@ Settings.cron_jobs['elastic_index_initial_bulk_cron_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['elastic_index_initial_bulk_cron_worker']['cron'] ||= '*/1 * * * *' Settings.cron_jobs['elastic_index_initial_bulk_cron_worker']['job_class'] ||= 'ElasticIndexInitialBulkCronWorker' + Settings.cron_jobs['elastic_cluster_reindexing_cron_worker'] ||= Settingslogic.new({}) + Settings.cron_jobs['elastic_cluster_reindexing_cron_worker']['cron'] ||= '*/10 * * * *' + Settings.cron_jobs['elastic_cluster_reindexing_cron_worker']['job_class'] ||= 'ElasticClusterReindexingCronWorker' Settings.cron_jobs['sync_seat_link_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['sync_seat_link_worker']['cron'] ||= "#{rand(60)} 0 * * *" Settings.cron_jobs['sync_seat_link_worker']['job_class'] = 'SyncSeatLinkWorker' diff --git a/db/migrate/20200623141544_create_elastic_reindexing_task.rb b/db/migrate/20200623141544_create_elastic_reindexing_task.rb new file mode 100644 index 0000000000000000000000000000000000000000..7089df4f1ea729598994e826b77cdad58c2158ae --- /dev/null +++ b/db/migrate/20200623141544_create_elastic_reindexing_task.rb @@ -0,0 +1,33 @@ +# 
frozen_string_literal: true + +class CreateElasticReindexingTask < ActiveRecord::Migration[6.0] + include Gitlab::Database::MigrationHelpers + + DOWNTIME = false + + disable_ddl_transaction! + + def up + create_table :elastic_reindexing_tasks do |t| + t.timestamps_with_timezone null: false + t.integer :documents_count + t.integer :state, null: false, default: 0, limit: 2, index: true + t.boolean :in_progress, null: false, default: true + t.text :index_name_from + t.text :index_name_to + t.text :elastic_task + t.text :error_message + end + + add_text_limit :elastic_reindexing_tasks, :index_name_from, 255 + add_text_limit :elastic_reindexing_tasks, :index_name_to, 255 + add_text_limit :elastic_reindexing_tasks, :elastic_task, 255 + add_text_limit :elastic_reindexing_tasks, :error_message, 255 + + add_index :elastic_reindexing_tasks, :in_progress, unique: true, where: 'in_progress' + end + + def down + drop_table :elastic_reindexing_tasks + end +end diff --git a/db/structure.sql b/db/structure.sql index 83297c9e47e1cd33a152995550ce6112e7e60b7b..93fcdaa4a1cef543e99c0582906c81ef0690197d 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -11093,6 +11093,32 @@ CREATE SEQUENCE public.draft_notes_id_seq ALTER SEQUENCE public.draft_notes_id_seq OWNED BY public.draft_notes.id; +CREATE TABLE public.elastic_reindexing_tasks ( + id bigint NOT NULL, + created_at timestamp with time zone NOT NULL, + updated_at timestamp with time zone NOT NULL, + documents_count integer, + state smallint DEFAULT 0 NOT NULL, + in_progress boolean DEFAULT true NOT NULL, + index_name_from text, + index_name_to text, + elastic_task text, + error_message text, + CONSTRAINT check_04151aca42 CHECK ((char_length(index_name_from) <= 255)), + CONSTRAINT check_7f64acda8e CHECK ((char_length(error_message) <= 255)), + CONSTRAINT check_85ebff7124 CHECK ((char_length(index_name_to) <= 255)), + CONSTRAINT check_942e5aae53 CHECK ((char_length(elastic_task) <= 255)) +); + +CREATE SEQUENCE 
public.elastic_reindexing_tasks_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE public.elastic_reindexing_tasks_id_seq OWNED BY public.elastic_reindexing_tasks.id; + CREATE TABLE public.elasticsearch_indexed_namespaces ( created_at timestamp with time zone NOT NULL, updated_at timestamp with time zone NOT NULL, @@ -16440,6 +16466,8 @@ ALTER TABLE ONLY public.diff_note_positions ALTER COLUMN id SET DEFAULT nextval( ALTER TABLE ONLY public.draft_notes ALTER COLUMN id SET DEFAULT nextval('public.draft_notes_id_seq'::regclass); +ALTER TABLE ONLY public.elastic_reindexing_tasks ALTER COLUMN id SET DEFAULT nextval('public.elastic_reindexing_tasks_id_seq'::regclass); + ALTER TABLE ONLY public.emails ALTER COLUMN id SET DEFAULT nextval('public.emails_id_seq'::regclass); ALTER TABLE ONLY public.environments ALTER COLUMN id SET DEFAULT nextval('public.environments_id_seq'::regclass); @@ -17413,6 +17441,9 @@ ALTER TABLE ONLY public.diff_note_positions ALTER TABLE ONLY public.draft_notes ADD CONSTRAINT draft_notes_pkey PRIMARY KEY (id); +ALTER TABLE ONLY public.elastic_reindexing_tasks + ADD CONSTRAINT elastic_reindexing_tasks_pkey PRIMARY KEY (id); + ALTER TABLE ONLY public.emails ADD CONSTRAINT emails_pkey PRIMARY KEY (id); @@ -18926,6 +18957,10 @@ CREATE INDEX index_draft_notes_on_discussion_id ON public.draft_notes USING btre CREATE INDEX index_draft_notes_on_merge_request_id ON public.draft_notes USING btree (merge_request_id); +CREATE UNIQUE INDEX index_elastic_reindexing_tasks_on_in_progress ON public.elastic_reindexing_tasks USING btree (in_progress) WHERE in_progress; + +CREATE INDEX index_elastic_reindexing_tasks_on_state ON public.elastic_reindexing_tasks USING btree (state); + CREATE INDEX index_elasticsearch_indexed_namespaces_on_created_at ON public.elasticsearch_indexed_namespaces USING btree (created_at); CREATE UNIQUE INDEX index_elasticsearch_indexed_namespaces_on_namespace_id ON 
public.elasticsearch_indexed_namespaces USING btree (namespace_id); @@ -23488,6 +23523,7 @@ COPY "schema_migrations" (version) FROM STDIN; 20200623000148 20200623000320 20200623121135 +20200623141544 20200623170000 20200623185440 20200624075411 diff --git a/ee/app/models/elastic/reindexing_task.rb b/ee/app/models/elastic/reindexing_task.rb new file mode 100644 index 0000000000000000000000000000000000000000..7fc57b3a6c9f999801c8899ef3707bbe9d546578 --- /dev/null +++ b/ee/app/models/elastic/reindexing_task.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +class Elastic::ReindexingTask < ApplicationRecord + self.table_name = 'elastic_reindexing_tasks' + + enum state: { + initial: 0, + indexing_paused: 1, + reindexing: 2, + success: 10, # states less than 10 are considered in_progress + failure: 11 + } + + before_save :set_in_progress_flag + + def self.current + where(in_progress: true).last + end + + private + + def set_in_progress_flag + in_progress_states = self.class.states.select { |_, v| v < 10 }.keys + + self.in_progress = in_progress_states.include?(state) + end +end diff --git a/ee/app/services/elastic/cluster_reindexing_service.rb b/ee/app/services/elastic/cluster_reindexing_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..0be4e79116e09313d230112d822174c50bdcff7b --- /dev/null +++ b/ee/app/services/elastic/cluster_reindexing_service.rb @@ -0,0 +1,137 @@ +# frozen_string_literal: true + +module Elastic + class ClusterReindexingService + INITIAL_INDEX_OPTIONS = { # Optimized for writes + refresh_interval: -1, # Disable automatic refreshing + number_of_replicas: 0, + translog: { durability: 'async' } + }.freeze + + def execute + case current_task&.state&.to_sym # nil when no task is in progress, so no branch runs + when :initial + initial! + when :indexing_paused + indexing_paused! + when :reindexing + reindexing! 
+ end + end + + def current_task + Elastic::ReindexingTask.current + end + + private + + def default_index_options + { + refresh_interval: nil, # Change it back to the default + number_of_replicas: Gitlab::CurrentSettings.elasticsearch_replicas, + translog: { durability: 'request' } + } + end + + def initial! + # Pause indexing + Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: true) + + unless elastic_helper.alias_exists? + abort_reindexing!('Your Elasticsearch index must first use aliases before you can use this feature. Please recreate your index from scratch before reindexing.') + return false + end + + expected_free_size = elastic_helper.index_size_bytes * 2 + if elastic_helper.cluster_free_size_bytes < expected_free_size + abort_reindexing!("You should have at least #{expected_free_size} bytes of storage available to perform reindexing. Please increase the storage in your Elasticsearch cluster before reindexing.") + return false + end + + current_task.update!(state: :indexing_paused) + + true + end + + def indexing_paused! + # Create an index with custom settings + index_name = elastic_helper.create_empty_index(with_alias: false, options: { settings: INITIAL_INDEX_OPTIONS }) + + # Record documents count + documents_count = elastic_helper.index_size.dig('docs', 'count') + + # Trigger reindex + task_id = elastic_helper.reindex(to: index_name) + + current_task.update!( + index_name_from: elastic_helper.target_index_name, + index_name_to: index_name, + documents_count: documents_count, + elastic_task: task_id, + state: :reindexing + ) + + true + end + + def reindexing! 
+ task = current_task + + # Check if indexing is completed + task_status = elastic_helper.task_status(task_id: task.elastic_task) + return false unless task_status['completed'] + + # Check if reindexing is failed + reindexing_error = task_status.dig('error', 'type') + if reindexing_error + abort_reindexing!("Task #{task.elastic_task} has failed with Elasticsearch error.", additional_logs: { elasticsearch_error_type: reindexing_error }) + return false + end + + # Refresh a new index + elastic_helper.refresh_index(index_name: task.index_name_to) + + # Compare documents count + old_documents_count = task.documents_count + new_documents_count = elastic_helper.index_size(index_name: task.index_name_to).dig('docs', 'count') + if old_documents_count != new_documents_count + abort_reindexing!("Documents count is different, Count from new index: #{new_documents_count} Count from original index: #{old_documents_count}. This likely means something went wrong during reindexing.") + return false + end + + # Change index settings back + elastic_helper.update_settings(index_name: task.index_name_to, settings: default_index_options) + + # Switch alias to a new index + elastic_helper.switch_alias(to: task.index_name_to) + + # Unpause indexing + Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false) + + task.update!(state: :success) + + true + end + + def abort_reindexing!(reason, additional_logs: {}) + error = { message: 'elasticsearch_reindex_error', error: reason, elasticsearch_task_id: current_task.elastic_task, gitlab_task_id: current_task.id, gitlab_task_state: current_task.state } + logger.error(error.merge(additional_logs)) + + current_task.update!( + state: :failure, + error_message: reason + ) + + # Unpause indexing + Gitlab::CurrentSettings.update!(elasticsearch_pause_indexing: false) + end + + def logger + @logger ||= ::Gitlab::Elasticsearch::Logger.build + end + + def elastic_helper + Gitlab::Elastic::Helper.default + end + end +end diff --git 
a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index 91cfd4ecf9cd3f51ae7b87eedf81aa729649b1c8..a1a734dc26ce573e198f2b70a7785546340a6e91 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -27,6 +27,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: cronjob:elastic_cluster_reindexing_cron + :feature_category: :global_search + :has_external_dependencies: + :urgency: :throttled + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:elastic_index_bulk_cron :feature_category: :global_search :has_external_dependencies: diff --git a/ee/app/workers/elastic_cluster_reindexing_cron_worker.rb b/ee/app/workers/elastic_cluster_reindexing_cron_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..527d7bd15023d67b95f7a762b4f4d2e023231ef7 --- /dev/null +++ b/ee/app/workers/elastic_cluster_reindexing_cron_worker.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +class ElasticClusterReindexingCronWorker + include ApplicationWorker + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + include Gitlab::ExclusiveLeaseHelpers + + sidekiq_options retry: false + + feature_category :global_search + urgency :throttled + idempotent! 
+ + def perform + task = Elastic::ReindexingTask.current + return false unless task + + in_lock(self.class.name.underscore, ttl: 1.hour, retries: 10, sleep_sec: 1) do + service.execute + end + end + + private + + def service + Elastic::ClusterReindexingService.new + end +end diff --git a/ee/changelogs/unreleased/213629-cluster-reindexing-feature.yml b/ee/changelogs/unreleased/213629-cluster-reindexing-feature.yml new file mode 100644 index 0000000000000000000000000000000000000000..e5e60bbf138a8a8eedf0952ee96d00043a71291a --- /dev/null +++ b/ee/changelogs/unreleased/213629-cluster-reindexing-feature.yml @@ -0,0 +1,5 @@ +--- +title: Add cluster reindexing feature to our ES integration +merge_request: 34069 +author: +type: added diff --git a/ee/lib/gitlab/elastic/helper.rb b/ee/lib/gitlab/elastic/helper.rb index 1d0afca049b3ec65f9dd28d1260f0b05339cd007..bb1b637605f0289b8b7093beac8bd14b8557a332 100644 --- a/ee/lib/gitlab/elastic/helper.rb +++ b/ee/lib/gitlab/elastic/helper.rb @@ -29,11 +29,11 @@ def default end def create_empty_index(with_alias: true, options: {}) - if index_exists? - raise "Index under '#{target_name}' already exists, use `recreate_index` to recreate it." - end + new_index_name = options[:index_name] || "#{target_name}-#{Time.now.strftime("%Y%m%d-%H%M")}" - new_index_name = with_alias ? "#{target_name}-#{Time.now.strftime("%Y%m%d-%H%M")}" : target_name + if with_alias ? index_exists? : index_exists?(index_name: new_index_name) + raise "Index under '#{with_alias ? target_name : new_index_name}' already exists, use `recreate_index` to recreate it." 
+ end settings = {} mappings = {} @@ -75,18 +75,19 @@ def create_empty_index(with_alias: true, options: {}) client.indices.create create_index_options client.indices.put_alias(name: target_name, index: new_index_name) if with_alias + new_index_name end - def delete_index - result = client.indices.delete(index: target_index_name) + def delete_index(index_name: nil) + result = client.indices.delete(index: index_name || target_index_name) result['acknowledged'] rescue ::Elasticsearch::Transport::Transport::Errors::NotFound => e Gitlab::ErrorTracking.log_exception(e) false end - def index_exists? - client.indices.exists?(index: target_name) # rubocop:disable CodeReuse/ActiveRecord + def index_exists?(index_name: nil) + client.indices.exists?(index: index_name || target_name) # rubocop:disable CodeReuse/ActiveRecord end def alias_exists? @@ -96,15 +97,58 @@ def alias_exists? # Calls Elasticsearch refresh API to ensure data is searchable # immediately. # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html - def refresh_index - client.indices.refresh(index: target_name) + def refresh_index(index_name: nil) + client.indices.refresh(index: index_name || target_name) + end + + def index_size(index_name: nil) + client.indices.stats['indices'][index_name || target_index_name]['total'] + end + + def index_size_bytes + index_size['store']['size_in_bytes'] end - def index_size - client.indices.stats['indices'][target_index_name]['total'] + def cluster_free_size_bytes + client.cluster.stats['nodes']['fs']['free_in_bytes'] end - private + def reindex(from: target_index_name, to:, wait_for_completion: false) + body = { + source: { + index: from + }, + dest: { + index: to + } + } + + response = client.reindex(body: body, slices: 'auto', wait_for_completion: wait_for_completion) + + response['task'] + end + + def task_status(task_id:) + client.tasks.get(task_id: task_id) + end + + def update_settings(index_name: nil, settings:) + 
client.indices.put_settings(index: index_name || target_index_name, body: settings) + end + + def switch_alias(from: target_index_name, to:) + actions = [ + { + remove: { index: from, alias: target_name } + }, + { + add: { index: to, alias: target_name } + } + ] + + body = { actions: actions } + client.indices.update_aliases(body: body) + end # This method is used when we need to get an actual index name (if it's used through an alias) def target_index_name diff --git a/ee/lib/tasks/gitlab/elastic.rake b/ee/lib/tasks/gitlab/elastic.rake index 195c3e334474c0aad3373ec52cadb0f03f6d4814..599d8bdcb803db14fbacc354ff9b3f78cd47e551 100644 --- a/ee/lib/tasks/gitlab/elastic.rake +++ b/ee/lib/tasks/gitlab/elastic.rake @@ -76,6 +76,15 @@ namespace :gitlab do Rake::Task["gitlab:elastic:create_empty_index"].invoke(*args) end + desc "GitLab | Elasticsearch | Zero-downtime cluster reindexing" + task reindex_cluster: :environment do + Elastic::ReindexingTask.create! + + ElasticClusterReindexingCronWorker.perform_async + + puts "Reindexing job was successfully scheduled".color(:green) + end + desc "GitLab | Elasticsearch | Clear indexing status" task clear_index_status: :environment do IndexStatus.delete_all diff --git a/ee/spec/factories/elastic/reindexing_tasks.rb b/ee/spec/factories/elastic/reindexing_tasks.rb new file mode 100644 index 0000000000000000000000000000000000000000..f82920cb5777101bd4a8ae30a95b588d7745cb9e --- /dev/null +++ b/ee/spec/factories/elastic/reindexing_tasks.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +FactoryBot.define do + factory :elastic_reindexing_task, class: 'Elastic::ReindexingTask' do + state { :initial } + in_progress { true } + index_name_from { 'old_index_name' } + index_name_to { 'new_index_name' } + end +end diff --git a/ee/spec/lib/ee/gitlab/elastic/helper_spec.rb b/ee/spec/lib/ee/gitlab/elastic/helper_spec.rb index a1b961c04a2e39651da714c27d8d3a45e3493d37..712de611fcd5c41661dbf95050b60b0bcdd9a7bb 100644 --- 
a/ee/spec/lib/ee/gitlab/elastic/helper_spec.rb +++ b/ee/spec/lib/ee/gitlab/elastic/helper_spec.rb @@ -7,18 +7,18 @@ shared_context 'with a legacy index' do before do - helper.create_empty_index(with_alias: false) + @index_name = helper.create_empty_index(with_alias: false, options: { index_name: helper.target_name }) end end shared_context 'with an existing index and alias' do before do - helper.create_empty_index(with_alias: true) + @index_name = helper.create_empty_index(with_alias: true) end end after do - helper.delete_index + helper.delete_index(index_name: @index_name) end describe '.new' do @@ -41,18 +41,31 @@ describe '#create_empty_index' do context 'with an empty cluster' do - it 'creates index and alias' do - helper.create_empty_index + context 'with alias and index' do + include_context 'with an existing index and alias' - expect(helper.index_exists?).to eq(true) - expect(helper.alias_exists?).to eq(true) + it 'creates index and alias' do + expect(helper.index_exists?).to eq(true) + expect(helper.alias_exists?).to eq(true) + end end - it 'creates the index only' do - helper.create_empty_index(with_alias: false) + context 'when there is a legacy index' do + include_context 'with a legacy index' - expect(helper.index_exists?).to eq(true) - expect(helper.alias_exists?).to eq(false) + it 'creates the index only' do + expect(helper.index_exists?).to eq(true) + expect(helper.alias_exists?).to eq(false) + end + end + + it 'creates an index with a custom name' do + @index_name = 'test-custom-index-name' + + helper.create_empty_index(with_alias: false, options: { index_name: @index_name }) + + expect(helper.index_exists?(index_name: @index_name)).to eq(true) + expect(helper.index_exists?).to eq(false) end end @@ -134,4 +147,25 @@ it { is_expected.to be_truthy } end end + + describe '#cluster_free_size_bytes' do + it 'returns valid cluster size' do + expect(helper.cluster_free_size_bytes).to be_positive + end + end + + describe '#switch_alias' do + include_context 
'with an existing index and alias' + + let(:new_index_name) { 'test-switch-alias' } + + it 'switches the alias' do + helper.create_empty_index(with_alias: false, options: { index_name: new_index_name }) + + expect { helper.switch_alias(to: new_index_name) } + .to change { helper.target_index_name }.to(new_index_name) + + helper.delete_index(index_name: new_index_name) + end + end end diff --git a/ee/spec/models/elastic/reindexing_task_spec.rb b/ee/spec/models/elastic/reindexing_task_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..cc4109c33b3746729d3b0092f7eabef8897e2def --- /dev/null +++ b/ee/spec/models/elastic/reindexing_task_spec.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Elastic::ReindexingTask, type: :model do + it 'only allows one running task at a time' do + expect { create(:elastic_reindexing_task, state: :success) }.not_to raise_error + expect { create(:elastic_reindexing_task) }.not_to raise_error + expect { create(:elastic_reindexing_task) }.to raise_error(/violates unique constraint/) + end + + it 'sets in_progress flag' do + task = create(:elastic_reindexing_task, state: :success) + expect(task.in_progress).to eq(false) + + task.update!(state: :reindexing) + expect(task.in_progress).to eq(true) + end +end diff --git a/ee/spec/services/elastic/cluster_reindexing_service_spec.rb b/ee/spec/services/elastic/cluster_reindexing_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..4b6b497798d5093a21cb1fb40af67618eed1d570 --- /dev/null +++ b/ee/spec/services/elastic/cluster_reindexing_service_spec.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Elastic::ClusterReindexingService, :elastic do + subject { described_class.new } + + context 'state: initial' do + let(:task) { create(:elastic_reindexing_task, state: :initial) } + + it 'errors when there is not enough space' do + 
allow(Gitlab::Elastic::Helper.default).to receive(:index_size_bytes).and_return(100.megabytes) + allow(Gitlab::Elastic::Helper.default).to receive(:cluster_free_size_bytes).and_return(30.megabytes) + + expect { subject.execute }.to change { task.reload.state }.from('initial').to('failure') + expect(task.reload.error_message).to match(/storage available/) + end + + it 'pauses elasticsearch indexing' do + expect(Gitlab::CurrentSettings.elasticsearch_pause_indexing).to eq(false) + + expect { subject.execute }.to change { task.reload.state }.from('initial').to('indexing_paused') + + expect(Gitlab::CurrentSettings.elasticsearch_pause_indexing).to eq(true) + end + end + + context 'state: indexing_paused' do + it 'triggers reindexing' do + task = create(:elastic_reindexing_task, state: :indexing_paused) + + allow(Gitlab::Elastic::Helper.default).to receive(:create_empty_index).and_return('new_index_name') + allow(Gitlab::Elastic::Helper.default).to receive(:reindex).and_return('task_id') + + expect { subject.execute }.to change { task.reload.state }.from('indexing_paused').to('reindexing') + + task = task.reload + expect(task.index_name_to).to eq('new_index_name') + expect(task.elastic_task).to eq('task_id') + end + end + + context 'state: reindexing' do + let(:task) { create(:elastic_reindexing_task, state: :reindexing, documents_count: 10) } + let(:expected_default_settings) do + { + refresh_interval: nil, + number_of_replicas: Gitlab::CurrentSettings.elasticsearch_replicas, + translog: { durability: 'request' } + } + end + + before do + allow(Gitlab::Elastic::Helper.default).to receive(:task_status).and_return({ 'completed' => true }) + allow(Gitlab::Elastic::Helper.default).to receive(:refresh_index).and_return(true) + end + + it 'errors if documents count is different' do + expect(Gitlab::Elastic::Helper.default).to receive(:index_size).and_return('docs' => { 'count' => task.reload.documents_count * 2 }) + + expect { subject.execute }.to change { task.reload.state 
}.from('reindexing').to('failure') + expect(task.reload.error_message).to match(/count is different/) + end + + it 'errors if reindexing is failed' do + allow(Gitlab::Elastic::Helper.default).to receive(:task_status).and_return({ 'completed' => true, 'error' => { 'type' => 'search_phase_execution_exception' } }) + + expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('failure') + expect(task.reload.error_message).to match(/has failed with/) + end + + it 'launches all state steps' do + expect(Gitlab::Elastic::Helper.default).to receive(:index_size).and_return('docs' => { 'count' => task.reload.documents_count }) + expect(Gitlab::Elastic::Helper.default).to receive(:update_settings).with(index_name: task.index_name_to, settings: expected_default_settings) + expect(Gitlab::Elastic::Helper.default).to receive(:switch_alias).with(to: task.index_name_to) + expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false) + + expect { subject.execute }.to change { task.reload.state }.from('reindexing').to('success') + end + end +end diff --git a/ee/spec/workers/elastic_cluster_reindexing_cron_worker_spec.rb b/ee/spec/workers/elastic_cluster_reindexing_cron_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..eb22a00ceaa01b2ae9ef56366e8f23036f562aa2 --- /dev/null +++ b/ee/spec/workers/elastic_cluster_reindexing_cron_worker_spec.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe ElasticClusterReindexingCronWorker do + subject { described_class.new } + + describe '#perform' do + it 'calls execute method' do + expect(Elastic::ReindexingTask).to receive(:current).and_return(build(:elastic_reindexing_task)) + + expect_next_instance_of(Elastic::ClusterReindexingService) do |service| + expect(service).to receive(:execute).and_return(false) + end + + subject.perform + end + + it 'does nothing if no task is found' do + 
expect(Elastic::ReindexingTask).to receive(:current).and_return(nil) + + expect(subject.perform).to eq(false) + end + end +end