Skip to content
代码片段 群组 项目
未验证 提交 d9e0cbff 编辑于 作者: Rajini Sivaram's avatar Rajini Sivaram 提交者: GitHub
浏览文件

KGLOBAL-1786: Trodgor task for consumer group operations and listing offsets (#6963)

Reviewers: Nikhil Bhatia <rite2nikhil@gmail.com>
上级 9fc44268
No related branches found
No related tags found
无相关合并请求
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class ConsumerGroupsSpec extends TaskSpec {
enum ConsumerGroupOperation {
LIST_GROUPS,
DESCRIBE_GROUPS,
LIST_OFFSETS,
ALTER_OFFSETS
}
private final String clientNode;
private final String bootstrapServers;
private final Map<String, String> commonClientConf;
private final Map<String, String> adminClientConf;
private final int targetOperationsPerSec;
private final List<ConsumerGroupOperation> operations;
private final TopicsSpec activeTopics;
private final boolean initializeGroupOffsets;
private final String groupPrefix;
private final int noOfGroups;
@JsonCreator
public ConsumerGroupsSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("clientNode") String clientNode,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("targetOperationsPerSec") int targetOperationsPerSec,
@JsonProperty("operations") List<ConsumerGroupOperation> operations,
@JsonProperty("activeTopics") TopicsSpec activeTopics,
@JsonProperty("initializeGroupOffsets") Boolean initializeGroupOffsets,
@JsonProperty("groupPrefix") String groupPrefix,
@JsonProperty("noOfGroups") int noOfGroups) {
super(startMs, durationMs);
this.clientNode = clientNode == null ? "" : clientNode;
this.bootstrapServers = bootstrapServers == null ? "" : bootstrapServers;
this.commonClientConf = commonClientConf == null ? Collections.emptyMap() : commonClientConf;
this.adminClientConf = adminClientConf == null ? Collections.emptyMap() : adminClientConf;
this.targetOperationsPerSec = targetOperationsPerSec;
this.operations = operations;
this.activeTopics = activeTopics;
this.initializeGroupOffsets = initializeGroupOffsets == null || initializeGroupOffsets;
this.groupPrefix = groupPrefix == null ? "" : groupPrefix;
this.noOfGroups = noOfGroups;
}
@JsonProperty
public String clientNode() {
return clientNode;
}
@JsonProperty
public String bootstrapServers() {
return bootstrapServers;
}
@JsonProperty
public Map<String, String> commonClientConf() {
return commonClientConf;
}
@JsonProperty
public Map<String, String> adminClientConf() {
return adminClientConf;
}
@JsonProperty
public int targetOperationsPerSec() {
return targetOperationsPerSec;
}
@JsonProperty
public List<ConsumerGroupOperation> operations() {
return operations;
}
@JsonProperty
public TopicsSpec activeTopics() {
return activeTopics;
}
@JsonProperty
public boolean initializeGroupOffsets() {
return initializeGroupOffsets;
}
@JsonProperty
public String groupPrefix() {
return groupPrefix;
}
@JsonProperty
public int noOfGroups() {
return noOfGroups;
}
@Override
public TaskController newController(String id) {
return topology -> Collections.singleton(clientNode);
}
@Override
public TaskWorker newTaskWorker(String id) {
return new ConsumerGroupsWorker(id, this);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.ConsumerGroupsSpec.ConsumerGroupOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ConsumerGroupsWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ConsumerGroupsWorker.class);
private static final String NAME = ConsumerGroupsWorker.class.getSimpleName();
private static final ListConsumerGroupOffsetsSpec ALL_PARTITIONS = new ListConsumerGroupOffsetsSpec();
private static final int THROTTLE_PERIOD_MS = 1000;
private static final Time TIME = Time.SYSTEM;
private static final AtomicLong COUNTER = new AtomicLong(0);
private final String id;
private final ConsumerGroupsSpec spec;
private final List<String> groupIds;
private final HashSet<TopicPartition> partitions;
private final AtomicBoolean running = new AtomicBoolean(false);
private Future<?> statusUpdaterFuture;
private ExecutorService workerExecutor;
private ScheduledExecutorService statusUpdaterExecutor;
private WorkerStatusTracker status;
private KafkaFutureImpl<String> doneFuture;
private long totalCalls;
private long startTimeMs;
public ConsumerGroupsWorker(String id, ConsumerGroupsSpec spec) {
this.id = id;
this.spec = spec;
String groupPrefix = NAME + spec.groupPrefix();
this.groupIds = new ArrayList<>(spec.noOfGroups());
for (int i = 0; i < spec.noOfGroups(); i++)
this.groupIds.add(groupPrefix + "-" + i);
partitions = new HashSet<>();
spec.activeTopics().materialize().forEach((topic, partSpec) -> {
for (Integer partitionNumber : partSpec.partitionNumbers()) {
partitions.add(new TopicPartition(topic, partitionNumber));
}
});
}
@Override
public void start(Platform platform,
WorkerStatusTracker status,
KafkaFutureImpl<String> doneFuture) {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("ConsumerOffsetsSpec is already running.");
}
synchronized (ConsumerGroupsWorker.this) {
this.totalCalls = 0;
this.startTimeMs = TIME.milliseconds();
}
log.info("{}: Activating ConsumerOffsetsSpec.", id);
try {
this.status = status;
this.doneFuture = doneFuture;
validateConfigs();
this.workerExecutor = Executors.newFixedThreadPool(1,
ThreadUtils.createThreadFactory(NAME + "%d", false));
this.workerExecutor.submit(new Prepare());
statusUpdaterExecutor = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
statusUpdaterFuture = statusUpdaterExecutor.scheduleAtFixedRate(
new StatusUpdater(), 30, 10, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
WorkerUtils.abort(log, NAME, e, doneFuture);
}
}
private void validateConfigs() {
if (spec.targetOperationsPerSec() <= 0) {
throw new ConfigException("Can't have targetOperationsPerSec <= 0.");
}
}
class Prepare implements Runnable {
@Override
public void run() {
if (spec.noOfGroups() <= 0) {
workerExecutor.submit(new Worker());
return;
}
try {
Map<String, NewTopic> newTopics = new HashMap<>();
HashSet<TopicPartition> active = new HashSet<>();
for (Map.Entry<String, PartitionsSpec> entry : spec.activeTopics().materialize().entrySet()) {
String topicName = entry.getKey();
PartitionsSpec partSpec = entry.getValue();
newTopics.put(topicName, partSpec.newTopic(topicName));
for (Integer partitionNumber : partSpec.partitionNumbers()) {
active.add(new TopicPartition(topicName, partitionNumber));
}
}
if (!active.isEmpty()) {
status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
spec.adminClientConf(), newTopics, false);
status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
if (spec.initializeGroupOffsets()) {
try (Admin admin = createAdmin()) {
alterOffsets(admin, 0);
}
status.update(new TextNode("Initialized offsets for " + groupIds.size() +
" group(s) with " + partitions.size() + " partitions each."));
}
}
workerExecutor.submit(new Worker());
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
}
}
}
class Worker implements Runnable {
@Override
public void run() {
int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetOperationsPerSec(), THROTTLE_PERIOD_MS);
Throttle throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
Admin admin = null;
try {
admin = createAdmin();
while (!doneFuture.isDone()) {
throttle.increment();
int noOfOps = runOperations(admin, spec.operations(), COUNTER.incrementAndGet());
synchronized (ConsumerGroupsWorker.this) {
totalCalls = totalCalls + noOfOps;
}
}
} catch (Throwable e) {
Utils.closeQuietly(admin, "Admin");
WorkerUtils.abort(log, NAME + "#Worker", e, doneFuture);
}
}
}
private int runOperations(Admin admin, List<ConsumerGroupOperation> operations, long index) throws ExecutionException, InterruptedException {
for (ConsumerGroupOperation op : operations) {
switch (op) {
case LIST_GROUPS:
listGroups(admin);
break;
case DESCRIBE_GROUPS:
describeGroups(admin);
break;
case LIST_OFFSETS:
listOffsets(admin);
break;
case ALTER_OFFSETS:
alterOffsets(admin, index);
break;
default:
throw new IllegalArgumentException("Unsupported operation " + op);
}
}
return 1;
}
private void listGroups(Admin admin) throws ExecutionException, InterruptedException {
List<String> groups = admin.listConsumerGroups().all().get().stream()
.map(ConsumerGroupListing::groupId)
.filter(groupIds::contains)
.collect(Collectors.toList());
if (groups.size() != this.groupIds.size()) {
log.error("Some consumer groups are missing in listConsumerGroups response, expected {}, got {}",
groupIds.size(), groups.size());
}
}
private void describeGroups(Admin admin) throws ExecutionException, InterruptedException {
Map<String, ConsumerGroupDescription> groups = admin.describeConsumerGroups(this.groupIds).all().get();
if (groups.size() != this.groupIds.size()) {
log.error("Some consumer groups are missing in describeConsumerGroups response, expected {}, got {}",
groupIds.size(), groups.size());
}
}
private void listOffsets(Admin admin) throws ExecutionException, InterruptedException {
Map<String, ListConsumerGroupOffsetsSpec> offsetSpecs = groupIds.stream()
.collect(Collectors.toMap(Function.identity(), g -> ALL_PARTITIONS));
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = admin.listConsumerGroupOffsets(offsetSpecs).all().get();
if (offsets.size() != this.groupIds.size()) {
log.error("Some consumer groups are missing in listConsumerGroupOffsets response, expected {}, got {}",
groupIds.size(), offsets.size());
} else {
offsets.forEach((g, o) -> {
if (o.size() != partitions.size()) {
log.debug("Offsets for some topic partitions of group {} are missing, expected {}, got {}", g, partitions.size(), o.size());
}
});
}
}
private void alterOffsets(Admin admin, long index) throws ExecutionException, InterruptedException {
OffsetAndMetadata offset = new OffsetAndMetadata(index);
Map<TopicPartition, OffsetAndMetadata> offsets = partitions.stream()
.collect(Collectors.toMap(Function.identity(), unused -> offset));
for (String group : groupIds) {
admin.alterConsumerGroupOffsets(group, offsets).all().get();
}
}
private Admin createAdmin() {
Map<String, Object> props = new HashMap<>();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.putAll(spec.commonClientConf());
props.putAll(spec.adminClientConf());
return Admin.create(props);
}
private class StatusUpdater implements Runnable {
@Override
public void run() {
try {
long lastTimeMs = Time.SYSTEM.milliseconds();
JsonNode node = JsonUtil.JSON_SERDE.valueToTree(
new ConsumerGroupsWorker.StatusData(totalCalls,
(totalCalls * 1000.0) / (lastTimeMs - startTimeMs)));
status.update(node);
} catch (Exception e) {
WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
}
}
}
public static class StatusData {
private final double callsPerSec;
private final double totalCalls;
@JsonCreator
StatusData(@JsonProperty("totalCalls") double totalCalls,
@JsonProperty("callsPerSec") double callsPerSec) {
this.callsPerSec = callsPerSec;
this.totalCalls = totalCalls;
}
@JsonProperty
public double callsPerSec() {
return callsPerSec;
}
@JsonProperty
public double totalCalls() {
return totalCalls;
}
}
@Override
public void stop(Platform platform) throws Exception {
if (!running.compareAndSet(true, false)) {
throw new IllegalStateException(NAME + " is not running.");
}
log.info("{}: Deactivating " + NAME, id);
doneFuture.complete("");
this.statusUpdaterFuture.cancel(false);
this.statusUpdaterExecutor.shutdown();
this.statusUpdaterExecutor.awaitTermination(1, TimeUnit.DAYS);
this.statusUpdaterExecutor = null;
workerExecutor.shutdownNow();
workerExecutor.awaitTermination(1, TimeUnit.DAYS);
new StatusUpdater().run();
this.workerExecutor = null;
this.doneFuture = null;
long lastTimeMs = Time.SYSTEM.milliseconds();
double callsPerSec = (totalCalls * 1000.0) / (lastTimeMs - startTimeMs);
log.info("Achieved CallsPerSec : {}", callsPerSec);
log.info("{}: Deactivated " + NAME, id);
}
}
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册