From e0fe59dbfc10f8c86a7f07b682c5619829dfcad9 Mon Sep 17 00:00:00 2001
From: Valery Sizov <valery@gitlab.com>
Date: Wed, 10 Jun 2020 05:18:00 +0000
Subject: [PATCH] Geo: Implement backfill of PackageFiles/Blobs

Implements backfill of PackageFiles/Blobs in the scope of the
Self-Service Framework
---
 config/gitlab.yml.example                     |   5 +
 config/initializers/1_settings.rb             |   3 +
 ee/app/models/geo/base_registry.rb            |  17 +++
 ee/app/models/geo/package_file_registry.rb    |   3 +
 ee/app/workers/all_queues.yml                 |   8 ++
 ee/app/workers/geo/registry_sync_worker.rb    |  84 ++++++++++++
 ee/lib/gitlab/geo/cron_manager.rb             |   1 +
 ee/lib/gitlab/geo/replicable_model.rb         |  12 ++
 ee/lib/gitlab/geo/replicator.rb               |   5 +
 ee/spec/lib/gitlab/geo/cron_manager_spec.rb   |   2 +
 .../models/geo/package_file_registry_spec.rb  |   2 +
 .../geo_framework_registry_shared_examples.rb |  44 +++++++
 .../workers/geo/registry_sync_worker_spec.rb  | 123 ++++++++++++++++++
 13 files changed, 309 insertions(+)
 create mode 100644 ee/app/workers/geo/registry_sync_worker.rb
 create mode 100644 ee/spec/support/shared_examples/models/geo_framework_registry_shared_examples.rb
 create mode 100644 ee/spec/workers/geo/registry_sync_worker_spec.rb

diff --git a/config/gitlab.yml.example b/config/gitlab.yml.example
index 03a453534656..9c0fa7832504 100644
--- a/config/gitlab.yml.example
+++ b/config/gitlab.yml.example
@@ -469,6 +469,11 @@ production: &base
     geo_file_download_dispatch_worker:
       cron: "*/1 * * * *"
 
+    # GitLab Geo registry sync worker (for backfilling)
+    # NOTE: This will only take effect if Geo is enabled (secondary nodes only)
+    geo_registry_sync_worker:
+      cron: "*/1 * * * *"
+
     # GitLab Geo migrated local files clean up worker
     # NOTE: This will only take effect if Geo is enabled (secondary nodes only)
     geo_migrated_local_files_clean_up_worker:
diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb
index 2699d0984dc9..df552e33fd06 100644
--- a/config/initializers/1_settings.rb
+++ b/config/initializers/1_settings.rb
@@ -513,6 +513,9 @@
   Settings.cron_jobs['geo_file_download_dispatch_worker'] ||= Settingslogic.new({})
   Settings.cron_jobs['geo_file_download_dispatch_worker']['cron'] ||= '*/1 * * * *'
   Settings.cron_jobs['geo_file_download_dispatch_worker']['job_class'] ||= 'Geo::FileDownloadDispatchWorker'
+  Settings.cron_jobs['geo_registry_sync_worker'] ||= Settingslogic.new({})
+  Settings.cron_jobs['geo_registry_sync_worker']['cron'] ||= '*/1 * * * *'
+  Settings.cron_jobs['geo_registry_sync_worker']['job_class'] ||= 'Geo::RegistrySyncWorker'
   Settings.cron_jobs['geo_metrics_update_worker'] ||= Settingslogic.new({})
   Settings.cron_jobs['geo_metrics_update_worker']['cron'] ||= '*/1 * * * *'
   Settings.cron_jobs['geo_metrics_update_worker']['job_class'] ||= 'Geo::MetricsUpdateWorker'
diff --git a/ee/app/models/geo/base_registry.rb b/ee/app/models/geo/base_registry.rb
index fafe75a16cd9..2d4fb8ae4287 100644
--- a/ee/app/models/geo/base_registry.rb
+++ b/ee/app/models/geo/base_registry.rb
@@ -34,4 +34,21 @@ def self.insert_for_model_ids(ids)
   def self.delete_for_model_ids(ids)
     raise NotImplementedError, "#{self.class} does not implement #{__method__}"
   end
