Skip to content
代码片段 群组 项目
未验证 提交 83bbf7e8 编辑于 作者: Vikas Singh's avatar Vikas Singh 提交者: GitHub
浏览文件

KAFKALESS-1216: Add ReplicaEntity and ReplicaMetricSample classes (#6710)

* KAFKALESS-1216: Add ReplicaEntity and ReplicaMetricSample classes

This change adds a ReplicaMetricSample and ReplicaEntity class that can
be used to capture replica level metrics. This is very simlar to the
existing PartitionMetricSample/PartitionEntity and
BrokerMetricSample/BrokerEntity clases.

Added a test to make sure ser/deser works (we may take that out along
with SampleStore later)
上级 f01c894f
No related branches found
No related tags found
无相关合并请求
/*
* Copyright (C) 2022 Confluent Inc.
*/
package com.linkedin.kafka.cruisecontrol.monitor.sampling.holder;
import com.linkedin.cruisecontrol.model.Entity;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricSampleAggregator;
import org.apache.kafka.common.TopicPartition;
import java.util.Objects;
/**
* The {@link Entity} class representing a Replica used by {@link MetricSampleAggregator}
*/
public class ReplicaEntity extends Entity<String> {
private final TopicPartition tp;
private final int brokerId;
private final boolean isLeader;
public ReplicaEntity(TopicPartition tp, int brokerId, boolean isLeader) {
this.tp = tp;
this.brokerId = brokerId;
this.isLeader = isLeader;
}
@Override
public String group() {
return tp.toString();
}
public TopicPartition tp() {
return tp;
}
public int brokerId() {
return brokerId;
}
public boolean isLeader() {
return isLeader;
}
@Override
public int hashCode() {
return Objects.hash(tp, brokerId);
}
@Override
public boolean equals(Object other) {
return other instanceof ReplicaEntity
&& tp.equals(((ReplicaEntity) other).tp())
&& this.brokerId == ((ReplicaEntity) other).brokerId;
}
}
/*
* Copyright (C) 2022 Confluent Inc.
*/
package com.linkedin.kafka.cruisecontrol.monitor.sampling.holder;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.cruisecontrol.monitor.sampling.MetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import org.apache.kafka.common.TopicPartition;
import java.util.Date;
/**
* The class that hosts all the metric samples for a replica of a partition.
*/
public class ReplicaMetricSample extends MetricSample<String, ReplicaEntity> {
public ReplicaMetricSample(int brokerId, TopicPartition tp, boolean isLeader) {
super(new ReplicaEntity(tp, brokerId, isLeader));
}
/**
* The id of the broker from which the metrics are from.
*/
public int brokerId() {
return entity().brokerId();
}
@Override
public String toString() {
MetricDef metricDef = KafkaMetricDef.commonMetricDef();
StringBuilder builder = new StringBuilder().append("{");
valuesByMetricId.forEach((key, value) -> builder.append(metricDef.metricInfo(key).name())
.append("=")
.append(value.toString())
.append(", "));
if (!valuesByMetricId.isEmpty()) {
builder.delete(builder.length() - 2, builder.length());
}
builder.append("}");
return String.format("[brokerId: %d, Partition: %s, time: %s, metrics: %s]", entity.brokerId(), entity().tp(),
new Date(sampleTime), builder);
}
}
/*
* Copyright (C) 2022 Confluent Inc.
*/
package com.linkedin.kafka.cruisecontrol.monitor.sampling.holder;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* The unit test for {@link ReplicaMetricSample}
*/
public class ReplicaMetricSampleTest {
@Test
public void testRecordAfterClose() {
ReplicaMetricSample sample = new ReplicaMetricSample(0, new TopicPartition("topic", 0), false);
sample.close(0);
assertThrows(IllegalStateException.class,
() -> sample.record(KafkaMetricDef.commonMetricDefInfo(KafkaMetricDef.CPU_USAGE), 10.0));
}
@Test
public void testRecordSameResourceMetricAgain() {
ReplicaMetricSample sample = new ReplicaMetricSample(0, new TopicPartition("topic", 0), false);
sample.record(KafkaMetricDef.commonMetricDefInfo(KafkaMetricDef.CPU_USAGE), 20.0);
assertThrows(IllegalStateException.class,
() -> sample.record(KafkaMetricDef.commonMetricDefInfo(KafkaMetricDef.CPU_USAGE), 10.0));
}
}
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册