Skip to content
代码片段 群组 项目
未验证 提交 55949019 编辑于 作者: Ron Dagostino's avatar Ron Dagostino 提交者: GitHub
浏览文件

KMETA-295: RuntimeException in TelemetryReporter init on KRaft controllers (#6913)

上级 ec7a3450
No related branches found
No related tags found
无相关合并请求
......@@ -36,6 +36,7 @@ import io.confluent.telemetry.provider.KafkaServerProvider;
import io.confluent.telemetry.provider.Provider;
import io.confluent.telemetry.provider.ProviderRegistry;
import io.opencensus.proto.resource.v1.Resource;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.config.ConfigException;
......@@ -654,10 +655,28 @@ public class TelemetryReporter implements MetricsReporter, ClusterResourceListen
return configs;
}
private static Map<String, Object> maybeInjectLocalExporter(Provider provider, Map<String, Object> originals) {
Map<String, Object> configs = new HashMap<>();
private static boolean isRunningInsideBroker(Provider provider, Map<String, Object> originals) {
// this check is how we determine if we're inside the broker
// Note that ideally we should not care that we are in a KRaft remote controller, but currently we do not support
// reporting KRaft remote controller metrics to the local topic because we cannot derive the bootstrap servers
// in the same way that we do when we are in the broker. We need to use a different mechanism, which is tracked
// by KMETA-300, and we will revert this exclusion of KRaft remote controllers as part of that implementation.
if (provider instanceof KafkaServerProvider) {
// Pre-KRaft we are definitely running inside the broker if we have a KafkaServerProvider,
// but with KRaft we have to also explicitly check to make sure that this is not a remote KRaft controller
// because remote KRaft controllers also leverage the same KafkaServerProvider.
Object processRoles = originals.getOrDefault(KafkaConfig.ProcessRolesProp(), "");
String processRolesToString = processRoles.toString();
boolean isKRaftRemoteController = processRolesToString.contains("controller") && !processRolesToString.contains("broker");
return !isKRaftRemoteController;
} else {
return false;
}
}
private static Map<String, Object> maybeInjectLocalExporter(Provider provider, Map<String, Object> originals) {
Map<String, Object> configs = new HashMap<>();
if (isRunningInsideBroker(provider, originals)) {
// first add the local exporter default values
configs.putAll(
prefixedExporterConfigs(
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册