Unverified commit d4660115, authored by fred zheng, committed by GitHub

KDATA-454: add raft test annotation for clm test (#6673)

Enhance the CLM system test to support KRaft.
Parent 5283a01b
@@ -10,6 +10,7 @@ from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import config_property
from kafkatest.services.kafka import quorum
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
@@ -17,6 +18,7 @@ from kafkatest.utils.tiered_storage import TierSupport, S3_BACKEND, GCS_BACKEND,
from kafkatest.version import DEV_BRANCH
class BackupObjectLifecycleManagerTest(Test, TierSupport):
"""
This test verifies the behavior of the CustomLifecycleManager component of Backup & Restore. It tests that CLM
@@ -38,6 +40,7 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
# multiple segments are rolled, tiered to S3 and deleted from the local log.
LOG_SEGMENT_BYTES = 4096
MIN_RECORDS_PRODUCED = 25000
BROKER_COUNT = 4
TOPIC_COMMON_CONFIG = {
"topic": "",
@@ -62,15 +65,26 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
def __init__(self, test_context):
super(BackupObjectLifecycleManagerTest, self).__init__(test_context=test_context)
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk)
self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
controller_num_nodes = 1 if quorum.for_test(test_context) == quorum.remote_kraft else 0
self.kafka = KafkaService(test_context, num_nodes=self.BROKER_COUNT, zk=self.zk,
enable_jmx_on_remote_kafka=False,
controller_num_nodes_override=controller_num_nodes)
self.test_topic_1d = "test_topic_1day"
self.test_topic_4d = "test_topic_4day"
self.num_producers = 1
self.min_num_segments = 2
def setup(self):
self.zk.start()
if self.zk:
self.zk.start()
def teardown(self):
self.kafka.stop()
if self.zk:
self.zk.stop()
def create_test_topics(self):
"""Creates both the topics required by this test"""
@@ -276,10 +290,11 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
# dictionary of object key to deletion status (True if we have a deleteMarker, and it is the latest version /
# False otherwise)
deletion_status = {}
for non_delete_marker in all_versions['Versions']:
k = non_delete_marker['Key']
if non_delete_marker['IsLatest']:
deletion_status[k] = False
if 'Versions' in all_versions:
for non_delete_marker in all_versions['Versions']:
k = non_delete_marker['Key']
if non_delete_marker['IsLatest']:
deletion_status[k] = False
if 'DeleteMarkers' in all_versions:
for delete_marker in all_versions['DeleteMarkers']:
k = delete_marker['Key']
@@ -344,6 +359,19 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
self.ObjectExistenceState.LIVE),
timeout_sec=300, backoff_sec=1, err_msg="CLM did not complete in time")
def may_cleanup_lifecycle_manager_state(self, backend):
"""delete lifecycle_manager_state_file from backend to cleanup test env"""
lifecycle_manager_state_file = "3/" + self.kafka.cluster_id() + ".lifecycle-manager-state"
self.logger.info("clean up lifecycle_manager_state_file from " + backend)
if backend == S3_BACKEND:
output = self.delete_s3_object(lifecycle_manager_state_file)
elif backend == GCS_BACKEND:
output = self.delete_gcs_object(lifecycle_manager_state_file)
elif backend == AZURE_BLOCK_BLOB_BACKEND:
output = ""  # no cleanup is implemented for the Azure backend yet; keep output defined for the log call below
self.logger.info(output)
def restore_segment(self, segment, backend):
"""Restore the given segment from non-current to live status."""
self.logger.debug("Restoring %s" % segment)
@@ -361,9 +389,16 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
assert self.check_azure_object_existence_state(segment, self.ObjectExistenceState.LIVE)
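# The restore implementation itself is elided in this diff. As a rough sketch only (a hypothetical
# helper, not the actual method): assuming `segment` is the object key string, a versioned S3
# bucket, the aws CLI on the broker node (as in TierSupport.delete_s3_object below), and
# `import json`, a non-current object can be brought back to live status by removing its delete marker:
def _restore_s3_segment_sketch(self, segment):
    node = self.kafka.nodes[0]
    bucket = node.config[config_property.CONFLUENT_TIER_S3_BUCKET]
    # find the delete marker that currently shadows the segment
    list_cmd = "aws s3api list-object-versions --bucket %s --prefix %s" % (bucket, segment)
    listing = json.loads("".join(node.account.ssh_capture(list_cmd, allow_fail=True)))
    marker = next((m for m in listing.get('DeleteMarkers', []) if m['IsLatest']), None)
    if marker:
        # deleting the delete marker promotes the previous (non-current) version back to live
        delete_cmd = "aws s3api delete-object --bucket %s --key %s --version-id %s" % (
            bucket, segment, marker['VersionId'])
        for line in node.account.ssh_capture(delete_cmd, allow_fail=True):
            self.logger.debug(line.rstrip())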
@cluster(num_nodes=9)
@matrix(client_version=[str(DEV_BRANCH)], backend=[S3_BACKEND, GCS_BACKEND])
def test_backup_object_lifecycle_manager(self, client_version, backend):
@matrix(client_version=[str(DEV_BRANCH)], backend=[GCS_BACKEND], metadata_quorum=quorum.all)
@cluster(num_nodes=9)
@matrix(client_version=[str(DEV_BRANCH)], backend=[S3_BACKEND], metadata_quorum=[quorum.zk])
def test_backup_object_lifecycle_manager(self, client_version, backend, metadata_quorum=quorum.zk):
"""
The full test matrix of object store providers and quorum types is large. Since the KRaft-related code is
independent of the object store calls, and the full matrix has already been verified to pass, we reduce the
matrix and runtime here by running all quorum types against GCS and only ZooKeeper against S3.
This test checks the basic functions of the backup object LifecycleManager (LM), which is a component of
backup & restore. Deleted objects at the tiered storage buckets are kept around as
non-current objects (versioning). LifecycleManager looks up these non-current objects and determines
@@ -379,7 +414,6 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
- non-current objects belonging to the topic with 4 day retention must not get deleted.
- live objects must not get deleted.
"""
self.logger.info("client_version: " + client_version)
self.configure_tiering(backend, metadata_replication_factor=1, log_segment_bytes=self.LOG_SEGMENT_BYTES,
hotset_ms=1, hotset_bytes=1, prefer_tier_fetch_ms=-1, metadata_num_partitions=1,
@@ -394,6 +428,9 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
if backend == GCS_BACKEND:
self.setup_gsutil()
start_time = datetime.now()
self.may_cleanup_lifecycle_manager_state(backend)
self.create_test_topics()
self.produce_to_topic_and_tier_segments(self.test_topic_1d)
@@ -416,15 +453,25 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
self.kafka.delete_topic(self.test_topic_1d)
self.kafka.delete_topic(self.test_topic_4d)
self.may_cleanup_lifecycle_manager_state(backend)
@cluster(num_nodes=7)
@matrix(client_version=[str(DEV_BRANCH)], backend=[GCS_BACKEND, S3_BACKEND])
def test_lifecycle_manager_does_not_delete_restored_objects(self, client_version, backend):
"""Backup object LifecycleManager scans the tier state topic and finds the segments that have been deleted by Kafka.
@cluster(num_nodes=9)
@matrix(client_version=[str(DEV_BRANCH)], backend=[GCS_BACKEND], metadata_quorum=quorum.all)
@cluster(num_nodes=9)
@matrix(client_version=[str(DEV_BRANCH)], backend=[S3_BACKEND], metadata_quorum=[quorum.zk])
def test_lifecycle_manager_does_not_delete_restored_objects(self, client_version, backend, metadata_quorum=quorum.zk):
"""
The full test matrix of object store providers and quorum types is large. Since the KRaft-related code is
independent of the object store calls, and the full matrix has already been verified to pass, we reduce the
matrix and runtime here by running all quorum types against GCS and only ZooKeeper against S3.
Backup object LifecycleManager scans the tier state topic and finds the segments that have been deleted by Kafka.
It further determines the time when the corresponding non-current versions will get deleted. Before deleting
the non-current versions, it checks if the object is actually live (not non-current). This can happen if the
object is being restored as part of an ongoing data restore for the affected partition, or if tiered storage
had marked it as deleted due to a bug. This test checks that LM must not delete any segments that are live."""
had marked it as deleted due to a bug. This test checks that LM must not delete any segments that are live.
"""
self.logger.info("client_version: " + client_version)
self.configure_tiering(backend, metadata_replication_factor=1, log_segment_bytes=self.LOG_SEGMENT_BYTES,
hotset_ms=1, hotset_bytes=1, prefer_tier_fetch_ms=-1, metadata_num_partitions=1,
......@@ -439,6 +486,8 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
self.setup_gsutil()
start_time = datetime.now()
self.may_cleanup_lifecycle_manager_state(backend)
self.TOPIC_COMMON_CONFIG["topic"] = self.test_topic_1d
self.TOPIC_COMMON_CONFIG["configs"]["retention.ms"] = 86400000
self.kafka.create_topic(self.TOPIC_COMMON_CONFIG)
@@ -494,3 +543,5 @@ class BackupObjectLifecycleManagerTest(Test, TierSupport):
assert self.check_azure_object_existence_state(segment, self.ObjectExistenceState.LIVE)
self.kafka.delete_topic(self.test_topic_1d)
self.may_cleanup_lifecycle_manager_state(backend)
@@ -225,7 +225,6 @@ class ScalabeRestoreE2ETest(ProduceConsumeValidateTest, TierSupport):
@cluster(num_nodes=5)
@matrix(client_version=[str(DEV_BRANCH)], backend=[GCS_BACKEND, S3_BACKEND], metadata_quorum=quorum.all_non_upgrade)
def test_scalable_restore_e2e(self, client_version, backend, metadata_quorum=quorum.zk):
self.logger.info("Step #1: setup topic")
self.configure_tiering(backend, metadata_replication_factor=self.BROKER_COUNT,
log_segment_bytes=self.LOG_SEGMENT_BYTES,
......
@@ -425,6 +425,16 @@ class TierSupport():
output += line.rstrip()
return output
def delete_s3_object(self, object_path):
node = self.kafka.nodes[0]
bucket = node.config[config_property.CONFLUENT_TIER_S3_BUCKET]
cmd = "aws s3api delete-object --bucket %s --key %s" % (bucket, object_path)
output = ""
for line in node.account.ssh_capture(cmd, allow_fail=True):
output += line.rstrip()
self.logger.debug(output)
return output
def setup_gsutil(self):
for node in self.kafka.nodes:
cmd = "gcloud auth activate-service-account --key-file %s" % node.config[config_property.CONFLUENT_TIER_GCS_CRED_FILE_PATH]
@@ -464,6 +474,26 @@ class TierSupport():
output += line.rstrip()
return output
def delete_gcs_object(self, object_path):
node = self.kafka.nodes[0]
bucket = node.config[config_property.CONFLUENT_TIER_GCS_BUCKET]
prefix = ''
if config_property.CONFLUENT_TIER_GCS_PREFIX in node.config:
prefix = node.config[config_property.CONFLUENT_TIER_GCS_PREFIX]
# override the sdk version temporarily
# until the vagrant base image can be upgraded
# https://confluentinc.atlassian.net/browse/ST-4137
path = "gs://{}/{}".format(bucket, object_path)
if prefix not in ('', None):
path = "gs://{}/{}/{}".format(bucket, prefix, object_path)
cmd = "gsutil rm {}".format(path)
output = ""
for line in node.account.ssh_capture(cmd, allow_fail=True):
output += line.rstrip()
self.logger.debug(output)
return output
def azure_container_information(self):
node = self.kafka.nodes[0]
container = node.config.get(config_property.CONFLUENT_TIER_AZURE_BLOCK_BLOB_CONTAINER, '')
......