+
+  def self.find_unsynced_registries(batch_size:, except_ids: [])
+    pending
+      .model_id_not_in(except_ids)
+      .limit(batch_size)
+  end
+
+  def self.find_failed_registries(batch_size:, except_ids: [])
+    failed
+      .retry_due
+      .model_id_not_in(except_ids)
+      .limit(batch_size)
+  end
+
+  def model_record_id
+    read_attribute(self.class::MODEL_FOREIGN_KEY)
+  end
 end
diff --git a/ee/app/models/geo/package_file_registry.rb b/ee/app/models/geo/package_file_registry.rb
index 7d9744784ee6..c7b3867b2b85 100644
--- a/ee/app/models/geo/package_file_registry.rb
+++ b/ee/app/models/geo/package_file_registry.rb
@@ -4,6 +4,8 @@ class Geo::PackageFileRegistry < Geo::BaseRegistry
   include ::Delay
   include ShaAttribute
 
+  MODEL_FOREIGN_KEY = :package_file_id
+
   def self.declarative_policy_class
     'Geo::RegistryPolicy'
   end
@@ -20,6 +22,7 @@ def self.declarative_policy_class
   scope :never, -> { where(last_synced_at: nil) }
   scope :failed, -> { with_state(:failed) }
   scope :synced, -> { with_state(:synced) }
+  scope :pending, -> { with_state(:pending) }
   scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
   scope :ordered, -> { order(:id) }
 
diff --git a/ee/app/workers/all_queues.yml b/ee/app/workers/all_queues.yml
index c65cdbc5202c..a1db755a329b 100644
--- a/ee/app/workers/all_queues.yml
+++ b/ee/app/workers/all_queues.yml
@@ -75,6 +75,14 @@
   :weight: 1
   :idempotent: 
   :tags: []
+- :name: cronjob:geo_registry_sync
+  :feature_category: :geo_replication
+  :has_external_dependencies: 
+  :urgency: :low
+  :resource_boundary: :unknown
+  :weight: 1
+  :idempotent: true
+  :tags: []
 - :name: cronjob:geo_repository_sync
   :feature_category: :geo_replication
   :has_external_dependencies: 
