Skip to content
代码片段 群组 项目
提交 f93056ea 编辑于 作者: Greg Harris's avatar Greg Harris 提交者: Randall Hauch
浏览文件

KAFKA-10286: Connect system tests should wait for workers to join group (#9040)


Currently, the system tests `connect_distributed_test` and `connect_rest_test` only wait for the REST API to come up.
The startup of the worker includes an asynchronous process for joining the worker group and syncing with other workers.
There are some situations in which this sync takes an unusually long time, and the test continues without all workers up.
This leads to flaky test failures, because worker joins are not given sufficient time to time out and retry unless the test waits explicitly.

This changes the `ConnectDistributedTest` to wait for the Joined group message to be printed to the logs before continuing with tests. I've activated this behavior by default, as it's a superset of the checks that were performed by default before.

This log message is present in every version of DistributedHerder that I could find, in slightly different forms, but always with `Joined group` at the beginning of the log message. This change should be safe to backport to any branch.

Signed-off-by: Greg Harris <gregh@confluent.io>
Author: Greg Harris <gregh@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
上级 bac44009
No related branches found
No related tags found
无相关合并请求
......@@ -43,13 +43,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
CONNECT_REST_PORT = 8083
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin")
# Currently the Connect worker supports waiting on three modes:
# Currently the Connect worker supports waiting on four modes:
STARTUP_MODE_INSTANT = 'INSTANT'
"""STARTUP_MODE_INSTANT: Start Connect worker and return immediately"""
STARTUP_MODE_LOAD = 'LOAD'
"""STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins"""
STARTUP_MODE_LISTEN = 'LISTEN'
"""STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port."""
STARTUP_MODE_JOIN = 'JOIN'
"""STARTUP_MODE_JOIN: Start Connect worker and return after joining the group."""
logs = {
"connect_log": {
......@@ -114,8 +116,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
self.logger.debug("REST resources are not loaded yet")
return False
def start(self, mode=STARTUP_MODE_LISTEN):
self.startup_mode = mode
def start(self, mode=None):
# Start the service, optionally overriding the startup mode first.
# A falsy `mode` (including the default None) leaves self.startup_mode
# untouched, so a value assigned earlier stays in effect.
if mode:
self.startup_mode = mode
# Delegate the actual startup to the next start() in the class hierarchy.
super(ConnectServiceBase, self).start()
def start_and_return_immediately(self, node, worker_type, remote_connector_configs):
......@@ -136,6 +139,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" %
(str(node.account), self.startup_mode))
def start_and_wait_to_join_group(self, node, worker_type, remote_connector_configs):
    """Start a distributed Connect worker and wait for its 'Joined group' log line.

    Only the 'distributed' worker type is accepted; any other value raises
    RuntimeError before the worker is started. The wait on the log is bounded
    by self.startup_timeout_sec.
    """
    is_distributed = worker_type == 'distributed'
    if not is_distributed:
        raise RuntimeError("Cannot wait for joined group message for %s" % worker_type)

    # The log monitor is opened before the worker is launched.
    with node.account.monitor_log(self.LOG_FILE) as log_monitor:
        self.start_and_return_immediately(node, worker_type, remote_connector_configs)
        node_and_mode = (str(node.account), self.startup_mode)
        # '%' binds tighter than '+', so only the second literal is formatted.
        failure_msg = ("Never saw message indicating Kafka Connect joined group on node: " +
                       "%s in condition mode: %s" % node_and_mode)
        log_monitor.wait_until('Joined group', timeout_sec=self.startup_timeout_sec,
                               err_msg=failure_msg)
def stop_node(self, node, clean_shutdown=True):
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
pids = self.pids(node)
......@@ -307,6 +319,8 @@ class ConnectStandaloneService(ConnectServiceBase):
self.start_and_wait_to_load_plugins(node, 'standalone', remote_connector_configs)
elif self.startup_mode == self.STARTUP_MODE_INSTANT:
self.start_and_return_immediately(node, 'standalone', remote_connector_configs)
elif self.startup_mode == self.STARTUP_MODE_JOIN:
self.start_and_wait_to_join_group(node, 'standalone', remote_connector_configs)
else:
# The default mode is to wait until the complete startup of the worker
self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs)
......@@ -321,6 +335,7 @@ class ConnectDistributedService(ConnectServiceBase):
def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
self.startup_mode = self.STARTUP_MODE_JOIN
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic
self.status_topic = status_topic
......@@ -354,9 +369,11 @@ class ConnectDistributedService(ConnectServiceBase):
self.start_and_wait_to_load_plugins(node, 'distributed', '')
elif self.startup_mode == self.STARTUP_MODE_INSTANT:
self.start_and_return_immediately(node, 'distributed', '')
elif self.startup_mode == self.STARTUP_MODE_LISTEN:
self.start_and_wait_to_start_listening(node, 'distributed', '')
else:
# The default mode is to wait until the complete startup of the worker
self.start_and_wait_to_start_listening(node, 'distributed', '')
self.start_and_wait_to_join_group(node, 'distributed', '')
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册