Unverified commit b52f5f9f authored by Josh H, committed by GitHub

[KDATA-509] Tier Topic Partition Snapshot (TTPS): FlatBuffers and serialization wrapper (#6979)

Reviewers: @RamanVerma @alok123t 
Parent 6156ceb0
@@ -205,6 +205,7 @@
    </subpackage>
    <subpackage name="domain">
        <allow class="com.google.flatbuffers.FlatBufferBuilder" />
+       <allow class="com.google.flatbuffers.ByteVector" />
    </subpackage>
    <subpackage name="fetcher">
        <allow class="kafka.server.DelayedOperation" />
/*
Copyright 2022 Confluent Inc.
*/
package kafka.tier.domain;
import com.google.flatbuffers.FlatBufferBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import kafka.tier.serdes.OffsetRange;
import kafka.tier.serdes.TierTopicSnapshot;
import kafka.tier.serdes.TierTopicSnapshotEntry;
import kafka.tier.serdes.TierTopicSnapshotHeader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
/**
* TierTopicPartitionSnapshot domain object. The schema for this file is defined in
* <a href="file:core/src/main/resources/serde/immutable/tier_topic_snapshot.fbs">tier_topic_snapshot.fbs</a>
* <p>
* This class is a serialization wrapper for the TierTopicSnapshot (TTPS) FlatBuffer-generated class. It encodes the
* schema outlined in "Tier Topic Snapshots" (KDATA-211), with some minor modifications. Most significantly, instead of
* using a separate index file, the offset range per partition is stored in the header of each TTPS file.
* <p>
* The constructor assumes that the submitted entries are complete: they contain events from every partition of the
* tier topic (not just a subset of partitions), in offset order.
*/
public class TierTopicPartitionSnapshot {
private static final byte VERSION_0 = 0; // Initial version
private static final byte CURRENT_VERSION = VERSION_0;
private static final int INITIAL_BUFFER_SIZE = 500;
protected static final int UNKNOWN_LEADER_EPOCH = -1;
protected static final String TIER_TOPIC_NAME = org.apache.kafka.common.internals.Topic.TIER_TOPIC_NAME;
protected static final TimestampType TIMESTAMP_TYPE = TimestampType.CREATE_TIME;
private final TierTopicSnapshot tierTopicSnapshot;
public TierTopicPartitionSnapshot(List<ConsumerRecords<byte[], byte[]>> recordsBuffer) {
FlatBufferBuilder builder = new FlatBufferBuilder(TierTopicPartitionSnapshot.INITIAL_BUFFER_SIZE)
.forceDefaults(false);
int entryId = buildTierTopicPartitionSnapshot(builder, recordsBuffer);
TierTopicSnapshot.finishTierTopicSnapshotBuffer(builder, entryId);
this.tierTopicSnapshot = TierTopicSnapshot.getRootAsTierTopicSnapshot(builder.dataBuffer());
}
static int buildTierTopicPartitionSnapshot(FlatBufferBuilder builder,
List<ConsumerRecords<byte[], byte[]>> recordsBuffer) {
/* We need to keep track of the minimum-encountered and maximum-encountered offset for each partition
* in order by partition number to produce the sorted OffsetRange vector. There will only ever be at most
* 50 partitions, so maintaining a sorted map is of little performance consequence, and keeps the code simple.
*/
SortedMap<Integer, Long> startOffsetMap = new TreeMap<>();
SortedMap<Integer, Long> endOffsetMap = new TreeMap<>();
/* [TierTopicSnapshotEntry] */
int[] entryOffsets = new int[recordsBuffer.stream().mapToInt(ConsumerRecords::count).sum()];
int i = 0;
for (ConsumerRecords<byte[], byte[]> records : recordsBuffer) {
for (ConsumerRecord<byte[], byte[]> record : records) {
int partition = record.partition();
// Update OffsetRange[min, max] for this partition
long eventOffset = record.offset();
// The first time we encounter a partition, its offset is the first/starting offset
startOffsetMap.putIfAbsent(partition, eventOffset);
// In any subsequent encounter of a partition, its offset is the last/ending offset
endOffsetMap.put(partition, eventOffset);
int keyOffset = TierTopicSnapshotEntry.createKeyVector(builder, record.key());
int valueOffset = TierTopicSnapshotEntry.createValueVector(builder, record.value());
TierTopicSnapshotEntry.startTierTopicSnapshotEntry(builder);
TierTopicSnapshotEntry.addPartition(builder, (byte) partition);
int offsetAndEpochOffset = kafka.tier.serdes.OffsetAndEpoch.createOffsetAndEpoch(builder, eventOffset,
record.leaderEpoch().orElse(UNKNOWN_LEADER_EPOCH)); // structs must be serialized inline
TierTopicSnapshotEntry.addOffsetAndEpoch(builder, offsetAndEpochOffset);
TierTopicSnapshotEntry.addTimestamp(builder, record.timestamp());
TierTopicSnapshotEntry.addKey(builder, keyOffset);
TierTopicSnapshotEntry.addValue(builder, valueOffset);
int entryOffset = TierTopicSnapshotEntry.endTierTopicSnapshotEntry(builder);
// Entries are nested tables of TierTopicSnapshot; only the enclosing snapshot buffer is finished (in the constructor)
entryOffsets[i++] = entryOffset;
}
}
int entriesOffset = TierTopicSnapshot.createTierTopicSnapshotEntriesVector(builder, entryOffsets);
/* TierTopicSnapshotHeader */
TierTopicSnapshotHeader.startOffsetsVector(builder, startOffsetMap.size());
// FlatBuffers vectors are built back to front, so prepend the ranges in descending partition order;
// the serialized vector then ends up sorted by ascending partition number (index == partition number)
List<Integer> partitions = new ArrayList<>(startOffsetMap.keySet());
for (int p = partitions.size() - 1; p >= 0; p--)
OffsetRange.createOffsetRange(builder, startOffsetMap.get(partitions.get(p)), endOffsetMap.get(partitions.get(p)));
int offsetsOffset = builder.endVector();
int headerOffset = TierTopicSnapshotHeader.createTierTopicSnapshotHeader(builder, offsetsOffset);
/* TierTopicSnapshot */
TierTopicSnapshot.startTierTopicSnapshot(builder);
TierTopicSnapshot.addVersion(builder, CURRENT_VERSION);
TierTopicSnapshot.addTierTopicSnapshotHeader(builder, headerOffset);
TierTopicSnapshot.addTierTopicSnapshotEntries(builder, entriesOffset);
return TierTopicSnapshot.endTierTopicSnapshot(builder);
}
protected TierTopicPartitionSnapshot(TierTopicSnapshot snapshot) {
this.tierTopicSnapshot = snapshot;
}
public static TierTopicPartitionSnapshot read(FileChannel channel) throws IOException {
ByteBuffer buf = ByteBuffer.allocate((int) channel.size());
// FileChannel.read may return before the whole file is buffered, so read until the buffer is full
while (buf.hasRemaining() && channel.read(buf) >= 0) { }
buf.flip(); // rewind so the FlatBuffer root is read from the start of the buffer
return new TierTopicPartitionSnapshot(TierTopicSnapshot.getRootAsTierTopicSnapshot(buf));
}
public void write(FileChannel channel) throws IOException {
// Write a duplicate so the snapshot's underlying buffer position is left untouched, and loop to handle partial writes
ByteBuffer buf = tierTopicSnapshot.getByteBuffer().duplicate();
while (buf.hasRemaining())
channel.write(buf);
}
// TODO: although we don't expect TTPS files to be large, we may want to implement an iterator to save memory
public List<ConsumerRecord<byte[], byte[]>> entries() {
List<ConsumerRecord<byte[], byte[]>> output = new ArrayList<>();
TierTopicSnapshotEntry.Vector vector = tierTopicSnapshot.tierTopicSnapshotEntriesVector();
int size = vector.length();
for (int i = 0; i < size; ++i) {
TierTopicSnapshotEntry entry = vector.get(i);
byte[] key = new byte[entry.keyLength()];
entry.keyAsByteBuffer().get(key);
byte[] value = new byte[entry.valueLength()];
entry.valueAsByteBuffer().get(value);
output.add(makeTierTopicRecord(entry.partition(), entry.offsetAndEpoch().offset(), entry.timestamp(), key,
value, entry.offsetAndEpoch().epoch()));
}
return output;
}
public long startOffset(int partition) {
if (partition < 0 || partition >= tierTopicSnapshot.tierTopicSnapshotHeader().offsetsLength())
throw new IndexOutOfBoundsException("Invalid partition number " + partition);
return tierTopicSnapshot.tierTopicSnapshotHeader().offsets(partition).start();
}
public long endOffset(int partition) {
if (partition < 0 || partition >= tierTopicSnapshot.tierTopicSnapshotHeader().offsetsLength())
throw new IndexOutOfBoundsException("Invalid partition number " + partition);
return tierTopicSnapshot.tierTopicSnapshotHeader().offsets(partition).end();
}
public ByteBuffer payloadBuffer() {
return tierTopicSnapshot.getByteBuffer().duplicate();
}
/* Logic for making a new ConsumerRecord, shared between this class and unit tests */
protected static ConsumerRecord<byte[], byte[]> makeTierTopicRecord(int partition, long offset, long timestamp,
byte[] key, byte[] value, int leaderEpoch) {
// Assumption: users of TTPS file do not need ConsumerRecord.headers(), so the Headers field was not
// serialized. Thus, we simply create an empty Headers here to satisfy the ConsumerRecord constructor.
Headers headers = new RecordHeaders();
return new ConsumerRecord<>(TIER_TOPIC_NAME, partition, offset, timestamp, TIMESTAMP_TYPE, key.length,
value.length, key, value, headers,
leaderEpoch == UNKNOWN_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch));
}
}
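A minimal usage sketch (not part of this commit) of the round trip the wrapper above is meant to support. The example class name and snapshot file path are hypothetical, and the records buffer is assumed to come from polling the tier topic; only the constructor, write(FileChannel), read(FileChannel) and entries() calls are the real methods defined above.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import kafka.tier.domain.TierTopicPartitionSnapshot;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class TtpsRoundTripExample {
    /* Serializes the buffered tier topic records into a TTPS file and reads them back. */
    static List<ConsumerRecord<byte[], byte[]>> roundTrip(List<ConsumerRecords<byte[], byte[]>> recordsBuffer,
                                                          Path snapshotFile) throws IOException {
        TierTopicPartitionSnapshot snapshot = new TierTopicPartitionSnapshot(recordsBuffer);
        try (FileChannel channel = FileChannel.open(snapshotFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            snapshot.write(channel);
        }
        try (FileChannel channel = FileChannel.open(snapshotFile, StandardOpenOption.READ)) {
            return TierTopicPartitionSnapshot.read(channel).entries();
        }
    }
}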
/*
Copyright 2019 Confluent Inc.
Copyright 2022 Confluent Inc.
*/
namespace kafka.tier.serdes;
/*
* Serialization format for kafka.tier.state.OffsetAndEpoch. When the Epoch field is empty,
* its value should be serialized as -1.
*/
struct OffsetAndEpoch {
offset: long (id: 0);
epoch: int (id: 1);
}
/*
Copyright 2022 Confluent Inc.
*/
namespace kafka.tier.serdes;
struct OffsetRange {
start: long (id: 0);
end: long (id: 1);
}
/*
Copyright 2022 Confluent Inc.
*/
include "tier_topic_snapshot_header.fbs";
include "tier_topic_snapshot_entry.fbs";
namespace kafka.tier.serdes;
/*
* Format derived from KDATA-211 with addition of OffsetRange to each header to replace the TTPS
* index file.
*
* Do not use this class directly. This format is wrapped by tier.domain.TierTopicPartitionSnapshot.
*
* IMPORTANT NOTES:
* 1. Do not reuse/mutate field IDs as this will break forward and backward compatibility! Instead,
* mark them as deprecated, e.g. myfield: byte (id: 9, deprecated)
* 2. You must bump TierTopicPartitionSnapshot#CURRENT_VERSION when making changes to this schema
*    or either of its child schemas (TierTopicSnapshotHeader and TierTopicSnapshotEntry).
*
* Version 0: Initial version
*/
table TierTopicSnapshot {
version: byte (id: 0);
tier_topic_snapshot_header: TierTopicSnapshotHeader (id: 1);
tier_topic_snapshot_entries: [TierTopicSnapshotEntry] (id: 2);
}
root_type TierTopicSnapshot;
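A sketch (not part of this commit) of the kind of version gate a future revision of TierTopicPartitionSnapshot could apply internally before trusting fields added in later schema versions. The SUPPORTED_VERSION constant and the handling policy are hypothetical; the version() accessor is the one flatc generates for the byte field above.

import java.nio.ByteBuffer;
import kafka.tier.serdes.TierTopicSnapshot;

public class TtpsVersionGateExample {
    /* Hypothetical: the highest TTPS schema version this reader understands. */
    private static final byte SUPPORTED_VERSION = 0;

    static TierTopicSnapshot parse(ByteBuffer buf) {
        TierTopicSnapshot snapshot = TierTopicSnapshot.getRootAsTierTopicSnapshot(buf);
        /* Field ids are never reused, so an older reader can still decode the fields it knows about;
         * rejecting unknown versions simply keeps the failure mode explicit. */
        if (snapshot.version() > SUPPORTED_VERSION)
            throw new IllegalStateException("Unsupported TTPS version " + snapshot.version());
        return snapshot;
    }
}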
/*
Copyright 2022 Confluent Inc.
*/
include "offset_epoch.fbs";
namespace kafka.tier.serdes;
/*
* Format derived from KDATA-211 with addition of OffsetRange to each header to replace the
* Tier Topic Partition Snapshot index file. This Entry class serializes most of the data contained
* in a ConsumerRecord object, except that certain fields are excluded (to save space) because it is
* specialized for tier topic records.
*
* Do not use this class directly. This is a nested table of TierTopicSnapshot, which itself is
* wrapped by tier.domain.TierTopicPartitionSnapshot.
*
* IMPORTANT NOTES:
* 1. Do not reuse/mutate field IDs as this will break forward and backward compatibility! Instead,
* mark them as deprecated, e.g. myfield: byte (id: 9, deprecated)
* 2. You must bump TierTopicPartitionSnapshot#CURRENT_VERSION when making changes to this schema.
*
* Version 0: Initial version
*/
table TierTopicSnapshotEntry {
/* partition: the partition in the tier state topic the event/message was written to */
partition: int8 (id: 0);
/* offset_and_epoch: the offset in the tier state partition to which the event/message was written
* along with its optional epoch
*/
offset_and_epoch: OffsetAndEpoch (id: 1);
/* timestamp: timestamp of the event/message written to the tier state partition; note that
* the tier state topic is configured to use TimestampType.CREATE_TIME
*/
timestamp: long (id: 2);
/* key: the ConsumerRecord.key() byte array */
key: [byte] (id: 3);
/* value: the ConsumerRecord.value() byte array */
value: [byte] (id: 4);
}
root_type TierTopicSnapshotEntry;
/*
Copyright 2022 Confluent Inc.
*/
include "offset_range.fbs";
namespace kafka.tier.serdes;
/*
* Format derived from KDATA-211 with addition of OffsetRange to each header to replace the TTPS
* index file.
*
* Do not use this class directly. This is a nested table of TierTopicSnapshot, which itself is
* wrapped by tier.domain.TierTopicPartitionSnapshot.
*
* IMPORTANT NOTES:
* 1. Do not reuse/mutate field IDs as this will break forward and backward compatibility! Instead,
* mark them as deprecated, e.g. myfield: byte (id: 9, deprecated)
* 2. You must bump TierTopicPartitionSnapshot#CURRENT_VERSION when making changes to this schema.
*
* Version 0: Initial version
*/
table TierTopicSnapshotHeader {
/* offsets: array of 50 entries (one per tier state partition), where each entry represents the
* range (inclusive) of offsets of events present in this snapshot file, sorted
* in order by partition number (i.e. array index equals partition number)
*/
offsets: [OffsetRange] (id: 0);
}
root_type TierTopicSnapshotHeader;
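Because the OffsetRange header replaces the separate TTPS index file, a reader can tell from the header alone whether a snapshot file covers a given tier topic offset. Below is a sketch (not part of this commit) of a hypothetical lookup helper built on the wrapper's startOffset/endOffset accessors; ranges are inclusive on both ends, as documented above.

import kafka.tier.domain.TierTopicPartitionSnapshot;

public class TtpsHeaderLookupExample {
    /* Returns true if the snapshot's header claims to contain the given offset for the given tier topic partition. */
    static boolean covers(TierTopicPartitionSnapshot snapshot, int partition, long offset) {
        try {
            return offset >= snapshot.startOffset(partition) && offset <= snapshot.endOffset(partition);
        } catch (IndexOutOfBoundsException e) {
            return false; // the header has no range for this partition number
        }
    }
}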
/*
Copyright 2022 Confluent Inc.
*/
package kafka.tier.domain;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static kafka.tier.domain.TierTopicPartitionSnapshot.TIER_TOPIC_NAME;
import static kafka.tier.domain.TierTopicPartitionSnapshot.makeTierTopicRecord;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import kafka.tier.TopicIdPartition;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStore.OpaqueData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
public class TierTopicSnapshotTest {
/**
* Converts a list of raw ConsumerRecord objects into AbstractTierMetadata objects using AbstractTierMetadata.deserialize()
*
* @param entries List of ConsumerRecord objects whose key and value are expected to be deserializable into
* AbstractTierMetadata objects
* @return List of each valid AbstractTierMetadata deserialized from entries
*/
public static List<AbstractTierMetadata> deserializeEntries(List<ConsumerRecord<byte[], byte[]>> entries) {
return entries.stream()
.map(entry -> AbstractTierMetadata.deserialize(entry.key(), entry.value(), entry.timestamp()))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
/**
* Verify that original tier metadata objects are equal to those that have been serialized into and deserialized out
* of TierTopicPartitionSnapshot's FlatBuffer
*/
@Test
public void testTierTopicSnapshotSerializationDeserialization() {
TopicIdPartition tpid = new TopicIdPartition("foo", UUID.randomUUID(), 10);
int tierPartition = 0;
long tierOffset0 = 0;
long tierOffset1 = tierOffset0 + 100;
/* Construct first tier metadata and serialize into ConsumerRecord */
TierSegmentUploadInitiate tsui = makeTierSegmentUploadInitiate(tpid);
ConsumerRecord<byte[], byte[]> tsuiRecord = makeTierTopicRecord(tierPartition, tierOffset0, 0,
tsui.serializeKey(), tsui.serializeValue(), tsui.tierEpoch());
/* Construct second tier metadata and serialize into ConsumerRecord */
TierSegmentUploadComplete tsuc = makeTierSegmentUploadComplete(tpid);
ConsumerRecord<byte[], byte[]> tsucRecord = makeTierTopicRecord(tierPartition, tierOffset1, 100,
tsuc.serializeKey(), tsuc.serializeValue(), tsuc.tierEpoch());
/* Construct TTPS FlatBuffer from ConsumerRecords */
TierTopicPartitionSnapshot ttps = makeTierTopicPartitionSnapshot(
Collections.singletonList(Collections.singletonMap(tierPartition, Arrays.asList(tsuiRecord, tsucRecord))));
/* Compare the tier metadata that was serialized into and deserialized out of the TTPS FlatBuffer
* to the original tier metadata objects */
List<AbstractTierMetadata> actualEntries = deserializeEntries(ttps.entries());
List<AbstractTierMetadata> expectedEntries = Arrays.asList(tsui, tsuc);
assertEquals(expectedEntries, actualEntries,
"Manually-constructed List<AbstractTierMetadata> did not match list produced from deserializing entries out of FlatBuffer");
}
/*
* Verify that the TTPS header contains the expected offset range and that an out-of-range partition number is rejected
*/
@Test
public void testOffsetRanges() {
TopicIdPartition tpid = new TopicIdPartition("foo", UUID.randomUUID(), 10);
int tierPartition = 0;
long tierOffset0 = 0;
long tierOffset1 = tierOffset0 + 100;
/* Construct first tier metadata and serialize into ConsumerRecord */
TierSegmentUploadInitiate tsui = makeTierSegmentUploadInitiate(tpid);
ConsumerRecord<byte[], byte[]> tsuiRecord = makeTierTopicRecord(tierPartition, tierOffset0, 0,
tsui.serializeKey(), tsui.serializeValue(), tsui.tierEpoch());
/* Construct second tier metadata and serialize into ConsumerRecord */
TierSegmentUploadComplete tsuc = makeTierSegmentUploadComplete(tpid);
ConsumerRecord<byte[], byte[]> tsucRecord = makeTierTopicRecord(tierPartition, tierOffset1, 100,
tsuc.serializeKey(), tsuc.serializeValue(), tsuc.tierEpoch());
/* Construct TTPS FlatBuffer from ConsumerRecords */
TierTopicPartitionSnapshot ttps = makeTierTopicPartitionSnapshot(
Collections.singletonList(Collections.singletonMap(tierPartition, Arrays.asList(tsuiRecord, tsucRecord))));
/* Compare offset ranges in the TTPS Header */
assertEquals(tierOffset0, ttps.startOffset(tierPartition),
"Unexpected starting offset found in TTPS header for partition " + tierPartition);
assertEquals(tierOffset1, ttps.endOffset(tierPartition),
"Unexpected ending offset found in TTPS header for partition " + tierPartition);
/* Ensure offset range throws error on invalid partition number */
assertThrows(IndexOutOfBoundsException.class, () -> ttps.startOffset(1));
}
/*
* Manually constructing a test input to the constructor for TierTopicPartitionSnapshot is convoluted, so its
* logic is encapsulated here for testing purposes. To use it, you can pass a list of Collections.singletonMaps
* where the key of each map is a tier partition and the value is a list of ConsumerRecord objects that should
* belong to that tier partition.
*/
private static TierTopicPartitionSnapshot makeTierTopicPartitionSnapshot(
List<Map<Integer, List<ConsumerRecord<byte[], byte[]>>>> input) {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsMap = new HashMap<>();
for (Map<Integer, List<ConsumerRecord<byte[], byte[]>>> inputMap : input) {
inputMap.forEach((partition, recordList) ->
recordsMap.put(new TopicPartition(TIER_TOPIC_NAME, partition), recordList));
}
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(recordsMap);
List<ConsumerRecords<byte[], byte[]>> recordsBuffer = Collections.singletonList(records);
return new TierTopicPartitionSnapshot(recordsBuffer);
}
/* PRIVATE HELPERS FOR CONSTRUCTING DUMMY TIER METADATA OBJECTS */
private static TierSegmentUploadInitiate makeTierSegmentUploadInitiate(TopicIdPartition tpid) {
int tierEpoch = 0;
UUID objectId = UUID.randomUUID();
long baseOffset = 0;
long endOffset = 500;
long maxTimestamp = 1000;
long firstBatchTimestamp = 250;
int size = 100;
boolean hasEpochState = false;
boolean hasAbortedTxns = false;
boolean hasProducerState = false;
TierUploadType uploadType = TierUploadType.Archive;
OffsetAndEpoch stateOffset = new OffsetAndEpoch(0, Optional.empty());
TierObjectStore.OpaqueData opaqueData = OpaqueData.fromByteArray(new byte[]{0, 1, 2, 3, 4, 5});
return new TierSegmentUploadInitiate(tpid, tierEpoch, objectId, Optional.empty(),
baseOffset, endOffset, maxTimestamp, firstBatchTimestamp, size, hasEpochState, hasAbortedTxns,
hasProducerState, uploadType, stateOffset, opaqueData);
}
private static TierSegmentUploadComplete makeTierSegmentUploadComplete(TopicIdPartition tpid) {
int tierEpoch = 0;
UUID objectId = UUID.randomUUID();
OffsetAndEpoch stateOffset = new OffsetAndEpoch(100, Optional.empty());
return new TierSegmentUploadComplete(tpid, tierEpoch, objectId, stateOffset);
}
}