Skip to content
代码片段 群组 项目
提交 a353177c 编辑于 作者: Ravi Kumar's avatar Ravi Kumar 提交者: Terri Chu
浏览文件

Add a migration to add prefix project in wiki rid

This MR adds the prefix `project` to the rid of every wiki_blob document. This is
needed because group wikis will also be supported in the future, so we
need a way to tell the rids of project wikis and group wikis apart.

Changelog: other
MR: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/119261
EE: true
上级 ef10bb7b
No related branches found
No related tags found
无相关合并请求
4.3.0 4.3.1
# frozen_string_literal: true
# Elasticsearch migration that rewrites the `rid` field of wiki documents,
# turning the `wiki` part of the value into `wiki_project` (for example
# `wiki_1` becomes `wiki_project_1`).
#
# The work is done with asynchronous, sliced update-by-query tasks. Progress is
# kept in the migration state under the keys :slice, :task_id, :retry_attempt
# and :max_slices, so every call to #migrate performs one small step: launch a
# task for the current slice, poll a task that was launched earlier, or halt
# after too many failed attempts.
#
# NOTE(review): `log`, `helper`, `client`, `migration_state`,
# `set_migration_state`, `fail_migration_halt_error!`, `reindexing_completed?`
# and `get_number_of_shards` are not defined in this class; presumably they
# come from Elastic::Migration / Elastic::MigrationHelper.
class AddSuffixProjectInWikiRid < Elastic::Migration
  include Elastic::MigrationHelper

  pause_indexing!
  batched!
  batch_size 10_000
  space_requirements!
  throttle_delay 1.minute

  # Value passed as the `timeout` option of the update-by-query request.
  ELASTIC_TIMEOUT = '5m'
  # Number of failed attempts after which the migration is halted.
  MAX_ATTEMPTS_PER_SLICE = 30

  # Performs one step of the migration.
  #
  # * Halts the migration once retry_attempt reaches MAX_ATTEMPTS_PER_SLICE.
  # * Does nothing when every slice has already been handled.
  # * Polls the stored task when a task_id is present.
  # * Otherwise launches an update-by-query task for the current slice and
  #   stores its task id.
  #
  # Any StandardError is logged, recorded as an additional retry attempt for
  # the current slice (task_id cleared) and then re-raised.
  def migrate
    retry_attempt, slice, task_id, max_slices = set_vars

    if retry_attempt >= MAX_ATTEMPTS_PER_SLICE
      fail_migration_halt_error!(retry_attempt: retry_attempt)
      return
    end

    return if slice >= max_slices

    if task_id
      process_already_started_task(task_id, slice)
      return
    end

    log('Launching reindexing', slice: slice, max_slices: max_slices)
    response = update_by_query(slice, max_slices)
    task_id = response['task']
    log('Reindexing for slice is started', slice: slice, max_slices: max_slices, task_id: task_id)

    set_migration_state(slice: slice, task_id: task_id, max_slices: max_slices,
      remaining_document_count: count_items_missing_prefix_in_rid)
  rescue StandardError => e
    log('migration failed, increasing migration_state', slice: slice, retry_attempt: retry_attempt, error: e.message)

    # When set_vars itself raised, the locals above were never assigned and are
    # all nil. `nil + 1` would raise NoMethodError and hide the real failure,
    # so surface the original error without touching the stored state.
    raise if retry_attempt.nil?

    set_migration_state(slice: slice, task_id: nil, retry_attempt: retry_attempt + 1, max_slices: max_slices)
    raise
  end

  # @return [Boolean] true when no wiki document matches the "rid still lacks
  #   the prefix" query any more.
  def completed?
    total_remaining = count_items_missing_prefix_in_rid
    log('Checking if migration is finished based on index counts remaining', total_remaining: total_remaining)
    total_remaining.eql?(0)
  end

  # @return [Integer] 1% of `helper.index_size_bytes`, rounded up.
  def space_required_bytes
    # wiki documents on GitLab.com takes at most 1% of the main index storage
    # this migration will require a small buffer
    (helper.index_size_bytes * 0.01).ceil
  end

  private

  # Name of the index this migration operates on.
  def index_name
    Elastic::Latest::WikiConfig.index_name
  end

  # Refreshes the wiki index and returns the number of documents that still
  # match #query.
  def count_items_missing_prefix_in_rid
    helper.refresh_index(index_name: index_name)
    client.count(index: index_name, body: { query: query })['count']
  end

  # Regexp query selecting rids of the form `wiki_<digit>...`, i.e. documents
  # whose rid has not been rewritten to `wiki_project_...` yet.
  def query
    { regexp: { rid: "wiki_[0-9].*" } }
  end

  # Stores the state for working on `slice` next: no running task, retry
  # counter reset, max_slices carried over from the current state.
  def set_migration_state_for_next_slice(slice)
    set_migration_state(
      slice: slice,
      task_id: nil,
      retry_attempt: 0,
      max_slices: migration_state[:max_slices]
    )
  end

  # Launches the update-by-query request for one slice without waiting for it
  # to complete and returns the client response (its 'task' entry is read by
  # #migrate).
  def update_by_query(slice, max_slices)
    client.update_by_query(
      index: index_name,
      body: {
        query: query,
        script: { lang: 'painless', source: "ctx._source.rid = ctx._source.rid.replace('wiki', 'wiki_project')" },
        slice: { id: slice, max: max_slices }
      },
      wait_for_completion: false,
      max_docs: batch_size,
      timeout: ELASTIC_TIMEOUT,
      conflicts: 'proceed'
    )
  end

  # Reads the working variables from the migration state, applying defaults:
  # retry_attempt and slice fall back to 0, max_slices falls back to the
  # number of shards of the wiki index.
  #
  # @return [Array] [retry_attempt, slice, task_id, max_slices]
  def set_vars
    retry_attempt = migration_state[:retry_attempt] || 0
    slice = migration_state[:slice] || 0
    task_id = migration_state[:task_id]
    max_slices = migration_state[:max_slices] || get_number_of_shards(index_name: index_name)

    [retry_attempt, slice, task_id, max_slices]
  end

  # Checks the task launched earlier for `slice`; when it is finished the
  # state is advanced to the next slice, otherwise only a log line is written.
  def process_already_started_task(task_id, slice)
    log('Checking reindexing status', slice: slice, task_id: task_id)

    if reindexing_completed?(task_id: task_id)
      log('Reindexing is completed', slice: slice, task_id: task_id)
      set_migration_state_for_next_slice(slice + 1)
    else
      log('Reindexing is still in progress', slice: slice, task_id: task_id)
    end
  end
