From 45dfb0de66d0585e8c2b850e5e4bf5525b220c7f Mon Sep 17 00:00:00 2001
From: Ravi Kumar <rkumar@gitlab.com>
Date: Thu, 13 Apr 2023 08:53:31 +0000
Subject: [PATCH] Backfill the missing wiki permissions in the main index

For project wikis we were not indexing wiki_access_level and
visibility_level. We will backfill existing project wikis.
And for any newly created project wikis, these values will get indexed.

Changelog: added
MR: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/116944
EE: true
---
 GITLAB_ELASTICSEARCH_INDEXER_VERSION          |   2 +-
 ...backfill_wiki_permissions_in_main_index.rb | 182 +++++++++++
 ee/lib/gitlab/elastic/indexer.rb              |   6 +-
 ...ill_wiki_permissions_in_main_index_spec.rb | 284 ++++++++++++++++++
 ee/spec/lib/gitlab/elastic/indexer_spec.rb    |   8 +-
 5 files changed, 476 insertions(+), 6 deletions(-)
 create mode 100644 ee/elastic/migrate/20230405500000_backfill_wiki_permissions_in_main_index.rb
 create mode 100644 ee/spec/elastic/migrate/2023040500000_backfill_wiki_permissions_in_main_index_spec.rb

diff --git a/GITLAB_ELASTICSEARCH_INDEXER_VERSION b/GITLAB_ELASTICSEARCH_INDEXER_VERSION
index 6aba2b245a847..80895903a15c8 100644
--- a/GITLAB_ELASTICSEARCH_INDEXER_VERSION
+++ b/GITLAB_ELASTICSEARCH_INDEXER_VERSION
@@ -1 +1 @@
-4.2.0
+4.3.0
diff --git a/ee/elastic/migrate/20230405500000_backfill_wiki_permissions_in_main_index.rb b/ee/elastic/migrate/20230405500000_backfill_wiki_permissions_in_main_index.rb
new file mode 100644
index 0000000000000..004975402bfe5
--- /dev/null
+++ b/ee/elastic/migrate/20230405500000_backfill_wiki_permissions_in_main_index.rb
@@ -0,0 +1,182 @@
+# frozen_string_literal: true
+
+class BackfillWikiPermissionsInMainIndex < Elastic::Migration
+  include Elastic::MigrationHelper
+  include Elastic::Latest::Routing
+
+  ELASTIC_TIMEOUT = '5m'
+  MAX_PROJECTS_TO_PROCESS = 50
+
+  batch_size 10_000
+  batched!
+  throttle_delay 5.seconds
+  retry_on_failure
+
+  def migrate
+    projects_in_progress = migration_state[:projects_in_progress] || []
+    if projects_in_progress.present?
+      failed_or_completed_projects = process_projects_in_progress(projects_in_progress)
+      projects_in_progress -= failed_or_completed_projects
+      set_migration_state(projects_in_progress: projects_in_progress)
+    end
+
+    if completed?
+      log 'Migration Completed: There are no wikis left to add permissions'
+      return
+    end
+
+    project_limit = determine_project_limit
+    return if projects_in_progress.size >= project_limit
+
+    log 'Enqueuing projects having wiki_blobs with missing permissions'
+    exclude_project_ids = projects_in_progress.pluck(:project_id) # rubocop: disable CodeReuse/ActiveRecord
+
+    projects_having_wikis_with_missing_permissions(exclude_project_ids: exclude_project_ids).each do |project_id|
+      task_id = update_by_query(Project.find(project_id))
+
+      next if task_id.nil?
+
+      projects_in_progress << { task_id: task_id, project_id: project_id }
+
+      break if projects_in_progress.size >= project_limit
+    rescue ActiveRecord::RecordNotFound
+      log "Project not found: #{project_id}. Scheduling ElasticDeleteProjectWorker"
+      # project must be removed from the index or it will continue to show up in the missing query
+      # since we do not have access to the project record, the es_id input must be constructed manually
+      es_id = ::Gitlab::Elastic::Helper.build_es_id(es_type: Project.es_type, target_id: project_id)
+      ElasticDeleteProjectWorker.perform_async(project_id, es_id)
+    end
+
+    set_migration_state(projects_in_progress: projects_in_progress)
+  end
+
+  def completed?
+    helper.refresh_index(index_name: helper.target_name)
+    log 'Running the count_items_missing_wiki_permissions query'
+    total_remaining = count_items_missing_wiki_permissions
+
+    log "Checking to see if migration is completed based on index counts remaining: #{total_remaining}"
+    total_remaining == 0
+  end
+
+  private
+
+  def process_projects_in_progress(projects)
+    failed_or_completed_projects = []
+    projects.each do |item|
+      project_id = item[:project_id]
+      task_id = item[:task_id]
+      begin
+        task_status = helper.task_status(task_id: task_id)
+      rescue ::Elasticsearch::Transport::Transport::Errors::NotFound
+        log_warn "Failed to fetch task status for project #{project_id} with_task_id: #{task_id}"
+        failed_or_completed_projects << item
+        next
+      end
+
+      if task_status['failures'].present? || task_status['error'].present?
+        log_warn "Failed to update project #{project_id} with_task_id: #{task_id} - #{task_status['failures']}"
+        failed_or_completed_projects << item
+      end
+
+      if task_status['completed'].present?
+        log "Updating wiki permissions in main index is completed for project #{project_id} with task_id: #{task_id}"
+        failed_or_completed_projects << item
+      else
+        log "Updating wiki permissions in main index is in progress for project #{project_id} with task_id: #{task_id}"
+      end
+    end
+    failed_or_completed_projects
+  end
+
+  def update_by_query(project)
+    log "Launching update query for project #{project.id}"
+    source = "ctx._source.wiki_access_level = #{project.wiki_access_level};" \
+             "ctx._source.visibility_level = #{project.visibility_level};"
+    response = client.update_by_query(
+      index: helper.target_name,
+      body: {
+        query: {
+          bool: {
+            filter: [
+              { term: { project_id: project.id } },
+              { term: { type: 'wiki_blob' } }
+            ]
+          }
+        },
+        script: {
+          lang: 'painless',
+          source: source
+        }
+      },
+      wait_for_completion: false,
+      max_docs: batch_size,
+      timeout: ELASTIC_TIMEOUT,
+      routing: routing_options({ project_id: project.id })[:routing],
+      conflicts: 'proceed'
+    )
+
+    if response['failures'].present?
+      log_warn "Failed to update project #{project.id} - #{response['failures']}"
+      return
+    end
+
+    response['task']
+  end
+
+  def count_items_missing_wiki_permissions
+    client.count(
+      index: helper.target_name,
+      body: {
+        query: query_wikis_with_missing_permissions
+      }
+    )['count']
+  end
+
+  def projects_having_wikis_with_missing_permissions(exclude_project_ids:)
+    results = client.search(
+      index: helper.target_name,
+      body: {
+        size: 0,
+        query: query_wikis_with_missing_permissions(exclude_project_ids),
+        aggs: {
+          project_ids: {
+            terms: {
+              size: MAX_PROJECTS_TO_PROCESS * 2,
+              field: 'project_id'
+            }
+          }
+        }
+      }
+    )
+    project_ids_hist = results.dig('aggregations', 'project_ids', 'buckets') || []
+    project_ids_hist.pluck('key') # rubocop: disable CodeReuse/ActiveRecord
+  end
+
+  def query_wikis_with_missing_permissions(exclude_project_ids = nil)
+    query = {
+      bool: {
+        minimum_should_match: 1,
+        should: [
+          {
+            bool: {
+              must_not: [{ exists: { field: 'visibility_level' } }]
+            }
+          },
+          {
+            bool: {
+              must_not: [{ exists: { field: 'wiki_access_level' } }]
+            }
+          }
+        ],
+        filter: { term: { type: 'wiki_blob' } }
+      }
+    }
+    query[:bool][:must_not] = { terms: { project_id: exclude_project_ids } } if exclude_project_ids.present?
+    query
+  end
+
+  def determine_project_limit
+    [get_number_of_shards(index_name: helper.target_name), MAX_PROJECTS_TO_PROCESS].min
+  end
+end
diff --git a/ee/lib/gitlab/elastic/indexer.rb b/ee/lib/gitlab/elastic/indexer.rb
index 18a805b7a39f3..edbbab6882007 100644
--- a/ee/lib/gitlab/elastic/indexer.rb
+++ b/ee/lib/gitlab/elastic/indexer.rb
@@ -101,13 +101,13 @@ def run_indexer!(base_sha, to_sha, target)
         command << "--search-curation" if Feature.enabled?(:search_index_curation)
         command << "--from-sha=#{base_sha}"
         command << "--to-sha=#{to_sha}"
