Skip to content
代码片段 群组 项目
未验证 提交 49136d0c 编辑于 作者: Sean McGivern's avatar Sean McGivern
浏览文件

Exit with failure code when sidekiq-cluster child process fails

sidekiq-cluster handles process supervision for its child Sidekiq
processes, and terminates itself and all child processes if any child
Sidekiq process exits.

Previously, it always exited with a 0 status code (i.e. success), no
matter how the child process had terminated.

Now it exits with 1 if any child process had a non-zero exit code. This
allows a process supervisor one level up (like systemd) to detect
failures and restart.

Changelog: changed
上级 0600b3ad
No related branches found
No related tags found
无相关合并请求
......@@ -40,15 +40,6 @@ def self.signal_processes(pids, signal)
pids.each { |pid| signal(pid, signal) }
end
# Waits for the given process to complete using a separate thread.
def self.wait_async(pid)
Thread.new do
Process.wait(pid)
rescue StandardError
nil # There is no reason to return `Errno::ECHILD` if it catches a `TypeError`
end
end
# Returns true if all the processes are alive.
def self.all_alive?(pids)
pids.each do |pid|
......
# frozen_string_literal: true
require_relative './daemon'
module Gitlab
# Given a set of process IDs, the supervisor can monitor processes
# for being alive and invoke a callback if some or all should go away.
......
......@@ -112,7 +112,7 @@ def run(argv = ARGV)
end
def start_and_supervise_workers(queue_groups)
worker_pids = SidekiqCluster.start(
wait_threads = SidekiqCluster.start(
queue_groups,
env: @environment,
directory: @rails_path,
......@@ -135,6 +135,7 @@ def start_and_supervise_workers(queue_groups)
)
metrics_server_pid = start_metrics_server
worker_pids = wait_threads.map(&:pid)
supervisor.supervise(worker_pids + Array(metrics_server_pid)) do |dead_pids|
# If we're not in the process of shutting down the cluster,
# and the metrics server died, restart it.
......@@ -149,6 +150,13 @@ def start_and_supervise_workers(queue_groups)
[]
end
end
exit_statuses = wait_threads.map do |thread|
thread.join
thread.value
end
exit 1 unless exit_statuses.compact.all?(&:success?)
end
def start_metrics_server
......
# frozen_string_literal: true
require_relative '../lib/gitlab/process_management'
require_relative '../lib/gitlab/process_supervisor'
module Gitlab
module SidekiqCluster
......@@ -33,7 +34,8 @@ module SidekiqCluster
#
# directory - The directory of the Rails application.
#
# Returns an Array containing the PIDs of the started processes.
# Returns an Array containing the waiter threads (from Process.detach) of
# the started processes.
def self.start(queues, env: :development, directory: Dir.pwd, max_concurrency: 20, min_concurrency: 0, timeout: DEFAULT_SOFT_TIMEOUT_SECONDS, dryrun: false)
queues.map.with_index do |pair, index|
start_sidekiq(pair, env: env,
......@@ -82,9 +84,7 @@ def self.start_sidekiq(queues, env:, directory:, max_concurrency:, min_concurren
)
end
ProcessManagement.wait_async(pid)
pid
Process.detach(pid)
end
def self.count_by_queue(queues)
......
......@@ -299,11 +299,11 @@
end
context 'starting the server' do
context 'without --dryrun' do
before do
allow(Gitlab::SidekiqCluster).to receive(:start).and_return([])
end
before do
allow(Gitlab::SidekiqCluster).to receive(:start).and_return([])
end
context 'without --dryrun' do
it 'wipes the metrics directory before starting workers' do
expect(metrics_cleanup_service).to receive(:execute).ordered
expect(Gitlab::SidekiqCluster).to receive(:start).ordered.and_return([])
......@@ -403,9 +403,42 @@
let(:sidekiq_exporter_enabled) { true }
let(:metrics_server_pid) { 99 }
let(:sidekiq_worker_pids) { [2, 42] }
let(:waiter_threads) { [instance_double('Process::Waiter'), instance_double('Process::Waiter')] }
let(:process_status) { instance_double('Process::Status') }
before do
allow(Gitlab::SidekiqCluster).to receive(:start).and_return(sidekiq_worker_pids)
allow(Gitlab::SidekiqCluster).to receive(:start).and_return(waiter_threads)
allow(process_status).to receive(:success?).and_return(true)
allow(cli).to receive(:exit)
waiter_threads.each.with_index do |thread, i|
allow(thread).to receive(:join)
allow(thread).to receive(:pid).and_return(sidekiq_worker_pids[i])
allow(thread).to receive(:value).and_return(process_status)
end
end
context 'when one of the workers has been terminated gracefully' do
it 'stops the entire process cluster' do
expect(MetricsServer).to receive(:start_for_sidekiq).once.and_return(metrics_server_pid)
expect(supervisor).to receive(:supervise).and_yield([2, 99])
expect(supervisor).to receive(:shutdown)
expect(cli).not_to receive(:exit).with(1)
cli.run(%w(foo))
end
end
context 'when one of the workers has failed' do
it 'stops the entire process cluster and exits with a non-zero code' do
expect(MetricsServer).to receive(:start_for_sidekiq).once.and_return(metrics_server_pid)
expect(supervisor).to receive(:supervise).and_yield([2, 99])
expect(supervisor).to receive(:shutdown)
expect(process_status).to receive(:success?).and_return(false)
expect(cli).to receive(:exit).with(1)
cli.run(%w(foo))
end
end
it 'stops the entire process cluster if one of the workers has been terminated' do
......
......@@ -41,15 +41,6 @@
end
end
describe '.wait_async' do
it 'waits for a process in a separate thread' do
thread = described_class.wait_async(Process.spawn('true'))
# Upon success Process.wait just returns the PID.
expect(thread.value).to be_a_kind_of(Numeric)
end
end
# In the X_alive? checks, we check negative PIDs sometimes as a simple way
# to be sure the pids are definitely for non-existent processes.
# Note that -1 is special, and sends the signal to every process we have permission
......
......@@ -20,13 +20,16 @@
"SIDEKIQ_WORKER_ID" => "0"
},
"bundle", "exec", "sidekiq", "-c10", "-eproduction", "-t25", "-gqueues:foo", "-rfoo/bar", "-qfoo,1", process_options
)
).and_return(1)
expect(Process).to receive(:detach).ordered.with(1)
expect(Process).to receive(:spawn).ordered.with({
"ENABLE_SIDEKIQ_CLUSTER" => "1",
"SIDEKIQ_WORKER_ID" => "1"
},
"bundle", "exec", "sidekiq", "-c10", "-eproduction", "-t25", "-gqueues:bar,baz", "-rfoo/bar", "-qbar,1", "-qbaz,1", process_options
)
).and_return(2)
expect(Process).to receive(:detach).ordered.with(2)
described_class.start([%w(foo), %w(bar baz)], env: :production, directory: 'foo/bar', max_concurrency: 20, min_concurrency: 10)
end
......@@ -58,11 +61,13 @@
let(:env) { { "ENABLE_SIDEKIQ_CLUSTER" => "1", "SIDEKIQ_WORKER_ID" => first_worker_id.to_s } }
let(:args) { ['bundle', 'exec', 'sidekiq', anything, '-eproduction', '-t10', *([anything] * 5)] }
let(:waiter_thread) { instance_double('Process::Waiter') }
it 'starts a Sidekiq process' do
allow(Process).to receive(:spawn).and_return(1)
allow(Process).to receive(:detach).with(1).and_return(waiter_thread)
expect(Gitlab::ProcessManagement).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo), **options)).to eq(1)
expect(described_class.start_sidekiq(%w(foo), **options)).to eq(waiter_thread)
end
it 'handles duplicate queue names' do
......@@ -70,9 +75,9 @@
.to receive(:spawn)
.with(env, *args, anything)
.and_return(1)
allow(Process).to receive(:detach).with(1).and_return(waiter_thread)
expect(Gitlab::ProcessManagement).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo foo bar baz), **options)).to eq(1)
expect(described_class.start_sidekiq(%w(foo foo bar baz), **options)).to eq(waiter_thread)
end
it 'runs the sidekiq process in a new process group' do
......@@ -80,9 +85,9 @@
.to receive(:spawn)
.with(anything, *args, a_hash_including(pgroup: true))
.and_return(1)
allow(Process).to receive(:detach).with(1).and_return(waiter_thread)
allow(Gitlab::ProcessManagement).to receive(:wait_async)
expect(described_class.start_sidekiq(%w(foo bar baz), **options)).to eq(1)
expect(described_class.start_sidekiq(%w(foo bar baz), **options)).to eq(waiter_thread)
end
end
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册