Skip to content
代码片段 群组 项目
提交 6515359f 编辑于 作者: David Jacot's avatar David Jacot
浏览文件

KAFKA-10458; Updating controller quota does not work since Token Bucket (#9272)

This PR fixes two issues that have been introduced by #9114.
- When the metric was switched from Rate to TokenBucket in the ControllerMutationQuotaManager, the metrics were mixed up. That broke the quota update path.
- When a quota is updated, the ClientQuotaManager updates the MetricConfig of the KafkaMetric. That update was not reflected into the Sensor so the Sensor was still using the MetricConfig that it has been created with.

Reviewers: Anna Povzner <anna@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
上级 05ec79c7
No related branches found
No related tags found
无相关合并请求
......@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.metrics;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
import org.apache.kafka.common.metrics.stats.TokenBucket;
......@@ -31,6 +30,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
......@@ -54,12 +55,20 @@ public final class Sensor {
private final Object metricLock;
private static class StatAndConfig {
public final Stat stat;
public final MetricConfig config;
private final Stat stat;
private final Supplier<MetricConfig> configSupplier;
StatAndConfig(Stat stat, MetricConfig config) {
StatAndConfig(Stat stat, Supplier<MetricConfig> configSupplier) {
this.stat = stat;
this.config = config;
this.configSupplier = configSupplier;
}
public Stat stat() {
return stat;
}
public MetricConfig config() {
return configSupplier.get();
}
}
......@@ -212,7 +221,7 @@ public final class Sensor {
synchronized (metricLock()) {
// increment all the stats
for (StatAndConfig statAndConfig : this.stats) {
statAndConfig.stat.record(statAndConfig.config, value, timeMs);
statAndConfig.stat.record(statAndConfig.config(), value, timeMs);
}
}
if (checkQuotas)
......@@ -271,7 +280,7 @@ public final class Sensor {
return false;
final MetricConfig statConfig = config == null ? this.config : config;
stats.add(new StatAndConfig(Objects.requireNonNull(stat), statConfig));
stats.add(new StatAndConfig(Objects.requireNonNull(stat), () -> statConfig));
Object lock = metricLock();
for (NamedMeasurable m : stat.stats()) {
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time);
......@@ -317,7 +326,7 @@ public final class Sensor {
);
registry.registerMetric(metric);
metrics.put(metric.metricName(), metric);
stats.add(new StatAndConfig(Objects.requireNonNull(stat), statConfig));
stats.add(new StatAndConfig(Objects.requireNonNull(stat), metric::config));
return true;
}
}
......
......@@ -296,9 +296,39 @@ public class SensorTest {
Mockito.verify(stat1).record(stat1Config, 10, 1);
Mockito.verify(stat2).record(stat2Config, 10, 1);
Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0);
Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0);
sensor.checkQuotas(2);
Mockito.verify(stat1).measure(stat1Config, 2);
Mockito.verify(stat2).measure(stat2Config, 2);
metrics.close();
}
@Test
public void testUpdatingMetricConfigIsReflectedInTheSensor() {
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
final Metrics metrics = new Metrics(time);
final Sensor sensor = metrics.sensor("sensor");
final MeasurableStat stat = Mockito.mock(MeasurableStat.class);
final MetricName statName = metrics.metricName("stat", "test-group");
final MetricConfig statConfig = new MetricConfig().quota(Quota.upperBound(5));
sensor.add(statName, stat, statConfig);
sensor.record(10, 1);
Mockito.verify(stat).record(statConfig, 10, 1);
sensor.checkQuotas(2);
Mockito.verify(stat).measure(statConfig, 2);
// Update the config of the KafkaMetric
final MetricConfig newConfig = new MetricConfig().quota(Quota.upperBound(10));
metrics.metric(statName).config(newConfig);
sensor.record(10, 3);
Mockito.verify(stat).record(newConfig, 10, 3);
sensor.checkQuotas(4);
Mockito.verify(stat).measure(newConfig, 4);
metrics.close();
}
......
......@@ -268,7 +268,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
val tenantsManager = activeTenantsManager.getOrElse(throw new IllegalStateException("ActiveTenantsManager not available"))
val activeTenants = tenantsManager.getActiveTenants(resetQuotaCallback)
activeTenants.foreach((metricTags: Map[String, String]) => {
val clientMetric = metrics.metrics().get(clientRateMetricName(metricTags))
val clientMetric = metrics.metrics().get(clientQuotaMetricName(metricTags))
if (clientMetric != null) {
clientMetric.config(getQuotaMetricConfig(quotaLimit(metricTags.asJava)))
}
......@@ -417,7 +417,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
private def dynamicQuotaLimit(metricTags: Map[String, String], defaultVal: Option[Double] = None): Double = {
val clientMetric = metrics.metrics().get(clientRateMetricName(metricTags))
val clientMetric = metrics.metrics().get(clientQuotaMetricName(metricTags))
if (clientMetric != null) clientMetric.config.quota.bound() else defaultVal.getOrElse(quotaLimit(metricTags.asJava))
}
......@@ -462,7 +462,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
protected def registerQuotaMetrics(metricTags: Map[String, String])(sensor: Sensor): Unit = {
sensor.add(
clientRateMetricName(metricTags),
clientQuotaMetricName(metricTags),
new Rate,
getQuotaMetricConfig(metricTags)
)
......@@ -549,7 +549,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
var backpressured = false
var totalUsage = 0.0
activeTenants.foreach((metricTags: Map[String, String]) => {
val clientMetric = metrics.metrics().get(clientRateMetricName(metricTags))
val clientMetric = metrics.metrics().get(clientQuotaMetricName(metricTags))
if (clientMetric != null) {
totalUsage += clientMetric.metricValue.asInstanceOf[Double]
backpressured = backpressured || quotaLimit(metricTags.asJava) != dynamicQuotaLimit(metricTags)
......@@ -579,8 +579,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
debug(s"Auto-tuning active tenants' $clientQuotaType quotas when total tenant usage is $totalUsage and broker quota limit is $getBrokerQuotaLimit")
val sortedTenants = activeTenants.toList.sortWith((metricTags1: Map[String, String], metricTags2: Map[String, String]) => {
val clientMetric1 = metrics.metrics().get(clientRateMetricName(metricTags1))
val clientMetric2 = metrics.metrics().get(clientRateMetricName(metricTags2))
val clientMetric1 = metrics.metrics().get(clientQuotaMetricName(metricTags1))
val clientMetric2 = metrics.metrics().get(clientQuotaMetricName(metricTags2))
clientMetric1.metricValue.asInstanceOf[Double] < clientMetric2.metricValue.asInstanceOf[Double]
})
......@@ -588,7 +588,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
var remainingCapacity = getBrokerQuotaLimit
for (metricTags <- sortedTenants) {
val clientMetric = metrics.metrics().get(clientRateMetricName(metricTags))
val clientMetric = metrics.metrics().get(clientQuotaMetricName(metricTags))
val clientUsage = clientMetric.metricValue.asInstanceOf[Double]
var newLimit = remainingCapacity / remainingClients
......@@ -608,7 +608,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
def resetQuotaCallback(metricTags: Map[String, String]): Unit = {
val clientMetric = metrics.metrics().get(clientRateMetricName(metricTags))
val clientMetric = metrics.metrics().get(clientQuotaMetricName(metricTags))
if (clientMetric != null) {
val dynamicLimit = dynamicQuotaLimit(metricTags)
val originalLimit = quotaLimit(metricTags.asJava)
......@@ -709,7 +709,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
val clientId = quotaEntity.clientId
val metricTags = Map(DefaultTags.User -> user, DefaultTags.ClientId -> clientId)
val quotaMetricName = clientRateMetricName(metricTags)
val quotaMetricName = clientQuotaMetricName(metricTags)
// Change the underlying metric config if the sensor has been created
val metric = allMetrics.get(quotaMetricName)
if (metric != null) {
......@@ -719,7 +719,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
} else {
val quotaMetricName = clientRateMetricName(Map.empty)
val quotaMetricName = clientQuotaMetricName(Map.empty)
allMetrics.forEach { (metricName, metric) =>
if (metricName.name == quotaMetricName.name && metricName.group == quotaMetricName.group) {
val metricTags = metricName.tags
......@@ -734,7 +734,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
/**
* Returns the MetricName of the metric used for the quota. The name is used to create the
* metric but also to find the metric when the quota is changed.
*/
protected def clientQuotaMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("byte-rate", quotaType.toString,
"Tracking byte-rate per user/client-id",
quotaMetricTags.asJava)
......
......@@ -123,7 +123,7 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
}
override protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
override protected def clientQuotaMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("request-time", QuotaType.Request.toString,
"Tracking request-time per user/client-id",
quotaMetricTags.asJava)
......
......@@ -168,15 +168,15 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi
private val quotaCallback: Option[ClientQuotaCallback])
extends ClientQuotaManager(config, metrics, QuotaType.ControllerMutation, time, threadNamePrefix, quotaCallback) {
override protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("mutation-rate", QuotaType.ControllerMutation.toString,
"Tracking mutation-rate per user/client-id",
override protected def clientQuotaMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("tokens", QuotaType.ControllerMutation.toString,
"Tracking remaining tokens in the token bucket per user/client-id",
quotaMetricTags.asJava)
}
private def clientTokenBucketMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("tokens", QuotaType.ControllerMutation.toString,
"Tracking remaining tokens in the token bucket per user/client-id",
private def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("mutation-rate", QuotaType.ControllerMutation.toString,
"Tracking mutation-rate per user/client-id",
quotaMetricTags.asJava)
}
......@@ -186,7 +186,7 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi
new Rate
)
sensor.add(
clientTokenBucketMetricName(metricTags),
clientQuotaMetricName(metricTags),
new TokenBucket,
getQuotaMetricConfig(metricTags)
)
......
......@@ -17,6 +17,7 @@ import java.util.Properties
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import kafka.server.ClientQuotaManager.DefaultTags
import kafka.utils.TestUtils
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.message.CreatePartitionsRequestData
......@@ -24,6 +25,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.ClientQuotaAlteration
......@@ -41,6 +43,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.{Before, Ignore, Test}
import scala.jdk.CollectionConverters._
......@@ -79,6 +82,8 @@ object ControllerMutationQuotaTest {
val TopicsWith30Partitions = Map(Topic1 -> 30, Topic2 -> 30)
val TopicsWith31Partitions = Map(Topic1 -> 31, Topic2 -> 31)
val ControllerQuotaSamples = 10
val ControllerQuotaWindowSizeSeconds = 1
val ControllerMutationRate = 2.0
}
......@@ -94,8 +99,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
properties.put(KafkaConfig.PrincipalBuilderClassProp,
classOf[ControllerMutationQuotaTest.TestPrincipalBuilder].getName)
// Specify number of samples and window size.
properties.put(KafkaConfig.NumControllerQuotaSamplesProp, "10")
properties.put(KafkaConfig.ControllerQuotaWindowSizeSecondsProp, "1")
properties.put(KafkaConfig.NumControllerQuotaSamplesProp, ControllerQuotaSamples.toString)
properties.put(KafkaConfig.ControllerQuotaWindowSizeSecondsProp, ControllerQuotaWindowSizeSeconds.toString)
}
@Before
......@@ -123,6 +128,29 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
waitUserQuota(principal.getName, Long.MaxValue)
}
@Ignore // XXX: CNKAF-1112
@Test
def testQuotaMetric(): Unit = {
asPrincipal(ThrottledPrincipal) {
// Metric is lazily created
assertTrue(quotaMetric(principal.getName).isEmpty)
// Create a topic to create the metrics
val (_, errors) = createTopics(Map("topic" -> 1), StrictDeleteTopicsRequestVersion)
assertEquals(Set(Errors.NONE), errors.values.toSet)
// Metric must be there with the correct config
verifyQuotaMetric(principal.getName, ControllerMutationRate)
// Update quota
defineUserQuota(ThrottledPrincipal.getName, Some(ControllerMutationRate * 2))
waitUserQuota(ThrottledPrincipal.getName, ControllerMutationRate * 2)
// Metric must be there with the updated config
verifyQuotaMetric(principal.getName, ControllerMutationRate * 2)
}
}
@Ignore // XXX: CNKAF-1112
@Test
def testStrictCreateTopicsRequest(): Unit = {
......@@ -349,6 +377,29 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}, s"Quota of $user is not $expectedQuota but $actualQuota")
}
private def quotaMetric(user: String): Option[KafkaMetric] = {
val metrics = servers.head.metrics
val metricName = metrics.metricName(
"tokens",
QuotaType.ControllerMutation.toString,
"Tracking remaining tokens in the token bucket per user/client-id",
Map(DefaultTags.User -> user, DefaultTags.ClientId -> "").asJava)
Option(servers.head.metrics.metric(metricName))
}
private def verifyQuotaMetric(user: String, expectedQuota: Double): Unit = {
quotaMetric(user) match {
case Some(metric) =>
val config = metric.config()
assertEquals(expectedQuota, config.quota().bound(), 0.1)
assertEquals(ControllerQuotaSamples, config.samples())
assertEquals(ControllerQuotaWindowSizeSeconds * 1000, config.timeWindowMs())
case None =>
fail(s"Quota metric of $user is not defined")
}
}
private def alterClientQuotas(request: Map[ClientQuotaEntity, Map[String, Option[Double]]]): Map[ClientQuotaEntity, KafkaFutureImpl[Void]] = {
val entries = request.map { case (entity, alter) =>
val ops = alter.map { case (key, value) =>
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册