Commit 625e38e0, authored by Matthias Käppler, committed by Sean McGivern

Address several edge cases in ProcessSupervisor

Fixed:
- Do not derive the `alive` status from the PID set returned
  by the caller, since this can lead to a "bouncing" state.
- Swap the health check with the sleep call: sleep suspends
  the calling thread, so if `alive` is invalidated by another
  thread during the sleep, the callback could still be invoked
  on wake-up. Checking first and sleeping last means the loop
  condition is re-evaluated before any further callback.
- Do not trap INT and TERM by default; this was swallowing
  these signals for the Puma master.

Changelog: fixed
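For context, here is a minimal, self-contained sketch of the supervision loop these fixes converge on. Every name here is a hypothetical stand-in, not the GitLab implementation; it only illustrates why checking health before sleeping closes the race described in the second bullet.

```ruby
require 'set'

# Hypothetical miniature supervisor, for illustration only.
class TinySupervisor
  attr_reader :alive

  def initialize(interval_seconds)
    @interval_seconds = interval_seconds
    @alive = false
  end

  def supervise(pid_or_pids, &on_process_death)
    @pids = Array(pid_or_pids).to_set # Set, so duplicate PIDs collapse
    @on_process_death = on_process_death
    @alive = true
    @thread = Thread.new { run_loop }
  end

  # Called from another thread (e.g. a shutdown hook).
  def shutdown
    @alive = false
  end

  private

  def run_loop
    while @alive
      check_process_health
      # Sleeping last means the `while @alive` test runs right after we
      # wake up, so a shutdown that happened mid-sleep cannot be followed
      # by another callback invocation.
      sleep(@interval_seconds)
    end
  end

  def check_process_health
    dead = @pids.reject { |pid| process_alive?(pid) }.to_set
    return if dead.empty?

    replacements = Array(@on_process_death.call(dead.to_a))
    @pids = (@pids - dead) + replacements.to_set
  end

  def process_alive?(pid)
    Process.kill(0, pid) # signal 0 only probes for existence
    true
  rescue Errno::ESRCH
    false
  end
end
```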
Parent 07a34882
@@ -20,7 +20,7 @@ def initialize(
   health_check_interval_seconds: DEFAULT_HEALTH_CHECK_INTERVAL_SECONDS,
   check_terminate_interval_seconds: DEFAULT_TERMINATE_INTERVAL_SECONDS,
   terminate_timeout_seconds: DEFAULT_TERMINATE_TIMEOUT_SECONDS,
-  term_signals: %i(INT TERM),
+  term_signals: [],
   forwarded_signals: [],
   **options)
   super(**options)
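The `term_signals: []` default above exists because `Signal.trap` is process-global: a library that traps INT and TERM silently replaces whatever handler the host process (here, the Puma master) installed. A standalone sketch of the effect, not taken from the GitLab sources:

```ruby
# The host process installs its own TERM handler.
Signal.trap('TERM') { warn 'host: graceful shutdown' }

# A library that traps TERM by default replaces that handler; the
# previous handler is returned but discarded here, so the host never
# sees TERM again.
Signal.trap('TERM') { warn 'library: intercepted TERM' }

Process.kill('TERM', Process.pid)
sleep 0.1 # give the handler a chance to run; prints only the library line
```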
@@ -31,7 +31,7 @@ def initialize(
   @check_terminate_interval_seconds = check_terminate_interval_seconds
   @terminate_timeout_seconds = terminate_timeout_seconds
-  @pids = []
+  @pids = Set.new
   @alive = false
 end
@@ -43,7 +43,7 @@ def initialize(
 # If the block returns a non-empty list of IDs, the supervisor will
 # start observing those processes instead. Otherwise it will shut down.
 def supervise(pid_or_pids, &on_process_death)
-  @pids = Array(pid_or_pids)
+  @pids = Array(pid_or_pids).to_set
   @on_process_death = on_process_death
   trap_signals!
@@ -56,7 +56,6 @@ def shutdown(signal = :TERM)
   return unless @alive

   stop_processes(signal)
-  stop
 end

 def supervised_pids
@@ -75,26 +74,25 @@ def stop_working
 def run_thread
   while @alive
-    sleep(@health_check_interval_seconds)
     check_process_health
+    sleep(@health_check_interval_seconds)
   end
 end

 def check_process_health
   unless all_alive?
-    existing_pids = live_pids # Capture this value for the duration of the block.
+    existing_pids = live_pids.to_set # Capture this value for the duration of the block.
     dead_pids = @pids - existing_pids
-    new_pids = Array(@on_process_death.call(dead_pids))
-    @pids = existing_pids + new_pids
-    @alive = @pids.any?
+    new_pids = Array(@on_process_death.call(dead_pids.to_a))
+    @pids = existing_pids + new_pids.to_set
   end
 end

 def stop_processes(signal)
   # Set this prior to shutting down so that shutdown hooks which read `alive`
   # know the supervisor is about to shut down.
-  @alive = false
+  stop_working

   # Shut down supervised processes.
   signal_all(signal)
...
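An irb-style sketch of why `@pids` becomes a `Set`: if the death callback returns a PID that is already in the live set (the scenario the new "deduplicates PIDs" spec below exercises), set union absorbs it instead of tracking it twice. The values are illustrative only.

```ruby
require 'set'

pids = [101, 102].to_set          # supervised PIDs
dead = Set[101]                   # 101 just died
existing = pids - dead            # => #<Set: {102}>

# The callback may return a scalar or an array; Array() normalizes it.
# Here it reports a PID we are already watching:
new_pids = Array(102)             # => [102]
pids = existing + new_pids.to_set # => #<Set: {102}>, not [102, 102]
```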
@@ -22,8 +22,6 @@ def start_for_puma
 supervisor = PumaProcessSupervisor.instance
 supervisor.supervise(start_server.call) do
-  next unless supervisor.alive
-
   Gitlab::AppLogger.info('Puma metrics server terminated, restarting...')
   start_server.call
 end
...
@@ -20,7 +20,7 @@
 module Gitlab
   module SidekiqCluster
     class CLI
-      THREAD_NAME = 'supervisor'
+      THREAD_NAME = 'sidekiq-cluster'

       # The signals that should terminate both the master and workers.
       TERMINATE_SIGNALS = %i(INT TERM).freeze
@@ -134,23 +134,17 @@ def start_and_supervise_workers(queue_groups)
 )

 metrics_server_pid = start_metrics_server

-all_pids = worker_pids + Array(metrics_server_pid)
-supervisor.supervise(all_pids) do |dead_pids|
+supervisor.supervise(worker_pids + Array(metrics_server_pid)) do |dead_pids|
   # If we're not in the process of shutting down the cluster,
   # and the metrics server died, restart it.
-  if supervisor.alive && dead_pids.include?(metrics_server_pid)
+  if dead_pids == Array(metrics_server_pid)
     @logger.info('Sidekiq metrics server terminated, restarting...')
     metrics_server_pid = restart_metrics_server
-    all_pids = worker_pids + Array(metrics_server_pid)
   else
     # If a worker process died we'll just terminate the whole cluster.
     # We let an external system (runit, kubernetes) handle the restart.
     @logger.info('A worker terminated, shutting down the cluster')
-    supervisor.shutdown
-    ProcessManagement.signal_processes(all_pids - dead_pids, :TERM)
-    # Signal supervisor not to respawn workers and shut down.
     []
   end
 end
...
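A note on the new condition: `dead_pids` carries every process that died in this health check, so plain equality with `Array(metrics_server_pid)` distinguishes "only the metrics server died" from "a worker died too", which the old `include?` check could not. A tiny worked example with an illustrative PID:

```ruby
metrics_server_pid = 42

[42] == Array(metrics_server_pid)    # => true: restart just the server
[7, 42] == Array(metrics_server_pid) # => false: a worker died as well, so
                                     #    the callback returns [] and the
                                     #    whole cluster shuts down
```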
@@ -440,32 +440,18 @@
 end

 it 'stops the entire process cluster if one of the workers has been terminated' do
-  expect(supervisor).to receive(:alive).and_return(true)
-  expect(supervisor).to receive(:supervise).and_yield([2])
   expect(MetricsServer).to receive(:start_for_sidekiq).once.and_return(metrics_server_pid)
-  expect(Gitlab::ProcessManagement).to receive(:signal_processes).with([42, 99], :TERM)
-  expect(supervisor).to receive(:shutdown)
+  expect(supervisor).to receive(:supervise).and_yield([2, 99])

   cli.run(%w(foo))
 end

-context 'when the supervisor is alive' do
-  it 'restarts the metrics server when it is down' do
-    expect(supervisor).to receive(:alive).and_return(true)
-    expect(supervisor).to receive(:supervise).and_yield([metrics_server_pid])
-    expect(MetricsServer).to receive(:start_for_sidekiq).twice.and_return(metrics_server_pid)
-
-    cli.run(%w(foo))
-  end
-end
-
-context 'when the supervisor is shutting down' do
-  it 'does not restart the metrics server' do
-    expect(supervisor).to receive(:alive).and_return(false)
-    expect(supervisor).to receive(:supervise).and_yield([metrics_server_pid])
-    expect(MetricsServer).to receive(:start_for_sidekiq).once.and_return(metrics_server_pid)
-
-    cli.run(%w(foo))
-  end
-end
+it 'restarts the metrics server when it is down' do
+  expect(supervisor).to receive(:supervise).and_yield([metrics_server_pid])
+  expect(MetricsServer).to receive(:start_for_sidekiq).twice.and_return(metrics_server_pid)
+
+  cli.run(%w(foo))
+end
 end
 end
...
@@ -6,6 +6,7 @@
 let(:health_check_interval_seconds) { 0.1 }
 let(:check_terminate_interval_seconds) { 1 }
 let(:forwarded_signals) { [] }
+let(:term_signals) { [] }
 let(:process_ids) { [spawn_process, spawn_process] }

 def spawn_process
@@ -19,7 +20,8 @@ def spawn_process
   health_check_interval_seconds: health_check_interval_seconds,
   check_terminate_interval_seconds: check_terminate_interval_seconds,
   terminate_timeout_seconds: 1 + check_terminate_interval_seconds,
-  forwarded_signals: forwarded_signals
+  forwarded_signals: forwarded_signals,
+  term_signals: term_signals
 )
 end
@@ -29,6 +31,8 @@ def spawn_process
 rescue Errno::ESRCH
   # Ignore if a process wasn't actually alive.
 end
+
+supervisor.stop
 end
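Since `shutdown` no longer calls `stop` (second hunk of the first file), the spec has to reap the daemon thread itself, hence the added `supervisor.stop`. A plausible shape of the full `after` hook around that line, assuming RSpec and the helpers already visible in this diff:

```ruby
after do
  process_ids.each do |pid|
    Process.kill('KILL', pid)
  rescue Errno::ESRCH
    # Ignore if a process wasn't actually alive.
  end

  supervisor.stop # reap the supervisor thread so specs don't leak it
end
```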
 describe '#supervise' do
@@ -60,7 +64,7 @@ def spawn_process
   [42] # Fake starting a new process in place of the terminated one.
 end

-# Terminate the supervised process.
+# Terminate a supervised process.
 Process.kill('TERM', process_ids.first)

 await_condition(sleep_sec: health_check_interval_seconds) do
@@ -71,6 +75,72 @@ def spawn_process
 expect(Gitlab::ProcessManagement.process_alive?(process_ids.last)).to be(true)
 expect(supervisor.supervised_pids).to match_array([process_ids.last, 42])
 end
+it 'deduplicates PIDs returned from callback' do
+  expect(Gitlab::ProcessManagement.all_alive?(process_ids)).to be(true)
+
+  pids_killed = []
+
+  supervisor.supervise(process_ids) do |dead_pids|
+    pids_killed = dead_pids
+    # Fake a new process having the same pid as one that was just terminated.
+    [process_ids.last]
+  end
+
+  # Terminate a supervised process.
+  Process.kill('TERM', process_ids.first)
+
+  await_condition(sleep_sec: health_check_interval_seconds) do
+    pids_killed == [process_ids.first]
+  end
+
+  expect(supervisor.supervised_pids).to contain_exactly(process_ids.last)
+end
+
+it 'accepts single PID returned from callback' do
+  expect(Gitlab::ProcessManagement.all_alive?(process_ids)).to be(true)
+
+  pids_killed = []
+
+  supervisor.supervise(process_ids) do |dead_pids|
+    pids_killed = dead_pids
+    42
+  end
+
+  # Terminate a supervised process.
+  Process.kill('TERM', process_ids.first)
+
+  await_condition(sleep_sec: health_check_interval_seconds) do
+    pids_killed == [process_ids.first]
+  end
+
+  expect(supervisor.supervised_pids).to contain_exactly(42, process_ids.last)
+end
+
+context 'but supervisor has entered shutdown' do
+  it 'does not trigger callback again' do
+    expect(Gitlab::ProcessManagement.all_alive?(process_ids)).to be(true)
+
+    callback_count = 0
+
+    supervisor.supervise(process_ids) do |dead_pids|
+      callback_count += 1
+
+      Thread.new { supervisor.shutdown }
+
+      [42]
+    end
+
+    # Terminate the supervised processes to trigger more than 1 callback.
+    Process.kill('TERM', process_ids.first)
+    Process.kill('TERM', process_ids.last)
+
+    await_condition(sleep_sec: health_check_interval_seconds * 3) do
+      supervisor.alive == false
+    end
+
+    # Since we shut down the supervisor during the first callback, it should not
+    # be called anymore.
+    expect(callback_count).to eq(1)
+  end
+end
 end
 context 'signal handling' do
@@ -82,6 +152,8 @@ def spawn_process
 end

 context 'termination signals' do
+  let(:term_signals) { %i(INT TERM) }
+
   context 'when TERM results in timely shutdown of processes' do
     it 'forwards them to observed processes without waiting for grace period to expire' do
       allow(Gitlab::ProcessManagement).to receive(:any_alive?).and_return(false)
...
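`await_condition`, used throughout the specs above, is a helper defined outside the hunks shown here. A plausible implementation (assumed, not taken from the GitLab sources) polls the block until it is truthy or a retry budget runs out:

```ruby
# Assumed helper: poll until the condition holds or attempts run out.
def await_condition(sleep_sec:, max_attempts: 50)
  max_attempts.times do
    break if yield

    sleep(sleep_sec)
  end
end
```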
@@ -267,28 +267,13 @@
 end

 context 'when the supervisor callback is invoked' do
-  context 'and the supervisor is alive' do
-    it 'restarts the metrics server' do
-      expect(supervisor).to receive(:alive).and_return(true)
-      expect(supervisor).to receive(:supervise).and_yield
-      expect(Process).to receive(:spawn).with(
-        include('METRICS_SERVER_TARGET' => 'puma'), end_with('bin/metrics-server'), anything
-      ).twice.and_return(42)
-
-      described_class.start_for_puma
-    end
-  end
-
-  context 'and the supervisor is not alive' do
-    it 'does not restart the server' do
-      expect(supervisor).to receive(:alive).and_return(false)
-      expect(supervisor).to receive(:supervise).and_yield
-      expect(Process).to receive(:spawn).with(
-        include('METRICS_SERVER_TARGET' => 'puma'), end_with('bin/metrics-server'), anything
-      ).once.and_return(42)
-
-      described_class.start_for_puma
-    end
-  end
+  it 'restarts the metrics server' do
+    expect(supervisor).to receive(:supervise).and_yield
+    expect(Process).to receive(:spawn).with(
+      include('METRICS_SERVER_TARGET' => 'puma'), end_with('bin/metrics-server'), anything
+    ).twice.and_return(42)
+
+    described_class.start_for_puma
+  end
 end
 end
...