+        command << "--full-path=#{project.full_path}"
+        command << "--visibility-level=#{project.visibility_level}"
 
         command += if index_wiki?
-                     %W[--blob-type=wiki_blob --skip-commits --full-path=#{project.full_path}]
+                     %W[--blob-type=wiki_blob --skip-commits --wiki-access-level=#{project.wiki_access_level}]
                    else
                      %W[
-                       --full-path=#{project.full_path}
-                       --visibility-level=#{project.visibility_level}
                        --repository-access-level=#{project.repository_access_level}
                      ].tap do |c|
                        migration_name = :add_hashed_root_namespace_id_to_commits
diff --git a/ee/spec/elastic/migrate/2023040500000_backfill_wiki_permissions_in_main_index_spec.rb b/ee/spec/elastic/migrate/2023040500000_backfill_wiki_permissions_in_main_index_spec.rb
new file mode 100644
index 0000000000000..49547402bf2f5
--- /dev/null
+++ b/ee/spec/elastic/migrate/2023040500000_backfill_wiki_permissions_in_main_index_spec.rb
@@ -0,0 +1,284 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+require_relative 'migration_shared_examples'
+require File.expand_path('ee/elastic/migrate/20230405500000_backfill_wiki_permissions_in_main_index.rb')
+
+RSpec.describe BackfillWikiPermissionsInMainIndex, :elastic_delete_by_query, :sidekiq_inline, feature_category: :global_search do
+  let(:version) { 20230405500000 }
+  let(:helper) { Gitlab::Elastic::Helper.new }
+  let(:migration) { described_class.new(version) }
+
+  let_it_be_with_reload(:projects) { create_list(:project, 3, :wiki_repo, visibility_level: 0, wiki_access_level: 0) }
+
+  before do
+    stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
+    set_elasticsearch_migration_to(version, including: false)
+
+    allow(migration).to receive(:helper).and_return(helper)
+  end
+
+  describe 'migration_options' do
+    it 'has migration options set', :aggregate_failures do
+      expect(migration).to be_batched
+      expect(migration).to be_retry_on_failure
+      expect(migration.throttle_delay).to eq(5.seconds)
+      expect(migration.batch_size).to eq(10_000)
+    end
+  end
+
+  describe 'when Elasticsearch gives 404' do
+    context 'when Elasticsearch responds with NotFoundException' do
+      let(:client) { instance_double('Elasticsearch::Transport::Client') }
+      let(:update_by_query_response) { { 'failures' => ['failed'] } }
+
+      before do
+        allow(client).to receive(:update_by_query).and_return(update_by_query_response)
+
+        allow(migration).to receive(:projects_having_wikis_with_missing_permissions).and_return(projects.map(&:id))
+        allow(migration).to receive(:completed?).and_return(false)
+        allow(migration).to receive(:client).and_return(client)
+      end
+
+      context 'when a task_status throws a NotFound Exception' do
+        it 'removes entry from projects_in_progress in migration_state' do
+          migration_state = projects.map { |p| { task_id: 'oTUltX4IQMOUUVeiohTt8A:124', project_id: p.id } }
+          migration.set_migration_state(projects_in_progress: migration_state)
+          expect(migration).to receive(:set_migration_state).with(projects_in_progress: []).twice
+
+          expect { migration.migrate }.not_to raise_error
+
+          expect(migration.migration_state[:projects_in_progress]).to match_array(migration_state)
+        end
+      end
+    end
+  end
+
+  describe '.migrate' do
+    before do
+      projects.each do |p|
+        p.wiki.create_page('index_page', 'Bla bla term1')
+        p.wiki.index_wiki_blobs
+      end
+    end
+
+    context 'with wiki permissions already existing in all wikis' do
+      before do
+        projects.each { |p| set_visibility_level_for_wikis(p) }
+      end
+
+      it 'does not execute update_by_query' do
+        expect(migration).to be_completed
+        expect(helper.client).not_to receive(:update_by_query)
+        migration.migrate
+      end
+    end
+
+    context 'with wiki permissions not present in wikis' do
+      before do
+        projects.each { |p| remove_visibility_level_for_wikis(p) }
+      end
+
+      context 'when task in progress' do
+        let(:client) { instance_double('Elasticsearch::Transport::Client') }
+
+        before do
+          allow(migration).to receive(:completed?).and_return(false)
+          allow(migration).to receive(:client).and_return(client)
+          allow(migration).to receive(:projects_having_wikis_with_missing_permissions).and_return([])
+          allow(helper).to receive(:task_status).with(task_id: 'task_1').and_return('completed' => false)
+          migration.set_migration_state(projects_in_progress: [{ task_id: 'task_1', project_id: 1 }])
+        end
+
+        it 'does nothing if task is not completed' do
+          expect(client).not_to receive(:update_by_query)
+          migration.migrate
+        end
+      end
+
+      context 'with project not found exception' do
+        let(:client) { instance_double('Elasticsearch::Transport::Client') }
+
+        before do
+          allow(migration).to receive(:client).and_return(client)
+          allow(migration).to receive(:projects_having_wikis_with_missing_permissions).and_return([0])
+          allow(migration).to receive(:completed?).and_return(false)
+        end
+
+        it 'logs failure when project is not found and schedules ElasticDeleteProjectWorker' do
+          expect(migration).to receive(:log).with(/Enqueuing projects having wiki_blobs with missing permissions/).once
+          expect(migration).to receive(:log).with(/Project not found/).once
+          expect(migration).to receive(:log).with(/Setting migration_state to/).once
+          expect(ElasticDeleteProjectWorker).to receive(:perform_async).with(0, 'project_0')
+          migration.migrate
+        end
+      end
+
+      context 'when migration fails' do
+        let(:client) { instance_double('Elasticsearch::Transport::Client') }
+
+        before do
+          allow(client).to receive(:update_by_query).and_return(update_by_query_response)
+          allow(helper).to receive(:task_status).with(task_id: 'task_1').and_return(task_status_response)
+
+          allow(migration).to receive(:projects_having_wikis_with_missing_permissions).and_return(projects.map(&:id))
+          allow(migration).to receive(:completed?).and_return(false)
+          allow(migration).to receive(:client).and_return(client)
+        end
+
+        context 'when Elasticsearch responds with errors' do
+          context 'when a task throws an error' do
+            let(:task_status_response) { { 'failures' => ['failed'] } }
+            let(:update_by_query_response) { { 'task' => 'task_1' } }
+
+            it 'removes entry from projects_in_progress in migration_state' do
+              migration_state = projects.map { |p| { task_id: 'task_1', project_id: p.id } }
+              migration.set_migration_state(projects_in_progress: migration_state)
+              expect(migration).to receive(:set_migration_state).with(projects_in_progress: [])
+              expect(migration).to receive(:set_migration_state).with(projects_in_progress: migration_state)
+
+              expect { migration.migrate }.not_to raise_error
+
+              expect(migration.migration_state[:projects_in_progress]).to match_array(migration_state)
+            end
+          end
+
+          context 'when update_by_query throws an error' do
+            let(:task_status_response) { {} }
+            let(:update_by_query_response) { { 'failures' => ['failed'] } }
+
+            it 'removes entry from projects_in_progress in migration_state' do
+              migration.set_migration_state({}) # simulate first run
+
+              expect { migration.migrate }.not_to raise_error
+              expect(migration.migration_state).to match(projects_in_progress: [])
+            end
+          end
+        end
+      end
+    end
+  end
+
+  describe 'integration test' do
+    let(:old_version_without_wiki_permissions) { 20230321202400 }
+
+    before do
+      set_elasticsearch_migration_to(old_version_without_wiki_permissions, including: false)
+
+      projects.each do |project|
+        project.wiki.create_page('index_page', 'Bla bla term1')
+        project.wiki.create_page('home_page', 'Bla bla term2')
+        project.wiki.create_page('index_page2', 'Bla bla term3')
+        project.wiki.create_page('home_page2', 'Bla bla term4')
+        project.wiki.index_wiki_blobs
+      end
+      ensure_elasticsearch_index!
+      projects.each { |project| remove_visibility_level_for_wikis(project) }
+      set_elasticsearch_migration_to(version, including: false)
+    end
+
+    it 'updates documents in batches' do
+      # calculate how many files are in each project in the index
+      query = { bool: { must: [{ term: { project_id: projects.first.id } }, { term: { type: 'wiki_blob' } }] } }
+      file_count = helper.client.count(index: helper.target_name, body: { query: query })['count']
+      allow(migration).to receive(:batch_size).and_return(file_count / 2)
+      stub_const("BackfillWikiPermissionsInMainIndex::MAX_PROJECTS_TO_PROCESS", 2)
+
+      expect(migration).not_to be_completed
+      expect(migration).to receive(:update_by_query).exactly(projects.size * 2).times.and_call_original
+
+      # process first two projects and half of the records
+      # the projects are returned ordered by record count, then by project_id
+      # make sure to give time to process the tasks
+      old_migration_state = migration.migration_state[:projects_in_progress]
+
+      5.times do |_|
+        migration.migrate
+        break if old_migration_state != migration.migration_state[:projects_in_progress]
+      end
+
+      expected_migration_project_ids = projects.map(&:id)
+
+      project_ids = migration.migration_state[:projects_in_progress].pluck(:project_id)
+      expect(expected_migration_project_ids).to include(*project_ids)
+      expect(project_ids.size).to eq(2)
+
+      # process two projects, the last project is now returned because it has the most documents to update
+      old_migration_state = migration.migration_state[:projects_in_progress]
+      5.times do |_|
+        migration.migrate
+        break if old_migration_state != migration.migration_state[:projects_in_progress]
+      end
+
+      project_ids = migration.migration_state[:projects_in_progress].pluck(:project_id)
+      expect(expected_migration_project_ids).to include(*project_ids)
+      expect(project_ids.size).to eq(2)
+
+      # process two projects, the second project is returned because the first project is completed
+      old_migration_state = migration.migration_state[:projects_in_progress]
+      5.times do |_|
+        migration.migrate
+        break if old_migration_state != migration.migration_state[:projects_in_progress]
+      end
+
+      project_ids = migration.migration_state[:projects_in_progress].pluck(:project_id)
+      expect(expected_migration_project_ids).to include(*project_ids)
+      expect(project_ids.size).to eq(2)
+
+      # all projects are marked as completed
+      old_migration_state = migration.migration_state[:projects_in_progress]
+      5.times do |_|
+        migration.migrate
+        break if old_migration_state != migration.migration_state[:projects_in_progress]
+      end
+
+      expect(migration.migration_state[:projects_in_progress]).to be_empty
+      expect(migration).to be_completed
+    end
+  end
+
+  def set_visibility_level_for_wikis(project)
+    source = "ctx._source.wiki_access_level = #{project.wiki_access_level};" \
+             "ctx._source.visibility_level = #{project.visibility_level};"
+    update_by_query project, source
+  end
+
+  def remove_visibility_level_for_wikis(project)
+    source = "ctx._source.remove('visibility_level');ctx._source.remove('wiki_access_level')"
+    update_by_query project, source
+  end
+
+  def update_by_query(project, source)
+    Project.__elasticsearch__.client.update_by_query({
+      index: Project.__elasticsearch__.index_name,
+      refresh: true,
+      body: {
+        script: {
+          source: source,
+          lang: 'painless'
+        },
+        query: {
+          bool: {
+            filter: [
+              {
+                term: {
+                  type: 'wiki_blob'
+                }
+              },
+              {
+                has_parent: {
+                  parent_type: 'project',
+                  query: {
+                    term: {
+                      id: project.id
+                    }
+                  }
+                }
+              }
+            ]
+          }
+        }
+      }
+    })
+  end
+end
diff --git a/ee/spec/lib/gitlab/elastic/indexer_spec.rb b/ee/spec/lib/gitlab/elastic/indexer_spec.rb
index d5279908872cf..525a1458ffc9b 100644
--- a/ee/spec/lib/gitlab/elastic/indexer_spec.rb
+++ b/ee/spec/lib/gitlab/elastic/indexer_spec.rb
@@ -385,9 +385,11 @@ def indexed_commits_for(term)
               "--timeout=#{described_class.timeout}s",
               "--from-sha=#{expected_from_sha}",
               "--to-sha=#{to_sha}",
+              "--full-path=#{project.full_path}",
+              "--visibility-level=#{project.visibility_level}",
               '--blob-type=wiki_blob',
               '--skip-commits',
-              "--full-path=#{project.full_path}",
+              "--wiki-access-level=#{project.wiki_access_level}",
               "--traversal-ids=#{project.namespace_ancestry}",
               "#{project.wiki.repository.disk_path}.git"
             ],
@@ -411,9 +413,11 @@ def indexed_commits_for(term)
             '--search-curation',
             "--from-sha=#{expected_from_sha}",
             "--to-sha=#{to_sha}",
+            "--full-path=#{project.full_path}",
+            "--visibility-level=#{project.visibility_level}",
             '--blob-type=wiki_blob',
             '--skip-commits',
-            "--full-path=#{project.full_path}",
+            "--wiki-access-level=#{project.wiki_access_level}",
             "--traversal-ids=#{project.namespace_ancestry}",
             "#{project.wiki.repository.disk_path}.git"
           ],
-- 
GitLab