Skip to content
代码片段 群组 项目
未验证 提交 aa38a863 编辑于 作者: Aneesh Garg's avatar Aneesh Garg 提交者: GitHub
浏览文件

CIAM-1503: Ability to de-code message headers of auth-topic (#6816)

Jira: https://confluentinc.atlassian.net/browse/CIAM-1530

Description:
To debug data plane auth issues, it handy / part of runbooks to log onto a Kafka node and dump/inspect the auth topic via the kafka console consumer.
Header of messages from topic _confluent-metadata-auth is in protobuff format. This PR aims at making the dump more readable by making use of MessageFormatter.
上级 ca363d99
No related branches found
No related tags found
无相关合并请求
// (Copyright) [2022 - 2022] Confluent, Inc.
package io.confluent.kafka.multitenant.serde;
import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.multitenant.utils.Utils;
import io.confluent.protobuf.cloud.events.v1.EventsMetadata;
import io.confluent.security.auth.store.data.RoleBindingKey;
import io.confluent.security.auth.store.data.RoleBindingValue;
import io.confluent.security.authorizer.utils.JsonMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Map;
public class RoleBindingMetadataFormatter implements MessageFormatter {
private boolean printHeader;
private boolean printKey;
private boolean printValue;
private boolean printTimestamp;
private boolean printPartition;
private boolean printOffset;
@Override
public void configure(Map<String, ?> configs) {
printHeader = getBoolean(configs, "print.header", Boolean.FALSE);
printKey = getBoolean(configs, "print.key", Boolean.FALSE);
printValue = getBoolean(configs, "print.value", Boolean.TRUE);
printTimestamp = getBoolean(configs, "print.timestamp", Boolean.FALSE);
printPartition = getBoolean(configs, "print.partition", Boolean.FALSE);
printOffset = getBoolean(configs, "print.offset", Boolean.FALSE);
}
private boolean getBoolean(Map<String, ?> configs, String name, boolean defaultValue) {
return Boolean.parseBoolean(getString(configs, name, Boolean.toString(defaultValue)).trim());
}
private String getString(Map<String, ?> configs, String name, String defaultValue) {
Object o = configs.get(name);
if (o instanceof String)
return (String) o;
if (o == null)
return defaultValue;
throw new IllegalArgumentException("Unexpected type for config value for '" + name +
"', expected String but found " + o.getClass() + ", value: " + o);
}
@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
if (printTimestamp) {
output.append("Timestamp: ")
.append(Long.toString(consumerRecord.timestamp())).append(" | ");
}
if (printPartition) {
output.append("Partition: ")
.append(Integer.toString(consumerRecord.partition())).append(" | ");
}
if (printOffset) {
output.append("Offset: ")
.append(Long.toString(consumerRecord.offset())).append(" | ");
}
if (printHeader && consumerRecord.headers() != null) {
output.append("Header: ")
.append(getEventsMetadataHeader(consumerRecord.headers())).append(" | ");
}
if (printKey) {
output.append("Key: ")
.append(getJsonMapperObject(RoleBindingKey.class, consumerRecord.key())).append(" | ");
}
if (printValue) {
output.append("Value: ")
.append(getJsonMapperObject(RoleBindingValue.class, consumerRecord.key())).append(" | ");
}
output.append("\n");
}
private String getEventsMetadataHeader(Headers headers) {
Header lastEventsMetadataHeader = headers.lastHeader(Utils.EVENTS_METADATA_HEADER_KEY);
if (lastEventsMetadataHeader != null) {
try {
EventsMetadata eventsMetadata = EventsMetadata.parseFrom(lastEventsMetadataHeader.value());
EventsMetadataHeader eventsMetadataHeader = EventsMetadataHeader.fromProtobuf(eventsMetadata);
return eventsMetadataHeader.toString();
} catch (InvalidProtocolBufferException e) {
return "Unable to parse events metadata header: " + e.getMessage();
}
}
return null;
}
private <T> String getJsonMapperObject(Class<T> clazz, byte[] data) {
try {
return JsonMapper.objectMapper().readValue(data, clazz).toString();
} catch (IOException e) {
return "Unable to parse role binding value: " + e.getMessage();
}
}
@Override
public void close() {
}
}
\ No newline at end of file
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册