Skip to content
代码片段 群组 项目
未验证 提交 0f08da02 编辑于 作者: Vikas Singh's avatar Vikas Singh 提交者: GitHub
浏览文件

KAFKALESS-1217: Move TopicPartition to PartitionInfo (#6711)

* KAFKALESS-1217: Move TopicPartition to PartitionInfo

SBC metric collection code uses TopicPartition class to keep track of
topic metrics. However this class only contains partition information
and there is no replica information. The PartitionInfo class instead has
all the replica and ISR information. This is also the class that is
present in the Cluster class that we get from cluster metadata.

This change updates the code to use PartitionInfo instead of
TopicPartition so that replica level metrics can be captured. This is
simple refactoring, all tests pass.
上级 6e7f128d
No related branches found
No related tags found
无相关合并请求
显示
69 个添加120 个删除
......@@ -15,7 +15,7 @@ import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSamp
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,7 +29,6 @@ import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.bu
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.buildPartitionMetricSample;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.leaderDistribution;
/**
* Process the raw metrics collected by {@link io.confluent.cruisecontrol.metricsreporter.ConfluentTelemetryReporterSampler} from the Kafka cluster.
*/
......@@ -117,7 +116,7 @@ public class CruiseControlMetricsProcessor {
* @return the constructed metric samples.
*/
public MetricSampler.Samples process(Cluster cluster,
Set<TopicPartition> partitions) {
Set<PartitionInfo> partitions) {
updateCachedNumCoresByBroker(cluster);
updateDiskCapacityByBroker(cluster);
// Theoretically we should not move forward at all if a broker reported a different all topic bytes in from all
......@@ -155,14 +154,14 @@ public class CruiseControlMetricsProcessor {
* @return The number of skipped partitions.
*/
private int addPartitionMetricSamples(Cluster cluster,
Set<TopicPartition> topicPartitions,
Set<PartitionInfo> topicPartitions,
Set<PartitionMetricSample> partitionMetricSamples) {
int skippedPartition = 0;
int loggedPartitionErrors = 0;
Map<Integer, Map<String, Integer>> leaderDistribution = leaderDistribution(cluster);
for (TopicPartition topicPartition : topicPartitions) {
for (PartitionInfo topicPartition : topicPartitions) {
try {
PartitionMetricSample sample = buildPartitionMetricSample(cluster, leaderDistribution, topicPartition,
PartitionMetricSample sample = buildPartitionMetricSample(leaderDistribution, topicPartition,
brokerLoad, maxMetricTimestamp, cachedNumCoresByBroker);
if (sample != null) {
LOG.trace("Added partition metrics sample for {}.", topicPartition);
......
......@@ -6,18 +6,12 @@ package com.linkedin.kafka.cruisecontrol.monitor.sampling;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcherManager.SUPPORTED_NUM_METRIC_FETCHER;
import java.util.stream.Collectors;
/**
* The default implementation of metric sampler partition assignor.
......@@ -35,29 +29,10 @@ public class DefaultMetricSamplerPartitionAssignor implements MetricSamplerParti
}
@SuppressWarnings("deprecation")
@Override
public List<Set<TopicPartition>> assignPartitions(Cluster cluster, int numMetricFetchers) {
if (numMetricFetchers != SUPPORTED_NUM_METRIC_FETCHER) {
throw new IllegalArgumentException("DefaultMetricSamplerPartitionAssignor supports only a single metric fetcher.");
}
// Create an array to host the assignment of all the metric fetchers.
List<Set<TopicPartition>> assignments = new ArrayList<>();
assignments.add(assignPartitions(cluster));
return assignments;
}
@Override
public Set<TopicPartition> assignPartitions(Cluster cluster) {
// Create an array to host the assignment of the metric fetcher.
Set<TopicPartition> assignment = new HashSet<>();
for (String topic : cluster.topics()) {
List<PartitionInfo> partitionsForTopic = cluster.partitionsForTopic(topic);
for (PartitionInfo partitionInfo : partitionsForTopic) {
assignment.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
}
LOG.trace("Partition assignment for metric fetcher: {}", assignment);
return assignment;
public Set<PartitionInfo> assignPartitions(Cluster cluster) {
return cluster.topics().stream()
.flatMap(topic -> cluster.partitionsForTopic(topic).stream())
.collect(Collectors.toSet());
}
}
......@@ -25,12 +25,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The class manages the metric fetchers. It periodically kicks off the sampling and refreshes the metadata as well.
*/
......@@ -133,7 +132,7 @@ public class MetricFetcherManager {
SampleStore sampleStore) {
LOG.debug("Kicking off partition metric sampling for time range [{}, {}], duration {} ms with timeout {} ms.",
startMs, endMs, endMs - startMs, timeoutMs);
Set<TopicPartition> partitionAssignment = partitionAssignor.assignPartitions(metadataClient.cluster());
Set<PartitionInfo> partitionAssignment = partitionAssignor.assignPartitions(metadataClient.cluster());
MetricFetcher samplingFetcher = new SamplingFetcher(metricSampler,
metadataClient,
partitionMetricSampleAggregator,
......
......@@ -12,7 +12,7 @@ import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricS
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.PartitionInfo;
/**
* The interface to get metric samples of given topic partitions.
......@@ -52,7 +52,7 @@ public interface MetricSampler extends CruiseControlConfigurable, AutoCloseable
* @return the PartitionMetricSample of the topic partition and replica id
*/
Samples getSamples(Cluster cluster,
Set<TopicPartition> assignedPartitions,
Set<PartitionInfo> assignedPartitions,
long startTimeMs,
long endTimeMs,
MetricDef metricDef,
......
......@@ -6,9 +6,8 @@ package com.linkedin.kafka.cruisecontrol.monitor.sampling;
import com.linkedin.cruisecontrol.common.CruiseControlConfigurable;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Set;
/**
......@@ -16,22 +15,11 @@ import java.util.Set;
*/
public interface MetricSamplerPartitionAssignor extends CruiseControlConfigurable {
/**
* @deprecated Please use {@link #assignPartitions(Cluster)}.
* Assign the partitions in the cluster to the metric fetchers.
*
* @param cluster The Kafka cluster.
* @param numFetchers The number of metric fetchers.
* @return A List of partition assignment for each of the fetchers.
*/
@Deprecated
List<Set<TopicPartition>> assignPartitions(Cluster cluster, int numFetchers);
/**
* Assign the partitions in the cluster to the single metric fetcher.
*
* @param cluster The Kafka cluster
* @return Set of topic partitions assigned to the fetcher.
*/
Set<TopicPartition> assignPartitions(Cluster cluster);
Set<PartitionInfo> assignPartitions(Cluster cluster);
}
......@@ -10,13 +10,12 @@ import java.util.Set;
import com.linkedin.kafka.cruisecontrol.exception.MetricSamplingException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.PartitionInfo;
public class NoopSampler implements MetricSampler {
@Override
public Samples getSamples(Cluster cluster, Set<TopicPartition> assignedPartitions, long startTimeMs, long endTimeMs,
public Samples getSamples(Cluster cluster, Set<PartitionInfo> assignedPartitions, long startTimeMs, long endTimeMs,
MetricDef metricDef, long timeout) throws MetricSamplingException {
return null;
}
......
......@@ -16,8 +16,10 @@ import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricS
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import com.yammer.metrics.core.TimerContext;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,7 +35,8 @@ class SamplingFetcher extends MetricFetcher {
private final KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator;
private final KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator;
private final SampleStore sampleStore;
private final Set<TopicPartition> assignedPartitions;
private final Set<PartitionInfo> assignedPartitions;
private final Set<TopicPartition> assignedTopicPartitions;
private final long startTimeMs;
private final long endTimeMs;
private final boolean leaderValidation;
......@@ -47,7 +50,7 @@ class SamplingFetcher extends MetricFetcher {
KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator,
KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator,
SampleStore sampleStore,
Set<TopicPartition> assignedPartitions,
Set<PartitionInfo> assignedPartitions,
long startTimeMs,
long endTimeMs,
boolean leaderValidation,
......@@ -67,6 +70,10 @@ class SamplingFetcher extends MetricFetcher {
this.fetchTimer = fetchTimer;
this.fetchFailureRate = fetchFailureRate;
timeout = System.currentTimeMillis() + (endTimeMs - startTimeMs) / 2;
assignedTopicPartitions = assignedPartitions.stream()
.map(pi -> new TopicPartition(pi.topic(), pi.partition()))
.collect(Collectors.toSet());
}
/**
......@@ -110,7 +117,7 @@ class SamplingFetcher extends MetricFetcher {
private void addPartitionSamples(Set<PartitionMetricSample> partitionMetricSamples) {
// Give an initial capacity to avoid resizing.
Set<TopicPartition> returnedPartitions = new HashSet<>(assignedPartitions.size());
int returnedPartitions = 0;
// Ignore the null value if the metric sampler did not return a sample
if (partitionMetricSamples != null) {
int discarded = 0;
......@@ -118,7 +125,7 @@ class SamplingFetcher extends MetricFetcher {
while (iter.hasNext()) {
PartitionMetricSample partitionMetricSample = iter.next();
TopicPartition tp = partitionMetricSample.entity().tp();
if (assignedPartitions.contains(tp)) {
if (assignedTopicPartitions.contains(tp)) {
// we close the metric sample in case the implementation forgot to do so.
partitionMetricSample.close(endTimeMs);
// We remove the sample from the returning set if it is not accepted.
......@@ -129,14 +136,14 @@ class SamplingFetcher extends MetricFetcher {
discarded++;
LOG.trace("Failed to add partition metric sample {}", partitionMetricSample);
}
returnedPartitions.add(tp);
returnedPartitions++;
} else {
LOG.warn("Collected partition metric sample for partition {} which is not an assigned partition. "
+ "The metric sample will be ignored.", tp);
}
}
LOG.debug("Collected {} ({} discarded) partition metric samples for {} partitions. Total partition assigned: {}.",
partitionMetricSamples.size(), discarded, returnedPartitions.size(), assignedPartitions.size());
partitionMetricSamples.size(), discarded, returnedPartitions, assignedTopicPartitions.size());
} else {
LOG.warn("Failed to collect partition metric samples for {} assigned partitions", assignedPartitions.size());
}
......
......@@ -15,6 +15,7 @@ import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSamp
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,8 +33,6 @@ import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricT
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.BROKER_CPU_UTIL;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.PARTITION_SIZE;
public class SamplingUtils {
private static final Logger LOG = LoggerFactory.getLogger(SamplingUtils.class);
private static final String SKIP_BUILDING_SAMPLE_PREFIX = "Skip generating metric sample for ";
......@@ -96,13 +95,12 @@ public class SamplingUtils {
* @param cachedNumCoresByBroker Cached number of cores by broker.
* @return Metric sample populated with topic and partition metrics, or {@code null} if sample generation is skipped.
*/
static PartitionMetricSample buildPartitionMetricSample(Cluster cluster,
Map<Integer, Map<String, Integer>> leaderDistribution,
TopicPartition topicPartition,
static PartitionMetricSample buildPartitionMetricSample(Map<Integer, Map<String, Integer>> leaderDistribution,
PartitionInfo topicPartition,
Map<Integer, BrokerLoad> brokerLoadById,
long maxMetricTimestamp,
Map<Integer, Short> cachedNumCoresByBroker) {
Node leaderNode = cluster.leaderFor(topicPartition);
Node leaderNode = topicPartition.leader();
if (leaderNode == null) {
LOG.trace("Partition {} has no current leader.", topicPartition);
return null;
......@@ -115,7 +113,7 @@ public class SamplingUtils {
// Fill in all the common metrics.
MetricDef commonMetricDef = KafkaMetricDef.commonMetricDef();
PartitionMetricSample pms = new PartitionMetricSample(leaderId, topicPartition);
PartitionMetricSample pms = new PartitionMetricSample(leaderId, new TopicPartition(topicPartition.topic(), topicPartition.partition()));
int numLeaders = leaderDistribution.get(leaderId).get(topicPartition.topic());
for (RawMetricType rawMetricType : RawMetricType.topicMetricTypes()) {
double sampleValue = numLeaders == 0 ? 0 : (brokerLoad.topicMetrics(topicPartition.topic(), rawMetricType)) / numLeaders;
......@@ -178,7 +176,7 @@ public class SamplingUtils {
* @param cachedNumCoresByBroker Cached number of cores by broker.
* @return True to skip generating partition metric sample, false otherwise.
*/
private static boolean skipBuildingPartitionMetricSample(TopicPartition topicPartition,
private static boolean skipBuildingPartitionMetricSample(PartitionInfo topicPartition,
int leaderId,
BrokerLoad brokerLoad,
Map<Integer, Short> cachedNumCoresByBroker) {
......
......@@ -129,11 +129,6 @@ public class ConfluentMetricsReporterSampler extends ConfluentMetricsSamplerBase
return metricList;
}
@Override
protected String defaultMetricSamplerGroupId() {
return "ConfluentMetricsReporterSampler";
}
private CruiseControlMetric convertVolumeMetrics(ConfluentMetric.MetricsMessage metricsMessage) {
List<ConfluentMetric.VolumeMetrics> volumes = metricsMessage.getSystemMetrics().getVolumesList();
if (!volumes.isEmpty()) {
......
......@@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
......@@ -166,7 +167,7 @@ public abstract class ConfluentMetricsSamplerBase implements MetricSampler {
*/
@Override
public Samples getSamples(Cluster cluster,
Set<TopicPartition> assignedPartitions,
Set<PartitionInfo> assignedPartitions,
long startTimeMs,
long endTimeMs,
MetricDef metricDef,
......@@ -268,8 +269,6 @@ public abstract class ConfluentMetricsSamplerBase implements MetricSampler {
protected abstract List<CruiseControlMetric> convertMetricRecord(ConsumerRecord<byte[], byte[]> record);
protected abstract String defaultMetricSamplerGroupId();
@SuppressWarnings("deprecation")
@Override
public void configure(Map<String, ?> configs) {
......
......@@ -120,11 +120,6 @@ public class ConfluentTelemetryReporterSampler extends ConfluentMetricsSamplerBa
return ccMetrics;
}
@Override
protected String defaultMetricSamplerGroupId() {
return "ConfluentTelemetryReporterSampler";
}
private void createCruiseControlMetrics(String name, Point point, int brokerId, Map<String, String> labels, List<CruiseControlMetric> ccMetrics) {
String topic = labels.get(TOPIC_KEY);
long timestamp = Instant.ofEpochSecond(point.getTimestamp().getSeconds(), point.getTimestamp().getNanos()).toEpochMilli();
......
......@@ -17,6 +17,7 @@ import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
......@@ -46,7 +47,7 @@ public class MockSampler implements MetricSampler {
*/
@Override
public Samples getSamples(Cluster cluster,
Set<TopicPartition> assignedPartitions,
Set<PartitionInfo> assignedPartitions,
long startTime,
long endTime,
MetricDef metricDef,
......@@ -57,12 +58,12 @@ public class MockSampler implements MetricSampler {
throw new MetricSamplingException("Error");
}
Set<PartitionMetricSample> partitionMetricSamples = new HashSet<>(assignedPartitions.size());
for (TopicPartition tp : assignedPartitions) {
Node leader = cluster.partition(tp).leader();
for (PartitionInfo partitionInfo : assignedPartitions) {
Node leader = partitionInfo.leader();
if (leader == null) {
continue;
}
PartitionMetricSample sample = new PartitionMetricSample(leader.id(), tp);
PartitionMetricSample sample = new PartitionMetricSample(leader.id(), new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
long now = time.milliseconds();
for (MetricInfo metricInfo : KafkaMetricDef.commonMetricDef().all()) {
......
......@@ -35,8 +35,8 @@ import static com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC1;
import static com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC_WITH_DOT;
import static com.linkedin.kafka.cruisecontrol.model.ModelUtils.estimateLeaderCpuUtil;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
......@@ -77,7 +77,7 @@ public class CruiseControlMetricsProcessorTest {
private static final double T1P1_BYTES_SIZE = 300.0;
private static final double T3P0_BYTES_SIZE = 200.0;
private static final double T3P1_BYTES_SIZE = 500.0;
private static final Set<TopicPartition> TEST_PARTITIONS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(T1P0, T1P1, T3P0, T3P1)));
private static Set<PartitionInfo> testPartitions;
private static final Map<TopicPartition, Double> CPU_UTIL = new HashMap<>(4);
static {
CPU_UTIL.put(T1P0, MOCK_NUM_CPU_CORES *
......@@ -134,7 +134,7 @@ public class CruiseControlMetricsProcessorTest {
}
Cluster cluster = getCluster();
processor.process(cluster, TEST_PARTITIONS);
processor.process(cluster, testPartitions);
for (Node node : cluster.nodes()) {
assertNull(processor.cachedNumCoresByBroker().get(node.id()));
}
......@@ -150,7 +150,7 @@ public class CruiseControlMetricsProcessorTest {
for (CruiseControlMetric metric : metrics) {
processor.addMetric(metric);
}
processor.process(cluster, TEST_PARTITIONS);
processor.process(cluster, testPartitions);
assertEquals(MOCK_NUM_CPU_CORES, (short) processor.cachedNumCoresByBroker().get(BROKER_ID_0));
assertNull(processor.cachedNumCoresByBroker().get(BROKER_ID_1));
}
......@@ -176,7 +176,7 @@ public class CruiseControlMetricsProcessorTest {
for (CruiseControlMetric cruiseControlMetric : metrics) {
processor.addMetric(cruiseControlMetric);
}
processor.process(cluster, TEST_PARTITIONS);
processor.process(cluster, testPartitions);
}
@Test
......@@ -196,7 +196,7 @@ public class CruiseControlMetricsProcessorTest {
for (CruiseControlMetric cruiseControlMetric : metrics) {
processor.addMetric(cruiseControlMetric);
}
processor.process(cluster, TEST_PARTITIONS);
processor.process(cluster, testPartitions);
}
@Test
......@@ -206,7 +206,7 @@ public class CruiseControlMetricsProcessorTest {
Cluster cluster = getCluster();
metrics.forEach(processor::addMetric);
MetricSampler.Samples samples = processor.process(cluster, TEST_PARTITIONS);
MetricSampler.Samples samples = processor.process(cluster, testPartitions);
for (Node node : cluster.nodes()) {
assertEquals(MOCK_NUM_CPU_CORES, (short) processor.cachedNumCoresByBroker().get(node.id()));
}
......@@ -243,7 +243,7 @@ public class CruiseControlMetricsProcessorTest {
}
}
assertTrue(!samples.partitionMetricSamples().isEmpty());
assertFalse(samples.partitionMetricSamples().isEmpty());
}
@Test
......@@ -258,7 +258,7 @@ public class CruiseControlMetricsProcessorTest {
}
}
Cluster cluster = getCluster();
MetricSampler.Samples samples = processor.process(cluster, TEST_PARTITIONS);
MetricSampler.Samples samples = processor.process(cluster, testPartitions);
assertEquals(1, samples.partitionMetricSamples().size(), "Should have ignored partitions on broker 0");
assertEquals(1, samples.brokerMetricSamples().size(), "Should have ignored broker 0");
}
......@@ -275,7 +275,7 @@ public class CruiseControlMetricsProcessorTest {
processor.addMetric(metric);
}
}
MetricSampler.Samples samples = processor.process(cluster, TEST_PARTITIONS);
MetricSampler.Samples samples = processor.process(cluster, testPartitions);
assertEquals(4, samples.partitionMetricSamples().size(), "Should have all 4 partition metrics.");
assertEquals(1, samples.brokerMetricSamples().size(), "Should have ignored broker 0");
}
......@@ -297,7 +297,7 @@ public class CruiseControlMetricsProcessorTest {
}
}
Cluster cluster = getCluster();
MetricSampler.Samples samples = processor.process(cluster, TEST_PARTITIONS);
MetricSampler.Samples samples = processor.process(cluster, testPartitions);
assertEquals(3, samples.partitionMetricSamples().size(), "Should have ignored partition " + T1P0);
assertEquals(2, samples.brokerMetricSamples().size(), "Should have reported both brokers");
}
......@@ -321,7 +321,7 @@ public class CruiseControlMetricsProcessorTest {
}
Cluster cluster = getCluster();
MetricSampler.Samples samples = processor.process(cluster, TEST_PARTITIONS);
MetricSampler.Samples samples = processor.process(cluster, testPartitions);
assertEquals(4, samples.partitionMetricSamples().size());
assertEquals(2, samples.brokerMetricSamples().size());
......@@ -454,12 +454,13 @@ public class CruiseControlMetricsProcessorTest {
Set<Node> allNodes = new HashSet<>();
allNodes.add(node0);
allNodes.add(node1);
Set<PartitionInfo> parts = new HashSet<>();
parts.add(new PartitionInfo(TOPIC1, P0, node0, nodes, nodes));
parts.add(new PartitionInfo(TOPIC1, P1, node1, nodes, nodes));
parts.add(new PartitionInfo(TOPIC_WITH_DOT, P0, node0, nodes, nodes));
parts.add(new PartitionInfo(TOPIC_WITH_DOT, P1, node0, nodes, nodes));
return new Cluster("testCluster", allNodes, parts, Collections.emptySet(), Collections.emptySet());
testPartitions = new HashSet<>();
testPartitions.add(new PartitionInfo(TOPIC1, P0, node0, nodes, nodes));
testPartitions.add(new PartitionInfo(TOPIC1, P1, node1, nodes, nodes));
testPartitions.add(new PartitionInfo(TOPIC_WITH_DOT, P0, node0, nodes, nodes));
testPartitions.add(new PartitionInfo(TOPIC_WITH_DOT, P1, node0, nodes, nodes));
return new Cluster("testCluster", allNodes, testPartitions, Collections.emptySet(), Collections.emptySet());
}
private Cluster getClusterMissingNode0() {
......
......@@ -7,7 +7,6 @@ package com.linkedin.kafka.cruisecontrol.monitor.sampling;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import java.util.Collections;
......@@ -52,14 +51,14 @@ public class DefaultMetricSamplerPartitionAssignorTest {
}
Cluster cluster = new Cluster("cluster", allNodes, partitions, Collections.emptySet(), Collections.emptySet());
MetricSamplerPartitionAssignor assignor = new DefaultMetricSamplerPartitionAssignor();
Set<TopicPartition> assignment = assignor.assignPartitions(cluster);
Set<PartitionInfo> assignment = assignor.assignPartitions(cluster);
int maxAssignedNumPartitionsForFetcher = -1;
int minAssignedNumPartitionsForFetcher = Integer.MAX_VALUE;
int totalAssignedNumPartitions = 0;
maxAssignedNumPartitionsForFetcher = Math.max(maxAssignedNumPartitionsForFetcher, assignment.size());
minAssignedNumPartitionsForFetcher = Math.min(minAssignedNumPartitionsForFetcher, assignment.size());
Set<TopicPartition> uniqueAssignedPartitions = new HashSet<>(assignment);
Set<PartitionInfo> uniqueAssignedPartitions = new HashSet<>(assignment);
totalAssignedNumPartitions += assignment.size();
// Make sure all the partitions are assigned and there is no double assignment.
assertEquals(totalNumPartitions, totalAssignedNumPartitions,
......
......@@ -14,7 +14,6 @@ import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
......@@ -23,11 +22,10 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcherManager.BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG;
import static org.junit.jupiter.api.Assertions.assertFalse;
......@@ -77,13 +75,9 @@ public class ConfluentTelemetryReporterSamplerIntegrationTest extends MetricRepo
Cluster cluster = metadataCache.getClusterMetadata(zkClient.getClusterId().get(),
kafkaConfig(0).interBrokerListenerName());
Set<TopicPartition> partitions = new HashSet<>();
for (String topic : cluster.topics()) {
List<PartitionInfo> partitionsForTopic = cluster.partitionsForTopic(topic);
for (PartitionInfo partitionInfo : partitionsForTopic) {
partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
}
Set<PartitionInfo> partitions = cluster.nodes().stream()
.flatMap(node -> cluster.partitionsForNode(node.id()).stream())
.collect(Collectors.toSet());
TestUtils.retryOnExceptionWithTimeout(5000, 30000, () -> {
MetricSampler.Samples samples = sampler.getSamples(cluster, partitions, 0L, System.currentTimeMillis(),
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册