Skip to content
代码片段 群组 项目
未验证 提交 b21d7252 编辑于 作者: Bob Barrett's avatar Bob Barrett 提交者: GitHub
浏览文件

KCFUN-199: Convert dynamic quotas tests to KRaft (#6969)

上级 61c41d15
No related branches found
No related tags found
无相关合并请求
......@@ -5,11 +5,11 @@
package integration.kafka.coordinator.quota
import io.confluent.kafka.multitenant.TenantUtils
import kafka.api.IntegrationTestHarness
import kafka.coordinator.quota.{QuotaDescription, QuotaEntity}
import kafka.integration.KafkaServerTestHarness
import org.apache.kafka.common.internals.Topic
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.config.ConfigResource
......@@ -26,13 +26,14 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, ClusterLevelQuotaCallback}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.{lang, util}
import java.util.Properties
import scala.jdk.CollectionConverters._
class DynamicQuotasTest extends KafkaServerTestHarness {
class DynamicQuotasTest extends IntegrationTestHarness {
val numServers = 3
val numQuotasTopicPartitions = 3
val broker0 = 0
......@@ -45,15 +46,25 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
// Configure brokers to use dynamic configs
serverConfig.put(KafkaConfig.DynamicQuotaEnabledProp, true.toString)
serverConfig.put(KafkaConfig.QuotasTopicPartitionsProp, numQuotasTopicPartitions.toString)
serverConfig.put(KafkaConfig.QuotasTopicReplicationFactorProp, 2.toString)
serverConfig.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
serverConfig.put(KafkaConfig.ClientQuotaCallbackClassProp, classOf[TestClientQuotaCallback].getName)
serverConfig.put(KafkaConfig.QuotasExpirationTimeMsProp, 0.toString) // Disable expiration by default to prevent flakiness
serverConfig.put(KafkaConfig.QuotasExpirationIntervalMsProp, (2*1000).toString) // Check expiration frequently so that we can test it using a dynamic config
serverConfig.put(ConfluentConfigs.MAX_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG, brokerLimitProducer.toString)
super.setUp(testInfo)
TestUtils.waitForAllPartitionsMetadata(servers, Topic.QUOTA_TOPIC_NAME, numQuotasTopicPartitions)
servers.foreach(server => TestUtils.waitUntilTrue(server.quotaCoordinatorOpt.get.isActive.get,
s"Quota coordinator on broker $server never became active"))
TestUtils.waitForAllPartitionsMetadata(brokers, Topic.QUOTA_TOPIC_NAME, numQuotasTopicPartitions)
brokers.foreach(broker => TestUtils.waitUntilTrue(broker.quotaCoordinatorOpt.get.isActive.get,
s"Quota coordinator on broker $broker never became active"))
config = servers.head.config
time = servers.head.time
val metrics = servers.head.metrics
config = brokers.head.config
time = brokers.head.time
val metrics = brokers.head.metrics
val logContext: LogContext = new LogContext("DynamicQuotasTest ")
val channelBuilder = ChannelBuilders.clientChannelBuilder(
......@@ -96,18 +107,19 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
)
}
@Test
def testReportConsumption(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testReportConsumption(quorum: String): Unit = {
val quotaEntity = QuotaEntity(Map(TenantUtils.TENANT_TAG -> "tenant1"))
val partition = servers.head.quotaCoordinatorOpt.get.partitionFor(quotaEntity)
val partition = brokers.head.quotaCoordinatorOpt.get.partitionFor(quotaEntity)
val coordinatorNode =
servers.head.metadataCache.getPartitionLeaderEndpoint(Topic.QUOTA_TOPIC_NAME, partition, config.interBrokerListenerName) match {
brokers.head.metadataCache.getPartitionLeaderEndpoint(Topic.QUOTA_TOPIC_NAME, partition, config.interBrokerListenerName) match {
case Some(node) => node
case None => fail(s"Could not get node for quotas topic partition $partition")
}
servers.foreach { server =>
val brokerId = server.config.brokerId
brokers.foreach { broker =>
val brokerId = broker.config.brokerId
val requestBuilder = getRequestBuilder(brokerId, quotaEntity, ClientQuotaType.PRODUCE, (brokerId + 1) * 100.0)
def requestCallback(response: ClientResponse): Unit = {
......@@ -124,28 +136,29 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
NetworkClientUtils.sendAndReceive(networkClient, request, time)
}
val quotas = servers(coordinatorNode.id).quotaCoordinatorOpt.get.describeQuota(quotaEntity)._2.brokerQuotas
servers.foreach { server =>
val quotas = brokers(coordinatorNode.id).quotaCoordinatorOpt.get.describeQuota(quotaEntity)._2.brokerQuotas
brokers.foreach { broker =>
// Each broker should have computed quota proportional to their reported usage
val brokerId = server.config.brokerId
val brokerId = broker.config.brokerId
val expected = (TestClientQuotaCallback.clusterLevelQuota / 6).toDouble * (brokerId + 1)
val actual = quotas(brokerId)(ClientQuotaType.PRODUCE.toString)
assertEquals(expected, actual, s"Got quota of $actual for broker $brokerId but expected $expected")
}
}
@Test
def testQuotasTopicFailover(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testQuotasTopicFailover(quorum: String): Unit = {
// Construct a quota object
val quotaEntity = QuotaEntity(Map(TenantUtils.TENANT_TAG -> "tenant1"))
val usageBroker0 = 1000.0
val usageBroker1 = 2000.0
// Get the coordinator of the partition of the quota object and store it
val partition = servers.head.quotaCoordinatorOpt.get.partitionFor(quotaEntity)
val initialLeader = TestUtils.waitForPartitionMetadata(servers, Topic.QUOTA_TOPIC_NAME, partition).leader
val partition = brokers.head.quotaCoordinatorOpt.get.partitionFor(quotaEntity)
val initialLeader = TestUtils.waitForPartitionMetadata(brokers, Topic.QUOTA_TOPIC_NAME, partition).leader
var coordinatorNode =
servers.head.metadataCache.getPartitionLeaderEndpoint(Topic.QUOTA_TOPIC_NAME, partition, config.interBrokerListenerName) match {
brokers.head.metadataCache.getPartitionLeaderEndpoint(Topic.QUOTA_TOPIC_NAME, partition, config.interBrokerListenerName) match {
case Some(node) => node
case None => fail(s"Could not get node for quotas topic partition $partition")
}
......@@ -165,17 +178,17 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
}
NetworkClientUtils.sendAndReceive(networkClient, requestBroker0, time)
TestUtils.waitUntilTrue(() => servers(initialLeader).logManager.getLog(new TopicPartition(Topic.QUOTA_TOPIC_NAME, partition)).get.highWatermark > 0,
TestUtils.waitUntilTrue(() => brokers(initialLeader).logManager.getLog(new TopicPartition(Topic.QUOTA_TOPIC_NAME, partition)).get.highWatermark > 0,
s"Quota records were not successfully written to the log on broker $initialLeader")
val lastUpdatedTimestamp = servers(initialLeader).quotaCoordinatorOpt.get.quotaStateManager.getQuota(quotaEntity).get.lastUpdatedTimestamp
val lastUpdatedTimestamp = brokers(initialLeader).quotaCoordinatorOpt.get.quotaStateManager.getQuota(quotaEntity).get.lastUpdatedTimestamp
// Shut down the initial partition leader so that leadership moves to a new broker
servers(initialLeader).shutdown()
servers(initialLeader).awaitShutdown()
brokers(initialLeader).shutdown()
brokers(initialLeader).awaitShutdown()
val newLeader = TestUtils.waitForPartitionMetadata(servers, Topic.QUOTA_TOPIC_NAME, partition).leader
val newCoordinator = servers(newLeader).quotaCoordinatorOpt.get
val newLeader = TestUtils.waitForPartitionMetadata(brokers, Topic.QUOTA_TOPIC_NAME, partition).leader
val newCoordinator = brokers(newLeader).quotaCoordinatorOpt.get
val (error, loadedQuota) = newCoordinator.describeQuota(quotaEntity)
val quotaValue = loadedQuota.brokerQuotas(broker0)(ClientQuotaType.PRODUCE.toString)
......@@ -190,7 +203,7 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
val requestBuilderBroker1 = getRequestBuilder(broker1, quotaEntity, ClientQuotaType.PRODUCE, usageBroker1)
coordinatorNode =
servers.head.metadataCache.getPartitionLeaderEndpoint(Topic.QUOTA_TOPIC_NAME, partition, config.interBrokerListenerName) match {
brokers.head.metadataCache.getPartitionLeaderEndpoint(Topic.QUOTA_TOPIC_NAME, partition, config.interBrokerListenerName) match {
case Some(node) => node
case None => fail(s"Could not get node for quotas topic partition $partition")
}
......@@ -211,18 +224,19 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
s"Loaded quotas $quotaValueBroker1 did not match expected quota 80000.0")
}
@Test
def testQuotaExpiration(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testQuotaExpiration(quorum: String): Unit = {
val quotaEntity = QuotaEntity(Map(TenantUtils.TENANT_TAG -> "tenant1"))
val partition = servers.head.quotaCoordinatorOpt.get.partitionFor(quotaEntity)
val partition = brokers.head.quotaCoordinatorOpt.get.partitionFor(quotaEntity)
val coordinatorNode =
servers.head.metadataCache.getPartitionLeaderEndpoint(Topic.QUOTA_TOPIC_NAME, partition, config.interBrokerListenerName) match {
brokers.head.metadataCache.getPartitionLeaderEndpoint(Topic.QUOTA_TOPIC_NAME, partition, config.interBrokerListenerName) match {
case Some(node) => node
case None => fail(s"Could not get node for quotas topic partition $partition")
}
servers.foreach { server =>
val brokerId = server.config.brokerId
brokers.foreach { broker =>
val brokerId = broker.config.brokerId
val requestBuilder = getRequestBuilder(brokerId, quotaEntity, ClientQuotaType.PRODUCE, (brokerId + 1) * 100.0)
def requestCallback(response: ClientResponse): Unit = {
......@@ -239,10 +253,10 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
NetworkClientUtils.sendAndReceive(networkClient, request, time)
}
val quotas = servers(coordinatorNode.id).quotaCoordinatorOpt.get.describeQuota(quotaEntity)._2.brokerQuotas
servers.foreach { server =>
val quotas = brokers(coordinatorNode.id).quotaCoordinatorOpt.get.describeQuota(quotaEntity)._2.brokerQuotas
brokers.foreach { broker =>
// Each broker should have computed quota proportional to their reported usage
val brokerId = server.config.brokerId
val brokerId = broker.config.brokerId
val expected = (TestClientQuotaCallback.clusterLevelQuota / 6).toDouble * (brokerId + 1)
val actual = quotas(brokerId)(ClientQuotaType.PRODUCE.toString)
assertEquals(expected, actual, s"Got quota of $actual for broker $brokerId but expected $expected")
......@@ -269,34 +283,13 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
// Wait 10 seconds to make sure that the quotas have expired and the thread has run, then describe the entity
Thread.sleep(10000)
val quotasAfterExpiration = servers(coordinatorNode.id).quotaCoordinatorOpt.get.describeQuota(quotaEntity)
val quotasAfterExpiration = brokers(coordinatorNode.id).quotaCoordinatorOpt.get.describeQuota(quotaEntity)
quotasAfterExpiration match { case (error: Errors, quotaDescription: QuotaDescription) =>
assertEquals(QuotaDescription(Map.empty), quotaDescription, s"Got description $quotaDescription when describing entity $quotaEntity, but expected an empty map")
assertEquals(Errors.QUOTA_ENTITY_NOT_FOUND, error, s"Got error $error but expected ${Errors.QUOTA_ENTITY_NOT_FOUND}")
}
}
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
*/
override def generateConfigs: collection.Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, serverProps()))
}
private def serverProps(): Properties = {
val serverProps = new Properties()
serverProps.put(KafkaConfig.DynamicQuotaEnabledProp, true.toString)
serverProps.put(KafkaConfig.QuotasTopicPartitionsProp, numQuotasTopicPartitions.toString)
serverProps.put(KafkaConfig.QuotasTopicReplicationFactorProp, 2.toString)
serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
serverProps.put(KafkaConfig.ClientQuotaCallbackClassProp, classOf[TestClientQuotaCallback].getName)
serverProps.put(KafkaConfig.QuotasExpirationTimeMsProp, 0.toString) // Disable expiration by default to prevent flakiness
serverProps.put(KafkaConfig.QuotasExpirationIntervalMsProp, (2*1000).toString) // Check expiration frequently so that we can test it using a dynamic config
serverProps.put(ConfluentConfigs.MAX_BROKER_TENANT_PRODUCER_BYTE_RATE_CONFIG, brokerLimitProducer.toString)
serverProps
}
private def getRequestBuilder(brokerId: Int,
quotaEntity: QuotaEntity,
clientQuotaType: ClientQuotaType,
......@@ -318,6 +311,8 @@ class DynamicQuotasTest extends KafkaServerTestHarness {
.setEntries(List(entryData).asJava)
new ReportQuotaConsumptionRequest.Builder(requestData)
}
override protected def brokerCount: Int = numServers
}
object TestClientQuotaCallback {
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册