diff --git a/app/workers/concerns/worker_context.rb b/app/workers/concerns/worker_context.rb index d85565e3446d44451eb14f501e90e0a2cfc8e400..ca006eaad5d45a245c928d58dee68c400e3b11ef 100644 --- a/app/workers/concerns/worker_context.rb +++ b/app/workers/concerns/worker_context.rb @@ -12,8 +12,46 @@ def get_worker_context @worker_context || superclass_context end + def bulk_perform_async_with_contexts(objects, arguments_proc:, context_proc:) + with_batch_contexts(objects, arguments_proc, context_proc) do |arguments| + bulk_perform_async(arguments) + end + end + + def bulk_perform_in_with_contexts(delay, objects, arguments_proc:, context_proc:) + with_batch_contexts(objects, arguments_proc, context_proc) do |arguments| + bulk_perform_in(delay, arguments) + end + end + + def context_for_arguments(args) + batch_context&.context_for(args) + end + private + BATCH_CONTEXT_KEY = "#{name}_batch_context" + + def batch_context + Thread.current[BATCH_CONTEXT_KEY] + end + + def batch_context=(value) + Thread.current[BATCH_CONTEXT_KEY] = value + end + + def with_batch_contexts(objects, arguments_proc, context_proc) + self.batch_context = Gitlab::BatchWorkerContext.new( + objects, + arguments_proc: arguments_proc, + context_proc: context_proc + ) + + yield(batch_context.arguments) + ensure + self.batch_context = nil + end + def superclass_context return unless superclass.include?(WorkerContext) diff --git a/ee/app/workers/update_all_mirrors_worker.rb b/ee/app/workers/update_all_mirrors_worker.rb index 57b3a277f080644a7c18b651ce0c703da2481310..60cd732a855c5483adda14206b6eb52722868090 100644 --- a/ee/app/workers/update_all_mirrors_worker.rb +++ b/ee/app/workers/update_all_mirrors_worker.rb @@ -47,11 +47,12 @@ def schedule_mirrors! projects = pull_mirrors_batch(freeze_at: now, batch_size: batch_size, offset_at: last).to_a break if projects.empty? - project_ids = projects.lazy.select(&:mirror?).take(capacity).map(&:id).force - capacity -= project_ids.length + projects_to_schedule = projects.lazy.select(&:mirror?).take(capacity).force + capacity -= projects_to_schedule.size - ProjectImportScheduleWorker.bulk_perform_async(project_ids.map { |id| [id] }) - scheduled += project_ids.length + schedule_projects_in_batch(projects_to_schedule) + + scheduled += projects_to_schedule.length # If fewer than `batch_size` projects were returned, we don't need to query again break if projects.length < batch_size @@ -95,6 +96,7 @@ def pull_mirrors_batch(freeze_at:, batch_size:, offset_at: nil) .mirrors_to_sync(freeze_at) .reorder('import_state.next_execution_timestamp') .limit(batch_size) + .with_route .with_namespace # Used by `project.mirror?` relation = relation.where('import_state.next_execution_timestamp > ?', offset_at) if offset_at @@ -102,4 +104,12 @@ def pull_mirrors_batch(freeze_at:, batch_size:, offset_at: nil) relation end # rubocop: enable CodeReuse/ActiveRecord + + def schedule_projects_in_batch(projects) + ProjectImportScheduleWorker.bulk_perform_async_with_contexts( + projects, + arguments_proc: -> (project) { project.id }, + context_proc: -> (project) { { project: project } } + ) + end end diff --git a/lib/gitlab/batch_worker_context.rb b/lib/gitlab/batch_worker_context.rb new file mode 100644 index 0000000000000000000000000000000000000000..0589206fefc2f8423820abffd79ae6a9d8d8b174 --- /dev/null +++ b/lib/gitlab/batch_worker_context.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module Gitlab + class BatchWorkerContext + def initialize(objects, arguments_proc:, context_proc:) + @objects = objects + @arguments_proc = arguments_proc + @context_proc = context_proc + end + + def arguments + context_by_arguments.keys + end + + def context_for(arguments) + context_by_arguments[arguments] + end + + private + + attr_reader :objects, :arguments_proc, :context_proc + + def context_by_arguments + @context_by_arguments ||= objects.each_with_object({}) do |object, result| + arguments = Array.wrap(arguments_proc.call(object)) + context = Gitlab::ApplicationContext.new(context_proc.call(object)) + + result[arguments] = context + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware.rb b/lib/gitlab/sidekiq_middleware.rb index b19853a17023b67a4e529317a195b3554d46b26e..439d45b7a14827ea7c545e7741c95ac76ea3613f 100644 --- a/lib/gitlab/sidekiq_middleware.rb +++ b/lib/gitlab/sidekiq_middleware.rb @@ -29,6 +29,7 @@ def self.client_configurator lambda do |chain| chain.add Gitlab::SidekiqStatus::ClientMiddleware chain.add Gitlab::SidekiqMiddleware::ClientMetrics + chain.add Gitlab::SidekiqMiddleware::WorkerContext::Client # needs to be before the Labkit middleware chain.add Labkit::Middleware::Sidekiq::Client end end diff --git a/lib/gitlab/sidekiq_middleware/worker_context.rb b/lib/gitlab/sidekiq_middleware/worker_context.rb new file mode 100644 index 0000000000000000000000000000000000000000..897a9211948d790feb19abd55b58c8d3a73eb4e3 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/worker_context.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module WorkerContext + private + + def wrap_in_optional_context(context_or_nil, &block) + return yield unless context_or_nil + + context_or_nil.use(&block) + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/worker_context/client.rb b/lib/gitlab/sidekiq_middleware/worker_context/client.rb new file mode 100644 index 0000000000000000000000000000000000000000..0eb52179db2a72bd0bd3d8334393261c2b184718 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/worker_context/client.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module WorkerContext + class Client + include Gitlab::SidekiqMiddleware::WorkerContext + + def call(worker_class_or_name, job, _queue, _redis_pool, &block) + worker_class = worker_class_or_name.to_s.safe_constantize + + # Mailers can't be constantized like this + return yield unless worker_class + return yield unless worker_class.include?(::ApplicationWorker) + + context_for_args = worker_class.context_for_arguments(job['args']) + + wrap_in_optional_context(context_for_args, &block) + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/worker_context/server.rb b/lib/gitlab/sidekiq_middleware/worker_context/server.rb index 29d98ad16a9c0b6de3569f5725a5f38f43ab7cb8..d2d84742c17fda12e40fbbe3aec76865063e890a 100644 --- a/lib/gitlab/sidekiq_middleware/worker_context/server.rb +++ b/lib/gitlab/sidekiq_middleware/worker_context/server.rb @@ -4,6 +4,8 @@ module Gitlab module SidekiqMiddleware module WorkerContext class Server + include Gitlab::SidekiqMiddleware::WorkerContext + def call(worker, job, _queue, &block) worker_class = worker.class @@ -13,14 +15,6 @@ def call(worker, job, _queue, &block) # Use the context defined on the class level as a base context wrap_in_optional_context(worker_class.get_worker_context, &block) end - - private - - def wrap_in_optional_context(context, &block) - return yield unless context - - context.use(&block) - end end end end diff --git a/spec/lib/gitlab/batch_worker_context_spec.rb b/spec/lib/gitlab/batch_worker_context_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..0ba30287ae5363000516840ff644282b921b7fe6 --- /dev/null +++ b/spec/lib/gitlab/batch_worker_context_spec.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::BatchWorkerContext do + subject(:batch_context) do + described_class.new( + %w(hello world), + arguments_proc: -> (word) { word }, + context_proc: -> (word) { { user: build_stubbed(:user, username: word) } } + ) + end + + describe "#arguments" do + it "returns all the expected arguments in arrays" do + expect(batch_context.arguments).to eq([%w(hello), %w(world)]) + end + end + + describe "#context_for" do + it "returns the correct application context for the arguments" do + context = batch_context.context_for(%w(world)) + + expect(context).to be_a(Gitlab::ApplicationContext) + expect(context.to_lazy_hash[:user].call).to eq("world") + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware/worker_context/client_spec.rb b/spec/lib/gitlab/sidekiq_middleware/worker_context/client_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..9cb89b1bc10e080049ebc08a89e93ddc177685a5 --- /dev/null +++ b/spec/lib/gitlab/sidekiq_middleware/worker_context/client_spec.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::SidekiqMiddleware::WorkerContext::Client do + let(:worker_class) do + Class.new do + def self.name + 'TestWithContextWorker' + end + + include ApplicationWorker + + def self.job_for_args(args) + jobs.find { |job| job['args'] == args } + end + + def perform(*args) + end + end + end + + before do + stub_const('TestWithContextWorker', worker_class) + end + + describe "#call" do + it 'applies a context for jobs scheduled in batch' do + user_per_job = { 'job1' => build_stubbed(:user, username: 'user-1'), + 'job2' => build_stubbed(:user, username: 'user-2') } + + TestWithContextWorker.bulk_perform_async_with_contexts( + %w(job1 job2), + arguments_proc: -> (name) { [name, 1, 2, 3] }, + context_proc: -> (name) { { user: user_per_job[name] } } + ) + + job1 = TestWithContextWorker.job_for_args(['job1', 1, 2, 3]) + job2 = TestWithContextWorker.job_for_args(['job2', 1, 2, 3]) + + expect(job1['meta.user']).to eq(user_per_job['job1'].username) + expect(job2['meta.user']).to eq(user_per_job['job2'].username) + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware_spec.rb b/spec/lib/gitlab/sidekiq_middleware_spec.rb index b3c0a5b04f0c2f27654cf73a4fe7f9d0a818e610..e8dcbbd2ee1fe76eb6ba7dabdc3229fe1b18116b 100644 --- a/spec/lib/gitlab/sidekiq_middleware_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware_spec.rb @@ -110,6 +110,14 @@ def perform(_arg) let(:queue) { 'default' } let(:redis_pool) { Sidekiq.redis_pool } let(:middleware_expected_args) { [worker_class_arg, job, queue, redis_pool] } + let(:expected_middlewares) do + [ + Gitlab::SidekiqStatus::ClientMiddleware, + Gitlab::SidekiqMiddleware::ClientMetrics, + Gitlab::SidekiqMiddleware::WorkerContext::Client, + Labkit::Middleware::Sidekiq::Client + ] + end before do described_class.client_configurator.call(chain) @@ -120,8 +128,9 @@ def perform(_arg) # this will prevent the full middleware chain from being executed. # This test ensures that this does not happen it "invokes the chain" do - expect_any_instance_of(Gitlab::SidekiqStatus::ClientMiddleware).to receive(:call).with(*middleware_expected_args).once.and_call_original - expect_any_instance_of(Labkit::Middleware::Sidekiq::Client).to receive(:call).with(*middleware_expected_args).once.and_call_original + expected_middlewares do |middleware| + expect_any_instance_of(middleware).to receive(:call).with(*middleware_expected_args).once.ordered.and_call_original + end expect { |b| chain.invoke(worker_class_arg, job, queue, redis_pool, &b) }.to yield_control.once end diff --git a/spec/workers/concerns/worker_context_spec.rb b/spec/workers/concerns/worker_context_spec.rb index a7d0ba2b8bdf49403dc8fe9838c6897565183142..97a88eecd733f788655619389877f9ddf79e6db8 100644 --- a/spec/workers/concerns/worker_context_spec.rb +++ b/spec/workers/concerns/worker_context_spec.rb @@ -5,7 +5,11 @@ describe WorkerContext do let(:worker) do Class.new do - include WorkerContext + def self.name + "TestWorker" + end + + include ApplicationWorker end end @@ -24,6 +28,78 @@ end end + shared_examples 'tracking bulk scheduling contexts' do + describe "context contents" do + before do + # stub clearing the contexts, so we can check what's inside + allow(worker).to receive(:batch_context=).and_call_original + allow(worker).to receive(:batch_context=).with(nil) + end + + it 'keeps track of the context per key to schedule' do + subject + + expect(worker.context_for_arguments(["hello"])).to be_a(Gitlab::ApplicationContext) + end + + it 'does not share contexts across threads' do + t1_context = nil + t2_context = nil + + Thread.new do + subject + + t1_context = worker.context_for_arguments(["hello"]) + end.join + Thread.new do + t2_context = worker.context_for_arguments(["hello"]) + end.join + + expect(t1_context).to be_a(Gitlab::ApplicationContext) + expect(t2_context).to be_nil + end + end + + it 'clears the contexts' do + subject + + expect(worker.__send__(:batch_context)).to be_nil + end + end + + describe '.bulk_perform_async_with_contexts' do + subject do + worker.bulk_perform_async_with_contexts(%w(hello world), + context_proc: -> (_) { { user: build_stubbed(:user) } }, + arguments_proc: -> (word) { word }) + end + + it 'calls bulk_perform_async with the arguments' do + expect(worker).to receive(:bulk_perform_async).with([["hello"], ["world"]]) + + subject + end + + it_behaves_like 'tracking bulk scheduling contexts' + end + + describe '.bulk_perform_in_with_contexts' do + subject do + worker.bulk_perform_in_with_contexts(10.minutes, + %w(hello world), + context_proc: -> (_) { { user: build_stubbed(:user) } }, + arguments_proc: -> (word) { word }) + end + + it 'calls bulk_perform_in with the arguments and delay' do + expect(worker).to receive(:bulk_perform_in).with(10.minutes, [["hello"], ["world"]]) + + subject + end + + it_behaves_like 'tracking bulk scheduling contexts' + end + describe '#with_context' do it 'allows modifying context when the job is running' do worker.new.with_context(user: build_stubbed(:user, username: 'jane-doe')) do