end
...@@ -36,7 +36,12 @@ def delete_index_for_commits_and_blobs(wiki: false) ...@@ -36,7 +36,12 @@ def delete_index_for_commits_and_blobs(wiki: false)
types = wiki ? %w[wiki_blob] : %w[commit blob] types = wiki ? %w[wiki_blob] : %w[commit blob]
if (wiki && ::Elastic::DataMigrationService.migration_has_finished?(:migrate_wikis_to_separate_index)) || types.include?('commit') if (wiki && ::Elastic::DataMigrationService.migration_has_finished?(:migrate_wikis_to_separate_index)) || types.include?('commit')
index, rid = if wiki index, rid = if wiki
[::Elastic::Latest::WikiConfig.index_name, "wiki_#{project_id}"] output = [::Elastic::Latest::WikiConfig.index_name]
output << if ::Elastic::DataMigrationService.migration_has_finished?(:add_suffix_project_in_wiki_rid)
"wiki_project_#{project_id}"
else
"wiki_#{project_id}"
end
else else
[::Elastic::Latest::CommitConfig.index_name, project_id] [::Elastic::Latest::CommitConfig.index_name, project_id]
end end
......
...@@ -18,7 +18,11 @@ def elastic_search_as_wiki_page(query, page: 1, per: 20, options: {}) ...@@ -18,7 +18,11 @@ def elastic_search_as_wiki_page(query, page: 1, per: 20, options: {})
private private
def repository_id def repository_id
"wiki_#{project.id}" if ::Elastic::DataMigrationService.migration_has_finished?(:add_suffix_project_in_wiki_rid)
"wiki_project_#{project.id}"
else
"wiki_#{project.id}"
end
end end
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20230428500000_add_suffix_project_in_wiki_rid.rb')
# Specs for the AddSuffixProjectInWikiRid Elasticsearch migration: declared
# migration options, slice/task bookkeeping in #migrate, failure handling,
# #completed? and #space_required_bytes.
RSpec.describe AddSuffixProjectInWikiRid, :elastic_clean, :sidekiq_inline, feature_category: :global_search do
  let(:version) { 20230428500000 }
  let(:migration) { described_class.new(version) }
  let(:helper) { Gitlab::Elastic::Helper.new }
  let(:client) { ::Gitlab::Search::Client.new }

  before do
    stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
    allow(migration).to receive(:helper).and_return(helper)
    set_elasticsearch_migration_to :add_suffix_project_in_wiki_rid, including: false
    allow(migration).to receive(:client).and_return(client)
  end

  describe 'migration_options' do
    it 'has migration options set', :aggregate_failures do
      expect(migration).to be_batched
      expect(migration.throttle_delay).to eq(1.minute)
      expect(migration).to be_pause_indexing
      expect(migration).to be_space_requirements
    end
  end

  describe '.migrate' do
    context 'for batch run' do
      it 'sets migration_state task_id' do
        migration.migrate

        expect(migration.migration_state).to include(slice: 0, max_slices: 5)
        expect(migration.migration_state[:task_id]).not_to be nil
      end

      it 'sets next slice and clears task_id after task check' do
        allow(migration).to receive(:reindexing_completed?).and_return(true)

        migration.set_migration_state(slice: 0, max_slices: 5, retry_attempt: 0, task_id: 'task_id')
        migration.migrate

        expect(migration.migration_state).to include(slice: 1, max_slices: 5, task_id: nil)
      end

      it 'resets retry_attempt clears task_id for the next slice' do
        allow(migration).to receive(:reindexing_completed?).and_return(true)

        migration.set_migration_state(slice: 0, max_slices: 5, retry_attempt: 5, task_id: 'task_id')
        migration.migrate

        expect(migration.migration_state).to match(slice: 1, max_slices: 5, retry_attempt: 0, task_id: nil)
      end

      context 'when reindexing is still in progress' do
        before do
          allow(migration).to receive(:reindexing_completed?).and_return(false)
        end

        it 'does nothing' do
          migration.set_migration_state(slice: 0, max_slices: 5, retry_attempt: 0, task_id: 'task_id')

          # Message expectations only observe calls made after they are
          # declared, so this has to precede the call under test.
          expect(client).not_to receive(:update_by_query)

          migration.migrate
        end
      end

      context 'with wikis in elastic' do
        # Create wikis on different projects to ensure they are spread across
        # all shards. If they all end up in 1 ES shard then they'll be migrated
        # in a single slice.
        let_it_be(:projects) { create_list(:project, 3, :wiki_repo, visibility_level: 0, wiki_access_level: 0) }

        before do
          projects.each do |project|
            project.wiki.create_page('index_page', 'Bla bla term')
            project.wiki.create_page('home_page', 'Bla bla term2')
            project.wiki.index_wiki_blobs
          end

          ensure_elasticsearch_index! # ensure objects are indexed
        end

        it 'migrates all wikis' do
          slices = 2
          migration.set_migration_state(slice: 0, max_slices: slices, retry_attempt: 0)
          migration.migrate

          # Each #migrate call performs a single step, so keep stepping until
          # the migration reports completion (bounded to avoid looping forever).
          10.times do
            break if migration.completed?

            migration.migrate
          end

          expect(migration.completed?).to be_truthy

          hits = client.search(index: "#{es_helper.target_name}-wikis")['hits']['hits']
          rids = hits.map { |hit| hit['_source']['rid'] }
          expect(rids).to all(match(/wiki_project_[0-9].*/))
        end
      end
    end

    context 'for failed run' do
      context 'if exception is raised' do
        before do
          allow(migration).to receive(:client).and_return(client)
          allow(client).to receive(:update_by_query).and_raise(StandardError)
        end

        it 'increases retry_attempt and clears task_id' do
          migration.set_migration_state(slice: 0, max_slices: 2, retry_attempt: 1)

          expect { migration.migrate }.to raise_error(StandardError)
          expect(migration.migration_state).to match(slice: 0, max_slices: 2, retry_attempt: 2, task_id: nil)
        end

        it 'fails the migration after too many attempts' do
          migration.set_migration_state(slice: 0, max_slices: 2, retry_attempt: 30)

          # Must be declared before the call under test; declared afterwards it
          # would pass no matter what #migrate did.
          expect(client).not_to receive(:update_by_query)

          migration.migrate

          expect(migration.migration_state).to match(
            slice: 0,
            max_slices: 2,
            retry_attempt: 30,
            halted: true,
            failed: true,
            halted_indexing_unpaused: false
          )
        end
      end

      context 'when elasticsearch failures' do
        context 'if total is not equal' do
          before do
            allow(helper).to receive(:task_status).and_return(
              {
                "completed" => true,
                "response" => {
                  "total" => 60, "updated" => 0, "created" => 45, "deleted" => 0, "failures" => []
                }
              }
            )
          end

          it 'raises an error and clears task_id' do
            migration.set_migration_state(slice: 0, max_slices: 2, retry_attempt: 0, task_id: 'task_id')

            expect { migration.migrate }.to raise_error(/total is not equal/)
            expect(migration.migration_state[:task_id]).to be_nil
          end
        end

        context 'when reindexing fails' do
          before do
            allow(helper).to receive(:task_status).with(task_id: 'task_id').and_return(
              {
                "completed" => true,
                "response" => {
                  "total" => 60,
                  "updated" => 0,
                  "created" => 0,
                  "deleted" => 0,
                  "failures" => [
                    { type: "es_rejected_execution_exception" }
                  ]
                }
              }
            )
          end

          it 'raises an error and clears task_id' do
            migration.set_migration_state(slice: 0, max_slices: 2, retry_attempt: 0, task_id: 'task_id')

            expect { migration.migrate }.to raise_error(/failed with/)
            expect(migration.migration_state[:task_id]).to be_nil
          end
        end
      end
    end
  end

  describe '.completed?' do
    subject { migration.completed? }

    let_it_be(:project) { create(:project, :wiki_repo, visibility_level: 0, wiki_access_level: 0) }

    before do
      project.wiki.create_page('index_page', 'Bla bla term')
      project.wiki.index_wiki_blobs
      ensure_elasticsearch_index! # ensure objects are indexed
    end

    context 'when there are no items which are missing project prefix in rid' do
      before do
        # Apply the same rid rewrite the migration performs, to every document.
        client.update_by_query(
          index: Elastic::Latest::WikiConfig.index_name,
          body: {
            script: {
              lang: 'painless',
              source: "ctx._source.rid = ctx._source.rid.replace('wiki', 'wiki_project')"
            }
          }
        )
      end

      it 'returns true' do
        is_expected.to be_truthy
      end
    end

    context 'when some items are missing project prefix in rid' do
      before do
        # Reverse rewrite: makes sure no rid carries the `wiki_project` prefix.
        client.update_by_query(
          index: Elastic::Latest::WikiConfig.index_name,
          body: {
            script: {
              lang: 'painless',
              source: "ctx._source.rid = ctx._source.rid.replace('wiki_project', 'wiki')"
            }
          }
        )
      end

      it 'returns false' do
        is_expected.to be_falsey
      end
    end
  end

  describe 'space_required_bytes' do
    subject { migration.space_required_bytes }

    before do
      allow(helper).to receive(:index_size_bytes).and_return(300)
    end

    # 1% of 300 bytes
    it { is_expected.to eq(3) }
  end
end
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册