diff --git a/ee/app/workers/geo/registry_sync_worker.rb b/ee/app/workers/geo/registry_sync_worker.rb
new file mode 100644
index 000000000000..6ccf3ece8bc7
--- /dev/null
+++ b/ee/app/workers/geo/registry_sync_worker.rb
@@ -0,0 +1,84 @@
+# frozen_string_literal: true
+
+module Geo
+  class RegistrySyncWorker < Geo::Scheduler::Secondary::SchedulerWorker
+    # This worker does not perform work scoped to a context
+    include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+
+    idempotent!
+
+    private
+
+    # We use inexpensive queries now so we don't need a backoff time
+    #
+    # Overrides Geo::Scheduler::SchedulerWorker#should_apply_backoff?
+    def should_apply_backoff?
+      false
+    end
+
+    def max_capacity
+      # Transition-period solution.
+      # Explained in https://gitlab.com/gitlab-org/gitlab/-/issues/213872#note_336828581
+      [current_node.files_max_capacity / 4, 1].max
+    end
+
+    def schedule_job(replicable_name, model_record_id)
+      job_id = ::Geo::EventWorker.perform_async(replicable_name, :created, model_record_id: model_record_id)
+
+      { model_record_id: model_record_id, replicable_name: replicable_name, job_id: job_id } if job_id
+    end
+
+    # Polls for new resources to be transferred
+    #
+    # @return [Array] resources to be transferred
+    def load_pending_resources
+      resources = find_unsynced_jobs(batch_size: db_retrieve_batch_size)
+      remaining_capacity = db_retrieve_batch_size - resources.count
+
+      if remaining_capacity.zero?
+        resources
+      else
+        resources + find_low_priority_jobs(batch_size: remaining_capacity)
+      end
+    end
+
+    # Get a batch of unsynced resources, taking equal parts from each resource.
+    #
+    # @return [Array] job arguments of unsynced resources
+    def find_unsynced_jobs(batch_size:)
+      jobs = replicator_classes.reduce([]) do |jobs, replicator_class|
+        except_ids = scheduled_replicable_ids(replicator_class.replicable_name)
+
+        jobs << replicator_class
+                  .find_unsynced_registries(batch_size: batch_size, except_ids: except_ids)
+                  .map { |registry| [replicator_class.replicable_name, registry.model_record_id] }
+      end
+
+      take_batch(*jobs, batch_size: batch_size)
+    end
+
+    # Get a batch of failed and synced-but-missing-on-primary resources, taking
+    # equal parts from each resource.
+    #
+    # @return [Array] job arguments of low priority resources
+    def find_low_priority_jobs(batch_size:)
+      jobs = replicator_classes.reduce([]) do |jobs, replicator_class|
+        except_ids = scheduled_replicable_ids(replicator_class.replicable_name)
+
+        jobs << replicator_class
+                  .find_failed_registries(batch_size: batch_size, except_ids: except_ids)
+                  .map { |registry| [replicator_class.replicable_name, registry.model_record_id] }
+      end
+
+      take_batch(*jobs, batch_size: batch_size)
+    end
+
+    def scheduled_replicable_ids(replicable_name)
+      scheduled_jobs.select { |data| data[:replicable_name] == replicable_name }.map { |data| data[:model_record_id] }
+    end
+
+    def replicator_classes
+      Gitlab::Geo::ReplicableModel.replicators
+    end
+  end
+end
diff --git a/ee/lib/gitlab/geo/cron_manager.rb b/ee/lib/gitlab/geo/cron_manager.rb
index 07c45a768746..f6b2d2feb869 100644
--- a/ee/lib/gitlab/geo/cron_manager.rb
+++ b/ee/lib/gitlab/geo/cron_manager.rb
@@ -15,6 +15,7 @@ class CronManager
 
       SECONDARY_JOBS = %w[
         geo_file_download_dispatch_worker
+        geo_registry_sync_worker
         geo_migrated_local_files_clean_up_worker
         geo_repository_sync_worker
         geo_container_repository_sync_worker
diff --git a/ee/lib/gitlab/geo/replicable_model.rb b/ee/lib/gitlab/geo/replicable_model.rb
index 7117bd58d16e..6c2b3d02bb9c 100644
--- a/ee/lib/gitlab/geo/replicable_model.rb
+++ b/ee/lib/gitlab/geo/replicable_model.rb
@@ -24,6 +24,8 @@ module ReplicableModel
         def with_replicator(klass)
           raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
 
+          Gitlab::Geo::ReplicableModel.add_replicator(klass)
+
           class_eval <<-RUBY, __FILE__, __LINE__ + 1
             define_method :replicator do
               @_replicator ||= klass.new(model_record: self)
@@ -32,6 +34,16 @@ def with_replicator(klass)
         end
       end
 
+      def self.add_replicator(klass)
+        @_replicators ||= []
+        @_replicators << klass
+      end
+
+      def self.replicators
+        @_replicators ||= []
+        @_replicators.filter { |replicator| const_defined?(replicator.to_s) }
+      end
+
       # Geo Replicator
       #
       # @abstract
diff --git a/ee/lib/gitlab/geo/replicator.rb b/ee/lib/gitlab/geo/replicator.rb
index 1b24edf3c9ba..9754d80db8f6 100644
--- a/ee/lib/gitlab/geo/replicator.rb
+++ b/ee/lib/gitlab/geo/replicator.rb
@@ -17,8 +17,13 @@ class Replicator
       CLASS_SUFFIXES = %w(RegistryFinder RegistriesResolver).freeze
 
       attr_reader :model_record_id
+
       delegate :model, to: :class
 
+      class << self
+        delegate :find_unsynced_registries, :find_failed_registries, to: :registry_class
+      end
+
       # Declare supported event
       #
       # @example Declaring support for :update and :delete events
diff --git a/ee/spec/lib/gitlab/geo/cron_manager_spec.rb b/ee/spec/lib/gitlab/geo/cron_manager_spec.rb
index 01e1fea22c2b..75b95f0f9fd4 100644
--- a/ee/spec/lib/gitlab/geo/cron_manager_spec.rb
+++ b/ee/spec/lib/gitlab/geo/cron_manager_spec.rb
@@ -11,6 +11,7 @@
     geo_repository_verification_primary_batch_worker
     geo_repository_sync_worker
     geo_file_download_dispatch_worker
+    geo_registry_sync_worker
     geo_container_repository_sync_worker
     geo_repository_verification_secondary_scheduler_worker
     geo_metrics_update_worker
@@ -35,6 +36,7 @@ def job(name)
     let(:secondary_jobs) do
       [
         job('geo_file_download_dispatch_worker'),
+        job('geo_registry_sync_worker'),
         job('geo_repository_sync_worker'),
         job('geo_container_repository_sync_worker'),
         job('geo_repository_verification_secondary_scheduler_worker'),
diff --git a/ee/spec/models/geo/package_file_registry_spec.rb b/ee/spec/models/geo/package_file_registry_spec.rb
index 8429ef06b0c1..b757b13f9d24 100644
--- a/ee/spec/models/geo/package_file_registry_spec.rb
+++ b/ee/spec/models/geo/package_file_registry_spec.rb
@@ -7,4 +7,6 @@
     let(:valid_items_for_bulk_insertion) { build_list(:package_file_registry, 10, created_at: Time.zone.now) }
     let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
   end
+
+  include_examples 'a Geo framework registry'
 end
diff --git a/ee/spec/support/shared_examples/models/geo_framework_registry_shared_examples.rb b/ee/spec/support/shared_examples/models/geo_framework_registry_shared_examples.rb
new file mode 100644
index 000000000000..4e3397368bf5
--- /dev/null
+++ b/ee/spec/support/shared_examples/models/geo_framework_registry_shared_examples.rb
@@ -0,0 +1,44 @@
+# frozen_string_literal: true
+
+shared_examples 'a Geo framework registry' do
+  let(:registry_class_factory) { described_class.underscore.tr('/', '_').sub('geo_', '').to_sym }
+
+  let!(:failed_item1) { create(registry_class_factory, :failed) }
+  let!(:failed_item2) { create(registry_class_factory, :failed) }
+  let!(:unsynced_item1) { create(registry_class_factory) }
+  let!(:unsynced_item2) { create(registry_class_factory) }
+
+  describe '.find_unsynced_registries' do
+    it 'returns unsynced items' do
+      result = described_class.find_unsynced_registries(batch_size: 10)
+
+      expect(result).to include(unsynced_item1, unsynced_item2)
+    end
+
+    it 'returns unsynced items except some specific item ID' do
+      except_id = unsynced_item1.model_record_id
+
+      result = described_class.find_unsynced_registries(batch_size: 10, except_ids: [except_id])
+
+      expect(result).to include(unsynced_item2)
+      expect(result).not_to include(unsynced_item1)
+    end
+  end
+
+  describe '.find_failed_registries' do
+    it 'returns failed items' do
+      result = described_class.find_failed_registries(batch_size: 10)
+
+      expect(result).to include(failed_item1, failed_item2)
+    end
+
+    it 'returns failed items except some specific item ID' do
+      except_id = failed_item1.model_record_id
+
+      result = described_class.find_failed_registries(batch_size: 10, except_ids: [except_id])
+
+      expect(result).to include(failed_item2)
+      expect(result).not_to include(failed_item1)
+    end
+  end
+end
diff --git a/ee/spec/workers/geo/registry_sync_worker_spec.rb b/ee/spec/workers/geo/registry_sync_worker_spec.rb
new file mode 100644
index 000000000000..0174f3fa754d
--- /dev/null
+++ b/ee/spec/workers/geo/registry_sync_worker_spec.rb
@@ -0,0 +1,123 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_db do
+  include ::EE::GeoHelpers
+  include ExclusiveLeaseHelpers
+
+  let(:primary)   { create(:geo_node, :primary) }
+  let(:secondary) { create(:geo_node) }
+
+  before do
+    stub_current_geo_node(secondary)
+    stub_exclusive_lease(renew: true)
+
+    allow_next_instance_of(described_class) do |instance|
+      allow(instance).to receive(:over_time?).and_return(false)
+    end
+  end
+
+  it 'does not schedule anything when tracking database is not configured' do
+    create(:package_file_registry)
+
+    expect(::Geo::EventWorker).not_to receive(:perform_async)
+
+    with_no_geo_database_configured do
+      subject.perform
+    end
+  end
+
+  it 'does not schedule anything when node is disabled' do
+    create(:package_file_registry)
+
+    secondary.enabled = false
+    secondary.save!
+
+    expect(::Geo::EventWorker).not_to receive(:perform_async)
+
+    subject.perform
+  end
+
+  it 'does not schedule duplicated jobs' do
+    package_file_1 = create(:package_file_registry)
+    package_file_2 = create(:package_file_registry)
+
+    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
+    secondary.update!(files_max_capacity: 8)
+    allow(Gitlab::SidekiqStatus).to receive(:job_status).with([]).and_return([]).twice
+    allow(Gitlab::SidekiqStatus).to receive(:job_status).with(array_including('123', '456')).and_return([true, true], [true, true], [false, false])
+
+    expect(::Geo::EventWorker)
+      .to receive(:perform_async)
+      .with('package_file', :created, { model_record_id: package_file_1.package_file.id })
+      .once
+      .and_return('123')
+    expect(::Geo::EventWorker)
+      .to receive(:perform_async)
+      .with('package_file', :created, { model_record_id: package_file_2.package_file.id })
+      .once
+      .and_return('456')
+
+    subject.perform
+  end
+
+  it 'does not schedule duplicated jobs because of query cache' do
+    package_file_1 = create(:package_file_registry)
+    package_file_2 = create(:package_file_registry)
+
+    # We retrieve all the items in a single batch
+    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 2)
+    # 8 / 4 = 2 We use one quarter of common files_max_capacity in the Geo::RegistrySyncWorker
+    secondary.update!(files_max_capacity: 8)
+
+    expect(Geo::EventWorker).to receive(:perform_async).with('package_file', :created, { model_record_id: package_file_1.package_file.id }).once do
+      Thread.new do
+        # Rails will invalidate the query cache if the update happens in the same thread
+        Geo::PackageFileRegistry.update(state: Geo::PackageFileRegistry::STATE_VALUES[:synced])
+      end
+    end
+
+    expect(Geo::EventWorker).to receive(:perform_async)
+                                  .with('package_file', :created, { model_record_id: package_file_2.package_file.id })
+                                  .once
+
+    subject.perform
+  end
+
+  # Test the case where we have:
+  #
+  # 1. A total of 10 files in the queue, and we can load a maximum of 5 and send 2 at a time.
+  # 2. We send 2, wait for 1 to finish, and then send again.
+  it 'attempts to load a new batch without pending downloads' do
+    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
+    # 8 / 4 = 2 We use one quarter of common files_max_capacity in the Geo::RegistrySyncWorker
+    secondary.update!(files_max_capacity: 8)
+
+    result_object = double(
+      :result,
+      success: true,
+      bytes_downloaded: 100,
+      primary_missing_file: false,
+      reason: '',
+      extra_details: {}
+    )
+
+    allow_any_instance_of(::Gitlab::Geo::Replication::BlobDownloader).to receive(:execute).and_return(result_object)
+
+    create_list(:package_file_registry, 10)
+
+    expect(::Geo::EventWorker).to receive(:perform_async).exactly(10).times.and_call_original
+    # For 10 downloads, we expect four database reloads:
+    # 1. Load the first batch of 5.
+    # 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the next 5.
+    # 3. Those 4 get sent out, and 1 remains.
+  # 4. Since the second reload filled the pipe with 4, we need to do a final reload to ensure
+  #    zero are left.
+    expect(subject).to receive(:load_pending_resources).exactly(4).times.and_call_original
+
+    Sidekiq::Testing.inline! do
+      subject.perform
+    end
+  end
+end
-- 
GitLab