diff --git a/changelogs/unreleased/34086-es-bulk-incremental-index-updates.yml b/changelogs/unreleased/34086-es-bulk-incremental-index-updates.yml new file mode 100644 index 0000000000000000000000000000000000000000..67cceb21af0c129e71f1d4e580f20864aee09554 --- /dev/null +++ b/changelogs/unreleased/34086-es-bulk-incremental-index-updates.yml @@ -0,0 +1,5 @@ +--- +title: 'Add a bulk processor for elasticsearch incremental updates' +merge_request: 24298 +author: +type: added diff --git a/config/gitlab.yml.example b/config/gitlab.yml.example index 20c75a6e255f0ce5622d6a1cde06bb2562803187..330e5109ed416017ab82d003adc49518917fffcb 100644 --- a/config/gitlab.yml.example +++ b/config/gitlab.yml.example @@ -454,6 +454,11 @@ production: &base pseudonymizer_worker: cron: "0 * * * *" + # Elasticsearch bulk updater for incremental updates. + # NOTE: This will only take effect if elasticsearch is enabled. + elastic_index_bulk_cron_worker: + cron: "*/1 * * * *" + registry: # enabled: true # host: registry.example.com diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index 5f07b486aea0865e40f0f820086436dd22837ce2..e95d3b74730e746d83b038402a33fd70f9ab3028 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -531,6 +531,9 @@ Settings.cron_jobs['update_max_seats_used_for_gitlab_com_subscriptions_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['update_max_seats_used_for_gitlab_com_subscriptions_worker']['cron'] ||= '0 12 * * *' Settings.cron_jobs['update_max_seats_used_for_gitlab_com_subscriptions_worker']['job_class'] = 'UpdateMaxSeatsUsedForGitlabComSubscriptionsWorker' + Settings.cron_jobs['elastic_index_bulk_cron_worker'] ||= Settingslogic.new({}) + Settings.cron_jobs['elastic_index_bulk_cron_worker']['cron'] ||= '*/1 * * * *' + Settings.cron_jobs['elastic_index_bulk_cron_worker']['job_class'] ||= 'ElasticIndexBulkCronWorker' end # diff --git a/doc/development/elasticsearch.md b/doc/development/elasticsearch.md index b8d2a873d8b9ce695d3be0646867a8c086c6b7ca..69113fe80308cf6b5c4db162e207037c8dfcd276 100644 --- a/doc/development/elasticsearch.md +++ b/doc/development/elasticsearch.md @@ -36,7 +36,11 @@ Additionally, if you need large repos or multiple forks for testing, please cons The Elasticsearch integration depends on an external indexer. We ship an [indexer written in Go](https://gitlab.com/gitlab-org/gitlab-elasticsearch-indexer). The user must trigger the initial indexing via a rake task but, after this is done, GitLab itself will trigger reindexing when required via `after_` callbacks on create, update, and destroy that are inherited from [/ee/app/models/concerns/elastic/application_versioned_search.rb](https://gitlab.com/gitlab-org/gitlab/blob/master/ee/app/models/concerns/elastic/application_versioned_search.rb). -All indexing after the initial one is done via `ElasticIndexerWorker` (Sidekiq jobs). +After initial indexing is complete, updates proceed in one of two ways, depending on the `:elastic_bulk_incremental_updates` feature flag. + +If disabled, every create, update, or delete operation on an Elasticsearch-tracked model enqueues a new `ElasticIndexerWorker` Sidekiq job which takes care of updating just that document. This is quite inefficient. + +If the feature flag is enabled, create, update, and delete operations for all models except projects (see [#207494](https://gitlab.com/gitlab-org/gitlab/issues/207494)) are tracked in a Redis [`ZSET`](https://redis.io/topics/data-types#sorted-sets) instead. A regular `sidekiq-cron` `ElasticIndexBulkCronWorker` processes this queue, updating many Elasticsearch documents at a time with the [Bulk Request API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html). Search queries are generated by the concerns found in [ee/app/models/concerns/elastic](https://gitlab.com/gitlab-org/gitlab/tree/master/ee/app/models/concerns/elastic). These concerns are also in charge of access control, and have been a historic source of security bugs so please pay close attention to them! diff --git a/ee/app/models/concerns/elastic/application_versioned_search.rb b/ee/app/models/concerns/elastic/application_versioned_search.rb index 06afe217c4ec55ca61d52273513bce65037241ec..a0c1e5e663d6ad8414bd63ed9a6079227c894daf 100644 --- a/ee/app/models/concerns/elastic/application_versioned_search.rb +++ b/ee/app/models/concerns/elastic/application_versioned_search.rb @@ -45,10 +45,14 @@ class << self end def maintain_elasticsearch_create + return if maintain_elasticsearch_incremental_bulk + ElasticIndexerWorker.perform_async(:index, self.class.to_s, self.id, self.es_id) end def maintain_elasticsearch_update + return if maintain_elasticsearch_incremental_bulk + ElasticIndexerWorker.perform_async( :update, self.class.to_s, @@ -58,11 +62,21 @@ def maintain_elasticsearch_update end def maintain_elasticsearch_destroy + return if maintain_elasticsearch_incremental_bulk + ElasticIndexerWorker.perform_async( :delete, self.class.to_s, self.id, self.es_id, es_parent: self.es_parent ) end + def maintain_elasticsearch_incremental_bulk + return false unless Feature.enabled?(:elastic_bulk_incremental_updates, self.project) + + ::Elastic::ProcessBookkeepingService.track!(self) + + true + end + class_methods do def __elasticsearch__ @__elasticsearch__ ||= ::Elastic::MultiVersionClassProxy.new(self) diff --git a/ee/app/models/concerns/elastic/projects_search.rb b/ee/app/models/concerns/elastic/projects_search.rb index 4a8debb39ea1ca81daa2486e28ed9bbb3639459a..4dcedac4ff1b8918d217c25a19e01ca7394bf8c5 100644 --- a/ee/app/models/concerns/elastic/projects_search.rb +++ b/ee/app/models/concerns/elastic/projects_search.rb @@ -19,6 +19,14 @@ def use_elasticsearch? ::Gitlab::CurrentSettings.elasticsearch_indexes_project?(self) end + def maintain_elasticsearch_incremental_bulk + # TODO: ElasticIndexerWorker does extra work for project hooks, so we + # can't use the incremental-bulk indexer for projects yet. + # + # https://gitlab.com/gitlab-org/gitlab/issues/207494 + false + end + def each_indexed_association INDEXED_ASSOCIATIONS.each do |association_name| association = self.association(association_name) diff --git a/ee/app/services/elastic/process_bookkeeping_service.rb b/ee/app/services/elastic/process_bookkeeping_service.rb new file mode 100644 index 0000000000000000000000000000000000000000..5c0f0d54c9f10e224de194d9a5e347ed8fb15b80 --- /dev/null +++ b/ee/app/services/elastic/process_bookkeeping_service.rb @@ -0,0 +1,109 @@ +# frozen_string_literal: true + +module Elastic + class ProcessBookkeepingService + REDIS_SET_KEY = 'elastic:incremental:updates:0:zset' + REDIS_SCORE_KEY = 'elastic:incremental:updates:0:score' + LIMIT = 1000 + + class << self + # Add some records to the processing queue. Items must be serializable to + # a Gitlab::Elastic::DocumentReference + def track!(*items) + return true if items.empty? + + items.map! { |item| ::Gitlab::Elastic::DocumentReference.serialize(item) } + + with_redis do |redis| + # Efficiently generate a guaranteed-unique score for each item + max = redis.incrby(REDIS_SCORE_KEY, items.size) + min = (max - items.size) + 1 + + (min..max).zip(items).each_slice(1000) do |group| + logger.debug(message: 'track_items', count: group.count, items: group) + + redis.zadd(REDIS_SET_KEY, group) + end + end + + true + end + + def queue_size + with_redis { |redis| redis.zcard(REDIS_SET_KEY) } + end + + def clear_tracking! + with_redis { |redis| redis.del(REDIS_SET_KEY, REDIS_SCORE_KEY) } + end + + def logger + # build already caches the logger via request store + ::Gitlab::Elasticsearch::Logger.build + end + + def with_redis(&blk) + Gitlab::Redis::SharedState.with(&blk) # rubocop:disable CodeReuse/ActiveRecord + end + end + + def execute + self.class.with_redis { |redis| execute_with_redis(redis) } + end + + private + + def execute_with_redis(redis) + specs = redis.zrangebyscore(REDIS_SET_KEY, '-inf', '+inf', limit: [0, LIMIT], with_scores: true) + return if specs.empty? + + first_score = specs.first.last + last_score = specs.last.last + + logger.info( + message: 'bulk_indexing_start', + records_count: specs.count, + first_score: first_score, + last_score: last_score + ) + + specs.each { |spec, _| submit_document(spec) } + failures = bulk_indexer.flush + + # Re-enqueue any failures so they are retried + self.class.track!(*failures) if failures.present? + + # Remove all the successes + redis.zremrangebyscore(REDIS_SET_KEY, first_score, last_score) + + logger.info( + message: 'bulk_indexing_end', + records_count: specs.count, + failures_count: failures.count, + first_score: first_score, + last_score: last_score + ) + end + + def submit_document(spec) + ref = ::Gitlab::Elastic::DocumentReference.deserialize(spec) + + bulk_indexer.process(ref) + rescue ::Gitlab::Elastic::DocumentReference::InvalidError => err + logger.warn( + message: 'submit_document_failed', + reference: spec, + error_class: err.class.to_s, + error_message: err.message + ) + end + + def bulk_indexer + @bulk_indexer ||= ::Gitlab::Elastic::BulkIndexer.new(logger: logger) + end + + def logger + self.class.logger + end + end +end diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml index 7cfb7233d09729aee441ea82ddb04c1d49b0b310..9ef7d4d1f92b71162ae0910a4272ac8dfab24f6f 100644 --- a/ee/app/workers/all_queues.yml +++ b/ee/app/workers/all_queues.yml @@ -24,6 +24,13 @@ :resource_boundary: :unknown :weight: 1 :idempotent: +- :name: cronjob:elastic_index_bulk_cron + :feature_category: :search + :has_external_dependencies: + :latency_sensitive: + :resource_boundary: :unknown + :weight: 1 + :idempotent: true - :name: cronjob:geo_container_repository_sync_dispatch :feature_category: :geo_replication :has_external_dependencies: diff --git a/ee/app/workers/elastic_index_bulk_cron_worker.rb b/ee/app/workers/elastic_index_bulk_cron_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..1a95218e738f2e056e4f3847a9eca3a0ddf25a47 --- /dev/null +++ b/ee/app/workers/elastic_index_bulk_cron_worker.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +class ElasticIndexBulkCronWorker + include ApplicationWorker + include Gitlab::ExclusiveLeaseHelpers + + # There is no onward scheduling and this cron handles work from across the + # application, so there's no useful context to add. + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + + feature_category :search + idempotent! + + def perform + in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do + Elastic::ProcessBookkeepingService.new.execute + end + rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError + # We're scheduled on a cronjob, so nothing to do here + end +end diff --git a/ee/changelogs/unreleased/34086-es-bulk-incremental-index-updates.yml b/ee/changelogs/unreleased/34086-es-bulk-incremental-index-updates.yml new file mode 100644 index 0000000000000000000000000000000000000000..455bd01632a9ca5ea5e1a9e1054a4295dee32541 --- /dev/null +++ b/ee/changelogs/unreleased/34086-es-bulk-incremental-index-updates.yml @@ -0,0 +1,5 @@ +--- +title: Add a bulk processor for ES incremental updates +merge_request: +author: +type: added diff --git a/ee/lib/gitlab/elastic/bulk_indexer.rb b/ee/lib/gitlab/elastic/bulk_indexer.rb new file mode 100644 index 0000000000000000000000000000000000000000..2e3012f7204b780a801a31f5735f20eae880a8ce --- /dev/null +++ b/ee/lib/gitlab/elastic/bulk_indexer.rb @@ -0,0 +1,148 @@ +# frozen_string_literal: true + +module Gitlab + module Elastic + # Accumulate records and submit to elasticsearch in bulk, respecting limits + # on request size. + # + # Call +process+ to accumulate records in memory, submitting bulk requests + # when the bulk limits are reached. + # + # Once finished, call +flush+. Any errors accumulated earlier will be + # reported by this call. + # + # BulkIndexer is not safe for concurrent use. + class BulkIndexer + include ::Elasticsearch::Model::Client::ClassMethods + + attr_reader :logger, :failures + + def initialize(logger:) + @body = [] + @body_size_bytes = 0 + @failures = [] + @logger = logger + @ref_cache = [] + end + + # Adds or removes a document in elasticsearch, depending on whether the + # database record it refers to can be found + def process(ref) + ref_cache << ref + + if ref.database_record + index(ref) + else + delete(ref) + end + end + + def flush + maybe_send_bulk(force: true).failures + end + + private + + def reset! + @body = [] + @body_size_bytes = 0 + @ref_cache = [] + end + + attr_reader :body, :body_size_bytes, :ref_cache + + def index(ref) + proxy = ref.database_record.__elasticsearch__ + op = build_op(ref, proxy) + + submit({ index: op }, proxy.as_indexed_json) + + maybe_send_bulk + end + + def delete(ref) + proxy = ref.klass.__elasticsearch__ + op = build_op(ref, proxy) + + submit(delete: op) + + maybe_send_bulk + end + + def build_op(ref, proxy) + op = { + _index: proxy.index_name, + _type: proxy.document_type, + _id: ref.es_id + } + + op[:_routing] = ref.es_parent if ref.es_parent # blank for projects + + op + end + + def bulk_limit_bytes + Gitlab::CurrentSettings.elasticsearch_max_bulk_size_mb.megabytes + end + + def submit(*hashes) + hashes.each do |hash| + text = hash.to_json + + body.push(text) + @body_size_bytes += text.bytesize + 2 # Account for newlines + end + end + + def maybe_send_bulk(force: false) + return self if body.empty? + return self if body_size_bytes < bulk_limit_bytes && !force + + failed_refs = try_send_bulk + + logger.info( + message: 'bulk_submitted', + body_size_bytes: body_size_bytes, + bulk_count: ref_cache.count, + errors_count: failed_refs.count + ) + + failures.push(*failed_refs) + + reset! + + self + end + + def try_send_bulk + process_errors(client.bulk(body: body)) + rescue => err + # If an exception is raised, treat the entire bulk as failed + logger.error(message: 'bulk_exception', error_class: err.class.to_s, error_message: err.message) + + ref_cache + end + + def process_errors(result) + return [] unless result['errors'] + + out = [] + + # Items in the response have the same order as items in the request. + # + # Example succces: {"index": {"result": "created", "status": 201}} + # Example failure: {"index": {"error": {...}, "status": 400}} + result['items'].each_with_index do |item, i| + op = item['index'] || item['delete'] + + if op.nil? || op['error'] + logger.warn(message: 'bulk_error', item: item) + out << ref_cache[i] + end + end + + out + end + end + end +end diff --git a/ee/lib/gitlab/elastic/document_reference.rb b/ee/lib/gitlab/elastic/document_reference.rb new file mode 100644 index 0000000000000000000000000000000000000000..7a586f296638868705b95a2fdea617118df88fbb --- /dev/null +++ b/ee/lib/gitlab/elastic/document_reference.rb @@ -0,0 +1,95 @@ +# frozen_string_literal: true + +module Gitlab + module Elastic + # Tracks some essential information needed to tie database and elasticsearch + # records together, and to delete ES documents when the database object no + # longer exists. + # + # A custom serialisation format suitable for Redis is included. + class DocumentReference + include Gitlab::Utils::StrongMemoize + + InvalidError = Class.new(StandardError) + + class << self + def build(instance) + new(instance.class, instance.id, instance.es_id, instance.es_parent) + end + + def serialize(anything) + case anything + when String + anything + when Gitlab::Elastic::DocumentReference + anything.serialize + when ApplicationRecord + serialize_record(anything) + when Array + serialize_array(anything) + else + raise InvalidError.new("Don't know how to serialize #{anything.class}") + end + end + + def serialize_record(record) + serialize_array([record.class.to_s, record.id, record.es_id, record.es_parent].compact) + end + + def serialize_array(array) + test_array!(array) + + array.join(' ') + end + + def deserialize(string) + deserialize_array(string.split(' ')) + end + + def deserialize_array(array) + test_array!(array) + + new(*array) + end + + private + + def test_array!(array) + raise InvalidError.new("Bad array representation: #{array.inspect}") unless + (3..4).cover?(array.size) + end + end + + attr_reader :klass, :db_id, :es_id + + # This attribute is nil for some records, e.g., projects + attr_reader :es_parent + + def initialize(klass_or_name, db_id, es_id, es_parent = nil) + @klass = klass_or_name + @klass = klass_or_name.constantize if @klass.is_a?(String) + @db_id = db_id + @es_id = es_id + @es_parent = es_parent + end + + def ==(other) + other.instance_of?(self.class) && + self.serialize == other.serialize + end + + def klass_name + klass.to_s + end + + # TODO: return a promise for batch loading: https://gitlab.com/gitlab-org/gitlab/issues/207280 + def database_record + strong_memoize(:database_record) { klass.find_by_id(db_id) } + end + + def serialize + self.class.serialize_array([klass_name, db_id, es_id, es_parent].compact) + end + end + end +end diff --git a/ee/spec/features/search/elastic/project_search_spec.rb b/ee/spec/features/search/elastic/project_search_spec.rb index 332ef010b258815b577d9cfcc8c77c53f6b6da70..c7371b93541a0f5518b60203f079d6aa71f36e2c 100644 --- a/ee/spec/features/search/elastic/project_search_spec.rb +++ b/ee/spec/features/search/elastic/project_search_spec.rb @@ -16,8 +16,9 @@ end describe 'searching' do - it 'finds issues', :sidekiq_might_not_need_inline do + it 'finds issues', :sidekiq_inline do create(:issue, project: project, title: 'Test searching for an issue') + ensure_elasticsearch_index! submit_search('Test') select_search_scope('Issues') @@ -25,8 +26,9 @@ expect(page).to have_selector('.results', text: 'Test searching for an issue') end - it 'finds merge requests', :sidekiq_might_not_need_inline do + it 'finds merge requests', :sidekiq_inline do create(:merge_request, source_project: project, target_project: project, title: 'Test searching for an MR') + ensure_elasticsearch_index! submit_search('Test') select_search_scope('Merge requests') @@ -34,8 +36,9 @@ expect(page).to have_selector('.results', text: 'Test searching for an MR') end - it 'finds milestones', :sidekiq_might_not_need_inline do + it 'finds milestones', :sidekiq_inline do create(:milestone, project: project, title: 'Test searching for a milestone') + ensure_elasticsearch_index! submit_search('Test') select_search_scope('Milestones') @@ -43,7 +46,7 @@ expect(page).to have_selector('.results', text: 'Test searching for a milestone') end - it 'finds wiki pages', :sidekiq_might_not_need_inline do + it 'finds wiki pages', :sidekiq_inline do project.wiki.create_page('test.md', 'Test searching for a wiki page') project.wiki.index_wiki_blobs @@ -53,8 +56,9 @@ expect(page).to have_selector('.results', text: 'Test searching for a wiki page') end - it 'finds notes', :sidekiq_might_not_need_inline do + it 'finds notes', :sidekiq_inline do create(:note, project: project, note: 'Test searching for a comment') + ensure_elasticsearch_index! submit_search('Test') select_search_scope('Comments') @@ -62,7 +66,7 @@ expect(page).to have_selector('.results', text: 'Test searching for a comment') end - it 'finds commits', :sidekiq_might_not_need_inline do + it 'finds commits', :sidekiq_inline do project.repository.index_commits_and_blobs submit_search('initial') @@ -71,7 +75,7 @@ expect(page).to have_selector('.results', text: 'Initial commit') end - it 'finds blobs', :sidekiq_might_not_need_inline do + it 'finds blobs', :sidekiq_inline do project.repository.index_commits_and_blobs submit_search('def') diff --git a/ee/spec/lib/gitlab/elastic/bulk_indexer_spec.rb b/ee/spec/lib/gitlab/elastic/bulk_indexer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..0e3c0b7443b90e3ed08e428a0c15323dd0d7ed9c --- /dev/null +++ b/ee/spec/lib/gitlab/elastic/bulk_indexer_spec.rb @@ -0,0 +1,162 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::Elastic::BulkIndexer, :elastic do + let_it_be(:issue) { create(:issue) } + let_it_be(:other_issue) { create(:issue, project: issue.project) } + + let(:project) { issue.project } + + let(:logger) { ::Gitlab::Elasticsearch::Logger.build } + + subject(:indexer) { described_class.new(logger: logger) } + + let(:es_client) { indexer.client } + + let(:issue_as_ref) { ref(issue) } + let(:issue_as_json_with_times) { issue.__elasticsearch__.as_indexed_json } + let(:issue_as_json) { issue_as_json_with_times.except('created_at', 'updated_at') } + + let(:other_issue_as_ref) { ref(other_issue) } + + describe '#process' do + it 'returns self' do + expect(indexer.process(issue_as_ref)).to be(indexer) + end + + it 'does not send a bulk request per call' do + expect(es_client).not_to receive(:bulk) + + indexer.process(issue_as_ref) + end + + it 'sends a bulk request if the max bulk request size is reached' do + set_bulk_limit(indexer, 1) + + expect(es_client) + .to receive(:bulk) + .with(body: [kind_of(String), kind_of(String)]) + .and_return({}) + + indexer.process(issue_as_ref) + + expect(indexer.failures).to be_empty + end + end + + describe '#flush' do + it 'completes a bulk' do + indexer.process(issue_as_ref) + + expect(es_client) + .to receive(:bulk) + .with(body: [kind_of(String), kind_of(String)]) + .and_return({}) + + expect(indexer.flush).to be_empty + end + + it 'fails documents that elasticsearch refuses to accept' do + # Indexes with uppercase characters are invalid + expect(other_issue_as_ref.database_record.__elasticsearch__) + .to receive(:index_name) + .and_return('Invalid') + + indexer.process(issue_as_ref) + indexer.process(other_issue_as_ref) + + expect(indexer.flush).to contain_exactly(other_issue_as_ref) + expect(indexer.failures).to contain_exactly(other_issue_as_ref) + + refresh_index! + + expect(search_one(Issue)).to have_attributes(issue_as_json) + end + + it 'fails all documents on exception' do + expect(es_client).to receive(:bulk) { raise 'An exception' } + + indexer.process(issue_as_ref) + indexer.process(other_issue_as_ref) + + expect(indexer.flush).to contain_exactly(issue_as_ref, other_issue_as_ref) + expect(indexer.failures).to contain_exactly(issue_as_ref, other_issue_as_ref) + end + + context 'indexing an issue' do + it 'adds the issue to the index' do + expect(indexer.process(issue_as_ref).flush).to be_empty + + refresh_index! + + expect(search_one(Issue)).to have_attributes(issue_as_json) + end + + it 'reindexes an unchanged issue' do + ensure_elasticsearch_index! + + expect(es_client).to receive(:bulk).and_call_original + expect(indexer.process(issue_as_ref).flush).to be_empty + end + + it 'reindexes a changed issue' do + ensure_elasticsearch_index! + issue.update!(title: 'new title') + + expect(issue_as_json['title']).to eq('new title') + expect(indexer.process(issue_as_ref).flush).to be_empty + + refresh_index! + + expect(search_one(Issue)).to have_attributes(issue_as_json) + end + end + + context 'deleting an issue' do + it 'removes the issue from the index' do + ensure_elasticsearch_index! + + expect(issue_as_ref).to receive(:database_record).and_return(nil) + expect(indexer.process(issue_as_ref).flush).to be_empty + + refresh_index! + + expect(search(Issue, '*').size).to eq(0) + end + + it 'succeeds even if the issue is not present' do + expect(issue_as_ref).to receive(:database_record).and_return(nil) + expect(indexer.process(issue_as_ref).flush).to be_empty + + refresh_index! + + expect(search(Issue, '*').size).to eq(0) + end + end + end + + def ref(record) + Gitlab::Elastic::DocumentReference.build(record) + end + + def stub_es_client(indexer, client) + allow(indexer).to receive(:client) { client } + end + + def set_bulk_limit(indexer, bytes) + allow(indexer).to receive(:bulk_limit_bytes) { bytes } + end + + def search(klass, text) + klass.__elasticsearch__.search(text) + end + + def search_one(klass) + results = search(klass, '*') + + expect(results.size).to eq(1) + + results.first._source + end +end diff --git a/ee/spec/lib/gitlab/elastic/document_reference_spec.rb b/ee/spec/lib/gitlab/elastic/document_reference_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..cc1b6c3c95e2797eefe7015928da97fe6cd4eacc --- /dev/null +++ b/ee/spec/lib/gitlab/elastic/document_reference_spec.rb @@ -0,0 +1,170 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::Elastic::DocumentReference do + let_it_be(:issue) { create(:issue) } + let(:project) { issue.project } + + let(:issue_as_array) { [Issue, issue.id, issue.es_id, issue.es_parent] } + let(:issue_as_ref) { described_class.new(*issue_as_array) } + let(:issue_as_str) { issue_as_array.join(' ') } + + let(:project_as_array) { [Project, project.id, project.es_id] } + let(:project_as_ref) { described_class.new(*project_as_array) } + let(:project_as_str) { project_as_array.join(' ') } + + describe '.build' do + it 'builds a document for an issue' do + expect(described_class.build(issue)).to eq(issue_as_ref) + end + + it 'builds a document for a project' do + expect(described_class.build(project)).to eq(project_as_ref) + end + end + + describe '.serialize' do + it 'does nothing to a string' do + expect(described_class.serialize('foo')).to eq('foo') + end + + it 'serializes a DocumentReference' do + expect(described_class.serialize(issue_as_ref)).to eq(issue_as_str) + end + + it 'defers to serialize_record for ApplicationRecord instances' do + expect(described_class).to receive(:serialize_record).with(issue) + + described_class.serialize(issue) + end + + it 'defers to serialize_array for Array instances' do + expect(described_class).to receive(:serialize_array).with(issue_as_array) + + described_class.serialize(issue_as_array) + end + + it 'fails to serialize an unrecognised value' do + expect { described_class.serialize(1) }.to raise_error(described_class::InvalidError) + end + end + + describe '.serialize_record' do + it 'serializes an issue' do + expect(described_class.serialize(issue)).to eq(issue_as_str) + end + + it 'serializes a project' do + expect(described_class.serialize(project)).to eq(project_as_str) + end + end + + describe '.serialize_array' do + it 'serializes a project array' do + expect(described_class.serialize(project_as_array)).to eq(project_as_str) + end + + it 'serializes an issue array' do + expect(described_class.serialize(issue_as_array)).to eq(issue_as_str) + end + + it 'fails to serialize a too-small array' do + expect { described_class.serialize(project_as_array[0..1]) }.to raise_error(described_class::InvalidError) + end + + it 'fails to serialize a too-large array' do + expect { described_class.serialize(project_as_array * 2) }.to raise_error(described_class::InvalidError) + end + end + + describe '.deserialize' do + it 'deserializes an issue string' do + expect(described_class.deserialize(issue_as_str)).to eq(issue_as_ref) + end + + it 'deserializes a project string' do + expect(described_class.deserialize(project_as_str)).to eq(project_as_ref) + end + end + + describe '#initialize' do + it 'creates an issue reference' do + expect(described_class.new(*issue_as_array)).to eq(issue_as_ref) + end + + it 'creates a project reference' do + expect(described_class.new(*project_as_array)).to eq(project_as_ref) + end + end + + describe '#==' do + let(:subclass) { Class.new(described_class) } + + it 'is equal to itself' do + expect(issue_as_ref).to eq(issue_as_ref) + end + + it 'is equal to another ref when all elements match' do + expect(issue_as_ref).to eq(described_class.new(*issue_as_array)) + end + + it 'is not equal unless the other instance class matches' do + expect(issue_as_ref).not_to eq(subclass.new(*issue_as_array)) + end + + it 'is not equal unless db_id matches' do + other = described_class.new(Issue, issue.id + 1, issue.es_id, issue.es_parent) + + expect(issue_as_ref).not_to eq(other) + end + + it 'is not equal unless es_id matches' do + other = described_class.new(Issue, issue.id, 'Other es_id', issue.es_parent) + + expect(issue_as_ref).not_to eq(other) + end + + it 'is not equal unless es_parent matches' do + other = described_class.new(Issue, issue.id, issue.es_id, 'Other es_parent') + + expect(issue_as_ref).not_to eq(other) + end + end + + describe '#klass_name' do + it { expect(issue_as_ref.klass_name).to eq('Issue') } + end + + describe '#database_record' do + it 'returns an issue' do + expect(issue_as_ref.database_record).to eq(issue) + end + + it 'returns a project' do + expect(project_as_ref.database_record).to eq(project) + end + + it 'returns nil if the record cannot be found' do + ref = described_class.new(Issue, issue.id + 1, 'issue_1') + + expect(ref.database_record).to be_nil + end + + it 'raises if the class is bad' do + ref = described_class.new(Integer, 1, 'integer_1') + + expect { ref.database_record }.to raise_error(NoMethodError) + end + end + + describe '#serialize' do + it 'serializes an issue' do + expect(issue_as_ref.serialize).to eq(issue_as_str) + end + + it 'serializes a project' do + expect(project_as_ref.serialize).to eq(project_as_str) + end + end +end diff --git a/ee/spec/models/concerns/elastic/note_spec.rb b/ee/spec/models/concerns/elastic/note_spec.rb index 5fe5d324f6a476e0f633428a78cd3a059992302e..1766aa6a159cd30f0be3c7d06552fde93782c97a 100644 --- a/ee/spec/models/concerns/elastic/note_spec.rb +++ b/ee/spec/models/concerns/elastic/note_spec.rb @@ -107,6 +107,8 @@ end it "does not create ElasticIndexerWorker job for system messages" do + stub_feature_flags(elastic_bulk_incremental_updates: false) + project = create :project, :repository # We have to set one minute delay because of https://gitlab.com/gitlab-org/gitlab-foss/merge_requests/15682 issue = create :issue, project: project, updated_at: 1.minute.ago @@ -116,6 +118,16 @@ create :note, :system, project: project, noteable: issue end + it 'does not track system note updates via the bulk updater' do + stub_feature_flags(elastic_bulk_incremental_updates: true) + + note = create(:note, :system) + + expect(Elastic::ProcessBookkeepingService).not_to receive(:track!) + + note.update!(note: 'some other text here') + end + it 'uses same index for Note subclasses' do Note.subclasses.each do |note_class| expect(note_class.index_name).to eq(Note.index_name) diff --git a/ee/spec/services/elastic/index_record_service_spec.rb b/ee/spec/services/elastic/index_record_service_spec.rb index b98ddaba5a64522bc0401e2e87e5ffd1585df0ac..8bf36b730bfc91675f5976de815dddf24e658b9c 100644 --- a/ee/spec/services/elastic/index_record_service_spec.rb +++ b/ee/spec/services/elastic/index_record_service_spec.rb @@ -25,10 +25,10 @@ with_them do it 'indexes new records' do - object = nil - Sidekiq::Testing.disable! do - object = create(type) - end + object = create(type) + + # Prevent records from being added via bulk indexing updates + ::Elastic::ProcessBookkeepingService.clear_tracking! expect do expect(subject.execute(object, true)).to eq(true) @@ -122,10 +122,14 @@ Sidekiq::Testing.inline! do expect(subject.execute(other_project, true)).to eq(true) end + + # Prevent records from being added via bulk indexing updates + ::Elastic::ProcessBookkeepingService.clear_tracking! + ensure_elasticsearch_index! # Only the project itself should be in the index - expect(Elasticsearch::Model.search('*').total_count).to be 1 + expect(Elasticsearch::Model.search('*').total_count).to eq(1) expect(Project.elastic_search('*').records).to contain_exactly(other_project) end @@ -312,13 +316,9 @@ def expect_indexing(issue_ids, response, unstub: false) end it 'skips records for which indexing is disabled' do - project = nil - - Sidekiq::Testing.disable! do - project = create :project, name: 'project_1' - end + stub_ee_application_setting(elasticsearch_limit_indexing: true) - expect(project).to receive(:use_elasticsearch?).and_return(false) + project = create(:project, name: 'project_1') Sidekiq::Testing.inline! do expect(subject.execute(project, true)).to eq(true) diff --git a/ee/spec/services/elastic/process_bookkeeping_service_spec.rb b/ee/spec/services/elastic/process_bookkeeping_service_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..a0c070c51a01065b56c4c0f38ea825eaa5ecf47c --- /dev/null +++ b/ee/spec/services/elastic/process_bookkeeping_service_spec.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do + around do |example| + described_class.with_redis do |redis| + @redis = redis + example.run + end + end + + let(:zset) { 'elastic:incremental:updates:0:zset' } + let(:redis) { @redis } + let(:ref_class) { ::Gitlab::Elastic::DocumentReference } + + let(:fake_refs) { Array.new(10) { |i| ref_class.new(Issue, i, "issue_#{i}", 'project_1') } } + let(:issue) { fake_refs.first } + let(:issue_spec) { issue.serialize } + + describe '.track' do + it 'enqueues a record' do + described_class.track!(issue) + + spec, score = redis.zpopmin(zset) + + expect(spec).to eq(issue_spec) + expect(score).to eq(1.0) + end + + it 'enqueues a set of unique records' do + described_class.track!(*fake_refs) + + expect(described_class.queue_size).to eq(fake_refs.size) + + spec1, score1 = redis.zpopmin(zset) + _, score2 = redis.zpopmin(zset) + + expect(score1).to be < score2 + expect(spec1).to eq(issue_spec) + end + + it 'enqueues 10 identical records as 1 entry' do + described_class.track!(*([issue] * 10)) + + expect(described_class.queue_size).to eq(1) + end + + it 'deduplicates across multiple inserts' do + 10.times { described_class.track!(issue) } + + expect(described_class.queue_size).to eq(1) + end + end + + describe '.queue_size' do + it 'reports the queue size' do + expect(described_class.queue_size).to eq(0) + + described_class.track!(*fake_refs) + + expect(described_class.queue_size).to eq(fake_refs.size) + + expect { redis.zpopmin(zset) }.to change(described_class, :queue_size).by(-1) + end + end + + describe '.clear_tracking!' do + it 'removes all entries from the queue' do + described_class.track!(*fake_refs) + + expect(described_class.queue_size).to eq(fake_refs.size) + + described_class.clear_tracking! + + expect(described_class.queue_size).to eq(0) + end + end + + describe '#execute' do + let(:limit) { 5 } + + before do + stub_const('Elastic::ProcessBookkeepingService::LIMIT', limit) + end + + it 'submits a batch of documents' do + described_class.track!(*fake_refs) + + expect(described_class.queue_size).to eq(fake_refs.size) + expect_processing(*fake_refs[0...limit]) + + expect { described_class.new.execute }.to change(described_class, :queue_size).by(-limit) + end + + it 'retries failed documents' do + described_class.track!(*fake_refs) + failed = fake_refs[0] + + expect(described_class.queue_size).to eq(10) + expect_processing(*fake_refs[0...limit], failures: [failed]) + + expect { described_class.new.execute }.to change(described_class, :queue_size).by(-limit + 1) + + serialized, _ = redis.zpopmax(zset) + expect(ref_class.deserialize(serialized)).to eq(failed) + end + + it 'discards malformed documents' do + described_class.track!('Bad') + + expect(described_class.queue_size).to eq(1) + expect_next_instance_of(::Gitlab::Elastic::BulkIndexer) do |indexer| + expect(indexer).not_to receive(:process) + end + + expect { described_class.new.execute }.to change(described_class, :queue_size).by(-1) + end + + it 'fails, preserving documents, when processing fails with an exception' do + described_class.track!(issue) + + expect(described_class.queue_size).to eq(1) + expect_next_instance_of(::Gitlab::Elastic::BulkIndexer) do |indexer| + expect(indexer).to receive(:process).with(issue) { raise 'Bad' } + end + + expect { described_class.new.execute }.to raise_error('Bad') + expect(described_class.queue_size).to eq(1) + end + + def expect_processing(*refs, failures: []) + expect_next_instance_of(::Gitlab::Elastic::BulkIndexer) do |indexer| + refs.each { |ref| expect(indexer).to receive(:process).with(ref) } + + expect(indexer).to receive(:flush) { failures } + end + end + end +end diff --git a/ee/spec/support/elastic.rb b/ee/spec/support/elastic.rb index 188244a259ce4b50d27fa296eefdc7d72ab21660..96586855a40ccf8a8c4de415a069e6c000a54803 100644 --- a/ee/spec/support/elastic.rb +++ b/ee/spec/support/elastic.rb @@ -2,11 +2,13 @@ RSpec.configure do |config| config.before(:each, :elastic) do + Elastic::ProcessBookkeepingService.clear_tracking! Gitlab::Elastic::Helper.create_empty_index end config.after(:each, :elastic) do Gitlab::Elastic::Helper.delete_index + Elastic::ProcessBookkeepingService.clear_tracking! end config.include ElasticsearchHelpers, :elastic diff --git a/ee/spec/support/helpers/elasticsearch_helpers.rb b/ee/spec/support/helpers/elasticsearch_helpers.rb index c8b9a46fd58ad3fef0096790fdfabd49a86a71cc..09ce1981032a76dcc1c81ae5ca65b10689f36e9f 100644 --- a/ee/spec/support/helpers/elasticsearch_helpers.rb +++ b/ee/spec/support/helpers/elasticsearch_helpers.rb @@ -2,6 +2,14 @@ module ElasticsearchHelpers def ensure_elasticsearch_index! + # Ensure that any enqueued updates are processed + Elastic::ProcessBookkeepingService.new.execute + + # Make any documents added to the index visible + refresh_index! + end + + def refresh_index! ::Gitlab::Elastic::Helper.refresh_index end end diff --git a/ee/spec/workers/elastic_index_bulk_cron_worker_spec.rb b/ee/spec/workers/elastic_index_bulk_cron_worker_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..833970ab6a4b5e100ebd84258fd677c6a872af75 --- /dev/null +++ b/ee/spec/workers/elastic_index_bulk_cron_worker_spec.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe ElasticIndexBulkCronWorker do + include ExclusiveLeaseHelpers + describe '.perform' do + it 'executes the service under an exclusive lease' do + expect_to_obtain_exclusive_lease('elastic_index_bulk_cron_worker') + + expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service| + expect(service).to receive(:execute) + end + + described_class.new.perform + end + end +end