Skip to content
代码片段 群组 项目
未验证 提交 9e53a29f 编辑于 作者: Dmitry Gruzd's avatar Dmitry Gruzd 提交者: GitLab
浏览文件

Add Search::Zoekt::Task

Changelog: changed
上级 3bdf00a8
未找到相关分支
未找到相关标签
无相关合并请求
显示
291 个添加3 个删除
...@@ -23,7 +23,8 @@ ...@@ -23,7 +23,8 @@
IncidentManagement::PendingEscalations::Issue, IncidentManagement::PendingEscalations::Issue,
Security::Finding, Security::Finding,
Analytics::ValueStreamDashboard::Count, Analytics::ValueStreamDashboard::Count,
Ci::FinishedBuildChSyncEvent Ci::FinishedBuildChSyncEvent,
Search::Zoekt::Task
]) ])
else else
Gitlab::Database::Partitioning.register_tables( Gitlab::Database::Partitioning.register_tables(
......
---
table_name: zoekt_tasks
classes:
- Search::Zoekt::Task
feature_categories:
- global_search
description: Represents a zoekt task for a project
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/143063
milestone: '16.10'
gitlab_schema: gitlab_main_cell
sharding_key:
project_identifier: projects
# frozen_string_literal: true
class AddZoektTasks < Gitlab::Database::Migration[2.2]
milestone '16.10'
OPTIONS = {
primary_key: [:id, :partition_id],
options: 'PARTITION BY LIST (partition_id)',
if_not_exists: true
}
ZOEKT_NODE_ID_INDEX_NAME = 'index_zoekt_tasks_on_zoekt_node_id_and_state_and_perform_at'
STATE_INDEX_NAME = 'index_zoekt_tasks_on_state'
FAILED_STATE_ENUM = 255
CONSTRAINT_NAME = 'c_zoekt_tasks_on_retries_left'
CONSTRAINT_QUERY = <<~SQL
(retries_left > 0) OR (retries_left = 0 AND state = #{FAILED_STATE_ENUM})
SQL
def change
create_table :zoekt_tasks, **OPTIONS do |t|
t.bigserial :id, null: false
t.bigint :partition_id, null: false, default: 1
t.bigint :zoekt_node_id, null: false
t.bigint :zoekt_repository_id, null: false
t.bigint :project_identifier, null: false # sharding key
t.datetime_with_timezone :perform_at, null: false, default: -> { 'NOW()' }
t.timestamps_with_timezone null: false
t.integer :state, null: false, default: 0, limit: 2
t.integer :task_type, null: false, limit: 2
t.integer :retries_left, null: false, default: 5, limit: 2
t.index :state, name: STATE_INDEX_NAME, using: :btree
t.index :zoekt_repository_id
t.index [:zoekt_node_id, :state, :perform_at], name: ZOEKT_NODE_ID_INDEX_NAME, using: :btree
t.check_constraint CONSTRAINT_QUERY, name: CONSTRAINT_NAME
end
end
end
# frozen_string_literal: true
class EnsureIdUniquenessForZoektTasks < Gitlab::Database::Migration[2.2]
include Gitlab::Database::PartitioningMigrationHelpers::UniquenessHelpers
milestone '16.10'
enable_lock_retries!
TABLE_NAME = :zoekt_tasks
FUNCTION_NAME = :assign_zoekt_tasks_id_value
def up
ensure_unique_id(TABLE_NAME)
end
def down
execute(<<~SQL.squish)
ALTER TABLE #{TABLE_NAME}
ALTER COLUMN id SET DEFAULT nextval('zoekt_tasks_id_seq'::regclass);
DROP FUNCTION IF EXISTS #{FUNCTION_NAME} CASCADE;
SQL
end
end
# frozen_string_literal: true
class AddNodeIdForeignKeyToZoektTasks < Gitlab::Database::Migration[2.2]
include Gitlab::Database::PartitioningMigrationHelpers::ForeignKeyHelpers
disable_ddl_transaction!
milestone '16.10'
def up
add_concurrent_partitioned_foreign_key :zoekt_tasks, :zoekt_nodes, column: :zoekt_node_id, on_delete: :cascade
end
def down
with_lock_retries do
remove_foreign_key :zoekt_tasks, column: :zoekt_node_id
end
end
end
026d861a03e5b8790e22c81b320a3f3c74f9b81f0a693e91a1648b2084c60f0e
\ No newline at end of file
5ca0b37dfd9766f56baf69458587bad31749866c218b91b56d466fb8bec4e75c
\ No newline at end of file
3f4092ae4c9f58985b14b724b9d0b1b987f4554f0a6b9339c8b92571b7bf51ad
\ No newline at end of file
...@@ -49,6 +49,19 @@ RETURN NEW; ...@@ -49,6 +49,19 @@ RETURN NEW;
END END
$$; $$;
   
CREATE FUNCTION assign_zoekt_tasks_id_value() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
IF NEW."id" IS NOT NULL THEN
RAISE WARNING 'Manually assigning ids is not allowed, the value will be ignored';
END IF;
NEW."id" := nextval('zoekt_tasks_id_seq'::regclass);
RETURN NEW;
END
$$;
CREATE FUNCTION delete_associated_project_namespace() RETURNS trigger CREATE FUNCTION delete_associated_project_namespace() RETURNS trigger
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
...@@ -1045,6 +1058,22 @@ CREATE TABLE web_hook_logs ( ...@@ -1045,6 +1058,22 @@ CREATE TABLE web_hook_logs (
) )
PARTITION BY RANGE (created_at); PARTITION BY RANGE (created_at);
   
CREATE TABLE zoekt_tasks (
id bigint NOT NULL,
partition_id bigint DEFAULT 1 NOT NULL,
zoekt_node_id bigint NOT NULL,
zoekt_repository_id bigint NOT NULL,
project_identifier bigint NOT NULL,
perform_at timestamp with time zone DEFAULT now() NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
state smallint DEFAULT 0 NOT NULL,
task_type smallint NOT NULL,
retries_left smallint DEFAULT 5 NOT NULL,
CONSTRAINT c_zoekt_tasks_on_retries_left CHECK (((retries_left > 0) OR ((retries_left = 0) AND (state = 255))))
)
PARTITION BY LIST (partition_id);
CREATE TABLE analytics_cycle_analytics_issue_stage_events ( CREATE TABLE analytics_cycle_analytics_issue_stage_events (
stage_event_hash_id bigint NOT NULL, stage_event_hash_id bigint NOT NULL,
issue_id bigint NOT NULL, issue_id bigint NOT NULL,
...@@ -17961,6 +17990,15 @@ CREATE SEQUENCE zoekt_shards_id_seq ...@@ -17961,6 +17990,15 @@ CREATE SEQUENCE zoekt_shards_id_seq
   
ALTER SEQUENCE zoekt_shards_id_seq OWNED BY zoekt_shards.id; ALTER SEQUENCE zoekt_shards_id_seq OWNED BY zoekt_shards.id;
   
CREATE SEQUENCE zoekt_tasks_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE zoekt_tasks_id_seq OWNED BY zoekt_tasks.id;
CREATE TABLE zoom_meetings ( CREATE TABLE zoom_meetings (
id bigint NOT NULL, id bigint NOT NULL,
project_id bigint NOT NULL, project_id bigint NOT NULL,
...@@ -22192,6 +22230,9 @@ ALTER TABLE ONLY zoekt_repositories ...@@ -22192,6 +22230,9 @@ ALTER TABLE ONLY zoekt_repositories
ALTER TABLE ONLY zoekt_shards ALTER TABLE ONLY zoekt_shards
ADD CONSTRAINT zoekt_shards_pkey PRIMARY KEY (id); ADD CONSTRAINT zoekt_shards_pkey PRIMARY KEY (id);
   
ALTER TABLE ONLY zoekt_tasks
ADD CONSTRAINT zoekt_tasks_pkey PRIMARY KEY (id, partition_id);
ALTER TABLE ONLY zoom_meetings ALTER TABLE ONLY zoom_meetings
ADD CONSTRAINT zoom_meetings_pkey PRIMARY KEY (id); ADD CONSTRAINT zoom_meetings_pkey PRIMARY KEY (id);
   
...@@ -27329,6 +27370,12 @@ CREATE INDEX index_zoekt_shards_on_last_seen_at ON zoekt_shards USING btree (las ...@@ -27329,6 +27370,12 @@ CREATE INDEX index_zoekt_shards_on_last_seen_at ON zoekt_shards USING btree (las
   
CREATE UNIQUE INDEX index_zoekt_shards_on_search_base_url ON zoekt_shards USING btree (search_base_url); CREATE UNIQUE INDEX index_zoekt_shards_on_search_base_url ON zoekt_shards USING btree (search_base_url);
   
CREATE INDEX index_zoekt_tasks_on_state ON ONLY zoekt_tasks USING btree (state);
CREATE INDEX index_zoekt_tasks_on_zoekt_node_id_and_state_and_perform_at ON ONLY zoekt_tasks USING btree (zoekt_node_id, state, perform_at);
CREATE INDEX index_zoekt_tasks_on_zoekt_repository_id ON ONLY zoekt_tasks USING btree (zoekt_repository_id);
CREATE INDEX index_zoom_meetings_on_issue_id ON zoom_meetings USING btree (issue_id); CREATE INDEX index_zoom_meetings_on_issue_id ON zoom_meetings USING btree (issue_id);
   
CREATE UNIQUE INDEX index_zoom_meetings_on_issue_id_and_issue_status ON zoom_meetings USING btree (issue_id, issue_status) WHERE (issue_status = 1); CREATE UNIQUE INDEX index_zoom_meetings_on_issue_id_and_issue_status ON zoom_meetings USING btree (issue_id, issue_status) WHERE (issue_status = 1);
...@@ -29103,6 +29150,8 @@ CREATE TRIGGER assign_p_ci_job_artifacts_id_trigger BEFORE INSERT ON p_ci_job_ar ...@@ -29103,6 +29150,8 @@ CREATE TRIGGER assign_p_ci_job_artifacts_id_trigger BEFORE INSERT ON p_ci_job_ar
   
CREATE TRIGGER assign_p_ci_pipeline_variables_id_trigger BEFORE INSERT ON p_ci_pipeline_variables FOR EACH ROW EXECUTE FUNCTION assign_p_ci_pipeline_variables_id_value(); CREATE TRIGGER assign_p_ci_pipeline_variables_id_trigger BEFORE INSERT ON p_ci_pipeline_variables FOR EACH ROW EXECUTE FUNCTION assign_p_ci_pipeline_variables_id_value();
   
CREATE TRIGGER assign_zoekt_tasks_id_trigger BEFORE INSERT ON zoekt_tasks FOR EACH ROW EXECUTE FUNCTION assign_zoekt_tasks_id_value();
CREATE TRIGGER chat_names_loose_fk_trigger AFTER DELETE ON chat_names REFERENCING OLD TABLE AS old_table FOR EACH STATEMENT EXECUTE FUNCTION insert_into_loose_foreign_keys_deleted_records(); CREATE TRIGGER chat_names_loose_fk_trigger AFTER DELETE ON chat_names REFERENCING OLD TABLE AS old_table FOR EACH STATEMENT EXECUTE FUNCTION insert_into_loose_foreign_keys_deleted_records();
   
CREATE TRIGGER ci_builds_loose_fk_trigger AFTER DELETE ON ci_builds REFERENCING OLD TABLE AS old_table FOR EACH STATEMENT EXECUTE FUNCTION insert_into_loose_foreign_keys_deleted_records(); CREATE TRIGGER ci_builds_loose_fk_trigger AFTER DELETE ON ci_builds REFERENCING OLD TABLE AS old_table FOR EACH STATEMENT EXECUTE FUNCTION insert_into_loose_foreign_keys_deleted_records();
...@@ -30964,6 +31013,9 @@ ALTER TABLE ONLY project_repository_storage_moves ...@@ -30964,6 +31013,9 @@ ALTER TABLE ONLY project_repository_storage_moves
ALTER TABLE ONLY ml_candidate_metadata ALTER TABLE ONLY ml_candidate_metadata
ADD CONSTRAINT fk_rails_5117dddf22 FOREIGN KEY (candidate_id) REFERENCES ml_candidates(id) ON DELETE CASCADE; ADD CONSTRAINT fk_rails_5117dddf22 FOREIGN KEY (candidate_id) REFERENCES ml_candidates(id) ON DELETE CASCADE;
   
ALTER TABLE zoekt_tasks
ADD CONSTRAINT fk_rails_51af186590 FOREIGN KEY (zoekt_node_id) REFERENCES zoekt_nodes(id) ON DELETE CASCADE;
ALTER TABLE ONLY ml_models ALTER TABLE ONLY ml_models
ADD CONSTRAINT fk_rails_51e87f7c50 FOREIGN KEY (project_id) REFERENCES projects(id); ADD CONSTRAINT fk_rails_51e87f7c50 FOREIGN KEY (project_id) REFERENCES projects(id);
   
...@@ -9,6 +9,8 @@ class Node < ApplicationRecord ...@@ -9,6 +9,8 @@ class Node < ApplicationRecord
foreign_key: :zoekt_node_id, inverse_of: :node, class_name: '::Search::Zoekt::Index' foreign_key: :zoekt_node_id, inverse_of: :node, class_name: '::Search::Zoekt::Index'
has_many :enabled_namespaces, has_many :enabled_namespaces,
through: :indices, source: :zoekt_enabled_namespace, class_name: '::Search::Zoekt::EnabledNamespace' through: :indices, source: :zoekt_enabled_namespace, class_name: '::Search::Zoekt::EnabledNamespace'
has_many :tasks,
foreign_key: :zoekt_node_id, inverse_of: :node, class_name: '::Search::Zoekt::Task'
validates :index_base_url, presence: true validates :index_base_url, presence: true
validates :search_base_url, presence: true validates :search_base_url, presence: true
......
...@@ -9,6 +9,9 @@ class Repository < ApplicationRecord ...@@ -9,6 +9,9 @@ class Repository < ApplicationRecord
belongs_to :project, inverse_of: :zoekt_repository, class_name: 'Project' belongs_to :project, inverse_of: :zoekt_repository, class_name: 'Project'
has_many :tasks,
foreign_key: :zoekt_repository_id, inverse_of: :zoekt_repository, class_name: '::Search::Zoekt::Task'
before_validation :set_project_identifier before_validation :set_project_identifier
validates_presence_of :zoekt_index_id, :project_identifier, :state validates_presence_of :zoekt_index_id, :project_identifier, :state
......
# frozen_string_literal: true
module Search
module Zoekt
class Task < ApplicationRecord
PARTITION_DURATION = 1.day
include PartitionedTable
include IgnorableColumns
self.table_name = 'zoekt_tasks'
self.primary_key = :id
ignore_column :partition_id, remove_never: true
belongs_to :node, foreign_key: :zoekt_node_id, inverse_of: :tasks, class_name: '::Search::Zoekt::Node'
belongs_to :zoekt_repository, inverse_of: :tasks, class_name: '::Search::Zoekt::Repository'
scope :for_partition, ->(partition) { where(partition_id: partition) }
enum state: {
pending: 0,
done: 10,
failed: 255,
orphaned: 256
}
enum task_type: {
index_repo: 0,
force_index_repo: 1,
delete_repo: 50
}
partitioned_by :partition_id,
strategy: :sliding_list,
next_partition_if: ->(active_partition) do
oldest_record_in_partition = Task
.select(:id, :created_at)
.for_partition(active_partition.value)
.order(:id)
.first
oldest_record_in_partition.present? &&
oldest_record_in_partition.created_at < PARTITION_DURATION.ago
end,
detach_partition_if: ->(partition) do
!Task
.for_partition(partition.value)
.where(state: :pending)
.exists?
end
end
end
end
# frozen_string_literal: true
FactoryBot.define do
factory :zoekt_task, class: '::Search::Zoekt::Task' do
node { association(:zoekt_node) }
zoekt_repository { association(:zoekt_repository) }
project_identifier { zoekt_repository.project.id }
task_type { :index_repo }
end
end
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
describe 'relations' do describe 'relations' do
it { is_expected.to have_many(:indices).inverse_of(:node) } it { is_expected.to have_many(:indices).inverse_of(:node) }
it { is_expected.to have_many(:tasks).inverse_of(:node) }
it { is_expected.to have_many(:enabled_namespaces).through(:indices) } it { is_expected.to have_many(:enabled_namespaces).through(:indices) }
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ::Search::Zoekt::Task, feature_category: :global_search do
subject(:task) { create(:zoekt_task) }
describe 'relations' do
it { is_expected.to belong_to(:node).inverse_of(:tasks) }
it { is_expected.to belong_to(:zoekt_repository).inverse_of(:tasks) }
end
describe 'sliding_list partitioning' do
let(:partition_manager) { Gitlab::Database::Partitioning::PartitionManager.new(described_class) }
describe 'next_partition_if callback' do
let(:active_partition) { described_class.partitioning_strategy.active_partition }
subject(:value) { described_class.partitioning_strategy.next_partition_if.call(active_partition) }
context 'when the partition is empty' do
it { is_expected.to eq(false) }
end
context 'when the partition has records' do
before do
create(:zoekt_task, state: :pending)
create(:zoekt_task, state: :done)
create(:zoekt_task, state: :failed)
end
it { is_expected.to eq(false) }
context 'when the first record of the partition is older than PARTITION_DURATION' do
before do
described_class.first.update!(created_at: (described_class::PARTITION_DURATION + 1.day).ago)
end
it { is_expected.to eq(true) }
end
end
end
describe 'detach_partition_if callback' do
let(:active_partition) { described_class.partitioning_strategy.active_partition }
subject(:value) { described_class.partitioning_strategy.detach_partition_if.call(active_partition) }
before_all do
create(:zoekt_task, state: :pending)
create(:zoekt_task, state: :done)
end
context 'when the partition contains unprocessed records' do
it { is_expected.to eq(false) }
end
context 'when the partition contains only processed records' do
before do
described_class.update_all(state: :done)
end
it { is_expected.to eq(true) }
end
end
end
end
...@@ -139,7 +139,8 @@ ...@@ -139,7 +139,8 @@
ml_candidates: %w[internal_id], ml_candidates: %w[internal_id],
value_stream_dashboard_counts: %w[namespace_id], value_stream_dashboard_counts: %w[namespace_id],
zoekt_indices: %w[namespace_id], # needed for cells sharding key zoekt_indices: %w[namespace_id], # needed for cells sharding key
zoekt_repositories: %w[namespace_id project_identifier] # needed for cells sharding key zoekt_repositories: %w[namespace_id project_identifier], # needed for cells sharding key
zoekt_tasks: %w[project_identifier partition_id zoekt_repository_id] # needed for: cells sharding key, partitioning, and performance reasons
}.with_indifferent_access.freeze }.with_indifferent_access.freeze
context 'for table' do context 'for table' do
......
...@@ -50,8 +50,10 @@ ...@@ -50,8 +50,10 @@
'value_stream_dashboard_counts.namespace_id', # https://gitlab.com/gitlab-org/gitlab/-/issues/439555 'value_stream_dashboard_counts.namespace_id', # https://gitlab.com/gitlab-org/gitlab/-/issues/439555
'zoekt_indices.namespace_id', 'zoekt_indices.namespace_id',
'zoekt_repositories.project_identifier', 'zoekt_repositories.project_identifier',
'zoekt_tasks.project_identifier',
'ci_namespace_monthly_usages.namespace_id', # https://gitlab.com/gitlab-org/gitlab/-/issues/321400 'ci_namespace_monthly_usages.namespace_id', # https://gitlab.com/gitlab-org/gitlab/-/issues/321400
'ci_job_artifacts.project_id' 'ci_job_artifacts.project_id',
'ci_namespace_monthly_usages.namespace_id' # https://gitlab.com/gitlab-org/gitlab/-/issues/321400
] ]
end end
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册