Skip to content
代码片段 群组 项目
提交 e9b58cf7 编辑于 作者: Brian Byrne's avatar Brian Byrne 提交者: Ismael Juma
浏览文件

KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022)

Set `replica.fetch.max.bytes` to `1` and produce multiple record batches to allow
for throttling to take place. This helps avoid a race condition where the
reassignment would complete more quickly than expected causing an
assertion to fail some times.

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
上级 7f9187fe
No related branches found
No related tags found
无相关合并请求
......@@ -49,14 +49,20 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
*
* Note the replica fetch max bytes is set to `1` in order to throttle the rate of replication for test
* `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
*/
override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(
numConfigs = 6,
zkConnect = zkConnect,
rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"),
numPartitions = numPartitions,
defaultReplicationFactor = defaultReplicationFactor
).map(KafkaConfig.fromProps)
defaultReplicationFactor = defaultReplicationFactor,
).map { props =>
props.put(KafkaConfig.ReplicaFetchMaxBytesProp, "1")
KafkaConfig.fromProps(props)
}
private val numPartitions = 1
private val defaultReplicationFactor = 1.toShort
......@@ -655,8 +661,13 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
waitForTopicCreated(testTopicName)
// Produce multiple batches.
TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
// Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication
// throughput so the reassignment doesn't complete quickly.
val brokerIds = servers.map(_.config.brokerId)
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)
......@@ -686,6 +697,10 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
assertEquals("--under-replicated-partitions shouldn't return anything", "", underReplicatedOutput)
// Verify reassignment is still ongoing.
val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments.get().get(tp)
assertFalse(Option(reassignments).forall(_.addingReplicas.isEmpty))
TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp))
TestUtils.waitForAllReassignmentsToComplete(adminClient)
}
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册