Commit 1d07fb8c, authored by John Roesler, committed by John Roesler

KAFKA-10173: Fix suppress changelog binary schema compatibility (#8905)

We inadvertently changed the binary schema of the suppress buffer changelog
in 2.4.0 without bumping the schema version number. As a result, it is impossible
to upgrade from 2.3.x to 2.4+ if you are using suppression.

* Refactor the schema compatibility test to use serialized data from older versions
as a more foolproof compatibility test.
* Refactor the upgrade system test to use the smoke test application so that we
actually exercise a significant portion of the Streams API during upgrade testing
* Add more recent versions to the upgrade system test matrix
* Fix the compatibility bug by bumping the schema version to 3

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Parent 8090d733
Showing 690 additions and 258 deletions
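Before the diff, here is a minimal sketch (an illustration only, not code from this commit; the class
and method names are hypothetical) of how the restore path distinguishes the suppress changelog schema
versions via the record's "v" header. The real dispatch lives in InMemoryTimeOrderedKeyValueBuffer#restoreBatch
and the new TimeOrderedKeyValueBufferChangelogDeserializationHelper shown below.

    import java.util.Arrays;

    // Hypothetical summary of the version dispatch performed while restoring the suppress buffer.
    final class SuppressChangelogVersionSketch {
        static String describe(final byte[] versionHeaderValue) {
            if (versionHeaderValue == null) {
                // records written before the "v" header existed
                return "v0: [buffer time][old value][new value]";
            } else if (Arrays.equals(versionHeaderValue, new byte[] {3})) {
                return "v3: [record context][prior value][old value][new value][buffer time]";
            } else if (Arrays.equals(versionHeaderValue, new byte[] {2})) {
                // 2.4.0, 2.4.1, and 2.5.0 wrote v3-formatted values but still set the v2 flag,
                // so the restore logic has to duck-type between the two layouts
                return "v2 flag: try the v2 layout, fall back to v3";
            } else if (Arrays.equals(versionHeaderValue, new byte[] {1})) {
                return "v1: [buffer time][record context][old value][new value]";
            } else {
                throw new IllegalArgumentException("unknown suppress changelog version header");
            }
        }
    }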
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
@@ -276,6 +277,37 @@ public final class Utils {
return dest;
}
/**
* Starting from the current position, read an integer indicating the size of the byte array to read,
* then read the array. Consumes the buffer: upon returning, the buffer's position is after the array
* that is returned.
* @param buffer The buffer to read a size-prefixed array from
* @return The array
*/
public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
final int size = buffer.getInt();
return getNullableArray(buffer, size);
}
/**
* Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position
* is after the array that is returned.
* @param buffer The buffer to read a size-prefixed array from
* @param size The number of bytes to read out of the buffer
* @return The array
*/
public static byte[] getNullableArray(final ByteBuffer buffer, final int size) {
if (size > buffer.remaining()) {
// preemptively throw this when the read is doomed to fail, so we don't have to allocate the array.
throw new BufferUnderflowException();
}
final byte[] oldBytes = size == -1 ? null : new byte[size];
if (oldBytes != null) {
buffer.get(oldBytes);
}
return oldBytes;
}
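    // Illustrative usage (not part of this change): a nullable array is written as putInt(length)
    // followed by the bytes, with length -1 standing in for null. For example:
    //   final ByteBuffer buf = ByteBuffer.wrap(new byte[] {0, 0, 0, 2, 7, 8});
    //   Utils.getNullableSizePrefixedArray(buf);  // returns {7, 8}; buf.position() is now 6
    //   final ByteBuffer nil = ByteBuffer.wrap(new byte[] {-1, -1, -1, -1});
    //   Utils.getNullableSizePrefixedArray(nil);  // returns null; nil.position() is now 4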
/**
* Returns a copy of src byte array
* @param src The byte array to copy
......
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
@@ -51,6 +52,8 @@ import static org.apache.kafka.common.utils.Utils.validHostPattern;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -70,7 +73,7 @@ public class UtilsTest {
cases.put("a-little-bit-long-string".getBytes(), -985981536);
cases.put("a-little-bit-longer-string".getBytes(), -1486304829);
cases.put("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8".getBytes(), -58897971);
cases.put(new byte[]{'a', 'b', 'c'}, 479470107);
cases.put(new byte[] {'a', 'b', 'c'}, 479470107);
for (Map.Entry c : cases.entrySet()) {
assertEquals((int) c.getValue(), murmur2((byte[]) c.getKey()));
@@ -203,6 +206,65 @@ public class UtilsTest {
assertEquals(2, buffer.position());
}
@Test
public void getNullableSizePrefixedArrayExact() {
byte[] input = {0, 0, 0, 2, 1, 0};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {1, 0}, array);
assertEquals(6, buffer.position());
assertFalse(buffer.hasRemaining());
}
@Test
public void getNullableSizePrefixedArrayExactEmpty() {
byte[] input = {0, 0, 0, 0};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {}, array);
assertEquals(4, buffer.position());
assertFalse(buffer.hasRemaining());
}
@Test
public void getNullableSizePrefixedArrayRemainder() {
byte[] input = {0, 0, 0, 2, 1, 0, 9};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {1, 0}, array);
assertEquals(6, buffer.position());
assertTrue(buffer.hasRemaining());
}
@Test
public void getNullableSizePrefixedArrayNull() {
// -1
byte[] input = {-1, -1, -1, -1};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertNull(array);
assertEquals(4, buffer.position());
assertFalse(buffer.hasRemaining());
}
@Test
public void getNullableSizePrefixedArrayInvalid() {
// -2
byte[] input = {-1, -1, -1, -2};
final ByteBuffer buffer = ByteBuffer.wrap(input);
assertThrows(NegativeArraySizeException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
}
@Test
public void getNullableSizePrefixedArrayUnderflow() {
// Integer.MAX_VALUE
byte[] input = {127, -1, -1, -1};
final ByteBuffer buffer = ByteBuffer.wrap(input);
// note, we get a buffer underflow exception instead of an OOME, even though the encoded size
// would be 2,147,483,647 aka 2.1 GB, probably larger than the available heap
assertThrows(BufferUnderflowException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
}
@Test
public void utf8ByteArraySerde() {
String utf8String = "A\u00ea\u00f1\u00fcC";
@@ -414,7 +476,7 @@ public class UtilsTest {
String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
Utils.readFullyOrFail(channelMock, buffer, 0L, "test");
assertEquals("The buffer should be populated correctly", expectedBufferContent,
new String(buffer.array()));
new String(buffer.array()));
assertFalse("The buffer should be filled", buffer.hasRemaining());
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@@ -431,7 +493,7 @@ public class UtilsTest {
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
Utils.readFully(channelMock, buffer, 0L);
assertEquals("The buffer should be populated correctly.", expectedBufferContent,
new String(buffer.array()));
new String(buffer.array()));
assertFalse("The buffer should be filled", buffer.hasRemaining());
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@@ -480,7 +542,7 @@ public class UtilsTest {
*
* @param channelMock The mocked FileChannel object
* @param bufferSize The buffer size
* @return Expected buffer string
* @return Expected buffer string
* @throws IOException If an I/O error occurs
*/
private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
@@ -517,8 +579,9 @@ public class UtilsTest {
@Override
public void close() throws IOException {
closed = true;
if (closeException != null)
if (closeException != null) {
throw closeException;
}
}
static TestCloseable[] createCloseables(boolean... exceptionOnClose) {
......
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public final class FullChangeSerde<T> {
private final Serde<T> inner;
@@ -68,33 +69,6 @@ public final class FullChangeSerde<T> {
return new Change<>(newValue, oldValue);
}
/**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
* so that we can produce the legacy format to test that we can still deserialize it.
*/
public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
if (serialChange == null) {
return null;
}
final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
buffer.putInt(oldSize);
if (serialChange.oldValue != null) {
buffer.put(serialChange.oldValue);
}
buffer.putInt(newSize);
if (serialChange.newValue != null) {
buffer.put(serialChange.newValue);
}
return buffer.array();
}
/**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
* need to be able to read it (so that we can load the state store from previously-written changelog records).
@@ -104,19 +78,8 @@ public final class FullChangeSerde<T> {
return null;
}
final ByteBuffer buffer = ByteBuffer.wrap(data);
final int oldSize = buffer.getInt();
final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
if (oldBytes != null) {
buffer.get(oldBytes);
}
final int newSize = buffer.getInt();
final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
if (newBytes != null) {
buffer.get(newBytes);
}
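        // legacy layout: [old size int][old bytes][new size int][new bytes], where a size of -1 encodes a null array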
final byte[] oldBytes = getNullableSizePrefixedArray(buffer);
final byte[] newBytes = getNullableSizePrefixedArray(buffer);
return new Change<>(newBytes, oldBytes);
}
......
@@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public class ProcessorRecordContext implements RecordContext {
@@ -161,12 +163,10 @@ public class ProcessorRecordContext implements RecordContext {
public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long offset = buffer.getLong();
final int topicSize = buffer.getInt();
final String topic;
{
// not handling the null topic condition, because we believe the topic will never be null when we serialize
final byte[] topicBytes = new byte[topicSize];
buffer.get(topicBytes);
// we believe the topic will never be null when we serialize
final byte[] topicBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
topic = new String(topicBytes, UTF_8);
}
final int partition = buffer.getInt();
@@ -177,19 +177,8 @@ public class ProcessorRecordContext implements RecordContext {
} else {
final Header[] headerArr = new Header[headerCount];
for (int i = 0; i < headerCount; i++) {
final int keySize = buffer.getInt();
final byte[] keyBytes = new byte[keySize];
buffer.get(keyBytes);
final int valueSize = buffer.getInt();
final byte[] valueBytes;
if (valueSize == -1) {
valueBytes = null;
} else {
valueBytes = new byte[valueSize];
buffer.get(valueBytes);
}
final byte[] keyBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
final byte[] valueBytes = getNullableSizePrefixedArray(buffer);
headerArr[i] = new RecordHeader(new String(keyBytes, UTF_8), valueBytes);
}
headers = new RecordHeaders(headerArr);
......
@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import static org.apache.kafka.common.utils.Utils.getNullableArray;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public final class BufferValue {
private static final int NULL_VALUE_SENTINEL = -1;
private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;
@@ -67,35 +70,21 @@ public final class BufferValue {
static BufferValue deserialize(final ByteBuffer buffer) {
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
final byte[] priorValue = extractValue(buffer);
final byte[] priorValue = getNullableSizePrefixedArray(buffer);
final byte[] oldValue;
final int oldValueLength = buffer.getInt();
if (oldValueLength == NULL_VALUE_SENTINEL) {
oldValue = null;
} else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
oldValue = priorValue;
} else {
oldValue = new byte[oldValueLength];
buffer.get(oldValue);
oldValue = getNullableArray(buffer, oldValueLength);
}
final byte[] newValue = extractValue(buffer);
final byte[] newValue = getNullableSizePrefixedArray(buffer);
return new BufferValue(priorValue, oldValue, newValue, context);
}
private static byte[] extractValue(final ByteBuffer buffer) {
final int valueLength = buffer.getInt();
if (valueLength == NULL_VALUE_SENTINEL) {
return null;
} else {
final byte[] value = new byte[valueLength];
buffer.get(value);
return value;
}
}
ByteBuffer serialize(final int endPadding) {
final int sizeOfValueLength = Integer.BYTES;
@@ -120,7 +109,7 @@ public final class BufferValue {
if (oldValue == null) {
buffer.putInt(NULL_VALUE_SENTINEL);
} else if (priorValue == oldValue) {
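        // compare contents rather than references, so an old value that is equal to (but not the same
        // array object as) the prior value is still encoded with the duplicate sentinel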
} else if (Arrays.equals(priorValue, oldValue)) {
buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL);
} else {
buffer.putInt(sizeOfOldValue);
......
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public class ContextualRecord {
private final byte[] value;
private final ProcessorRecordContext recordContext;
@@ -43,36 +45,10 @@ public class ContextualRecord {
return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate();
}
ByteBuffer serialize(final int endPadding) {
final byte[] serializedContext = recordContext.serialize();
final int sizeOfContext = serializedContext.length;
final int sizeOfValueLength = Integer.BYTES;
final int sizeOfValue = value == null ? 0 : value.length;
final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue + endPadding);
buffer.put(serializedContext);
if (value == null) {
buffer.putInt(-1);
} else {
buffer.putInt(value.length);
buffer.put(value);
}
return buffer;
}
static ContextualRecord deserialize(final ByteBuffer buffer) {
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
final int valueLength = buffer.getInt();
if (valueLength == -1) {
return new ContextualRecord(null, context);
} else {
final byte[] value = new byte[valueLength];
buffer.get(value);
return new ContextualRecord(value, context);
}
final byte[] value = getNullableSizePrefixedArray(buffer);
return new ContextualRecord(value, context);
}
@Override
......
@@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -54,14 +56,19 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV0;
import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV1;
import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3;
import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2;
public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
private static final RecordHeaders V_1_CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
private static final RecordHeaders V_2_CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {(byte) 2};
private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {(byte) 3};
static final RecordHeaders CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", V_3_CHANGELOG_HEADER_VALUE)});
private static final String METRIC_SCOPE = "in-memory-suppression";
private final Map<Bytes, BufferKey> index = new HashMap<>();
@@ -259,12 +266,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final int sizeOfBufferTime = Long.BYTES;
final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
buffer.putLong(bufferKey.time());
collector.send(
changelogTopic,
key,
buffer.array(),
V_2_CHANGELOG_HEADERS,
CHANGELOG_HEADERS,
partition,
null,
KEY_SERIALIZER,
@@ -286,6 +292,15 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
for (final ConsumerRecord<byte[], byte[]> record : batch) {
if (record.partition() != partition) {
throw new IllegalStateException(
String.format(
"record partition [%d] is being restored by the wrong suppress partition [%d]",
record.partition(),
partition
)
);
}
final Bytes key = Bytes.wrap(record.key());
if (record.value() == null) {
// This was a tombstone. Delete the record.
@@ -299,92 +314,63 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time();
}
}
if (record.partition() != partition) {
throw new IllegalStateException(
String.format(
"record partition [%d] is being restored by the wrong suppress partition [%d]",
record.partition(),
partition
)
);
}
} else {
if (record.headers().lastHeader("v") == null) {
// in this case, the changelog value is just the serialized record value
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
final ProcessorRecordContext recordContext = new ProcessorRecordContext(
record.timestamp(),
record.offset(),
record.partition(),
record.topic(),
record.headers()
);
cleanPut(
time,
key,
new BufferValue(
index.containsKey(key)
? internalPriorValueForBuffered(key)
: change.oldValue,
change.oldValue,
change.newValue,
recordContext
)
);
} else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
// in this case, the changelog value is a serialized ContextualRecord
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
cleanPut(
time,
key,
new BufferValue(
index.containsKey(key)
? internalPriorValueForBuffered(key)
: change.oldValue,
change.oldValue,
change.newValue,
contextualRecord.recordContext()
)
);
} else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
// in this case, the changelog value is a serialized BufferValue
final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
final long time = valueAndTime.getLong();
cleanPut(time, key, bufferValue);
final Header versionHeader = record.headers().lastHeader("v");
if (versionHeader == null) {
// Version 0:
// value:
// - buffer time
// - old value
// - new value
final byte[] previousBufferedValue = index.containsKey(key)
? internalPriorValueForBuffered(key)
: null;
final DeserializationResult deserializationResult = deserializeV0(record, key, previousBufferedValue);
cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else if (Arrays.equals(versionHeader.value(), V_3_CHANGELOG_HEADER_VALUE)) {
// Version 3:
// value:
// - record context
// - prior value
// - old value
// - new value
// - buffer time
final DeserializationResult deserializationResult = deserializeV3(record, key);
cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
// Version 2:
// value:
// - record context
// - old value
// - new value
// - prior value
// - buffer time
// NOTE: 2.4.0, 2.4.1, and 2.5.0 actually encode Version 3 formatted data,
// but still set the Version 2 flag, so to deserialize, we have to duck type.
final DeserializationResult deserializationResult = duckTypeV2(record, key);
cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else if (Arrays.equals(versionHeader.value(), V_1_CHANGELOG_HEADER_VALUE)) {
// Version 1:
// value:
// - buffer time
// - record context
// - old value
// - new value
final byte[] previousBufferedValue = index.containsKey(key)
? internalPriorValueForBuffered(key)
: null;
final DeserializationResult deserializationResult = deserializeV1(record, key, previousBufferedValue);
cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else {
throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
}
}
if (record.partition() != partition) {
throw new IllegalStateException(
String.format(
"record partition [%d] is being restored by the wrong suppress partition [%d]",
record.partition(),
partition
)
);
}
}
updateBufferMetrics();
}
@Override
public void evictWhile(final Supplier<Boolean> predicate,
final Consumer<Eviction<K, V>> callback) {
@@ -481,8 +467,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final BufferValue buffered = getBuffered(serializedKey);
final byte[] serializedPriorValue;
if (buffered == null) {
final V priorValue = value.oldValue;
serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
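            // serialChange already carries the serialized old value, so reuse it instead of
            // serializing value.oldValue again with the inner serde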
serializedPriorValue = serialChange.oldValue;
} else {
serializedPriorValue = buffered.priorValue();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import java.nio.ByteBuffer;
import static java.util.Objects.requireNonNull;
final class TimeOrderedKeyValueBufferChangelogDeserializationHelper {
private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {}
static final class DeserializationResult {
private final long time;
private final Bytes key;
private final BufferValue bufferValue;
private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) {
this.time = time;
this.key = key;
this.bufferValue = bufferValue;
}
long time() {
return time;
}
Bytes key() {
return key;
}
BufferValue bufferValue() {
return bufferValue;
}
}
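    // Version 0 (no "v" header): the changelog value is [buffer time][legacy-format Change (old value, new value)].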
static DeserializationResult deserializeV0(final ConsumerRecord<byte[], byte[]> record,
final Bytes key,
final byte[] previousBufferedValue) {
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
final ProcessorRecordContext recordContext = new ProcessorRecordContext(
record.timestamp(),
record.offset(),
record.partition(),
record.topic(),
record.headers()
);
return new DeserializationResult(
time,
key,
new BufferValue(
previousBufferedValue == null ? change.oldValue : previousBufferedValue,
change.oldValue,
change.newValue,
recordContext
)
);
}
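    // Version 1: the changelog value is [buffer time][serialized ContextualRecord (record context + legacy-format Change)].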
static DeserializationResult deserializeV1(final ConsumerRecord<byte[], byte[]> record,
final Bytes key,
final byte[] previousBufferedValue) {
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
return new DeserializationResult(
time,
key,
new BufferValue(
previousBufferedValue == null ? change.oldValue : previousBufferedValue,
change.oldValue,
change.newValue,
contextualRecord.recordContext()
)
);
}
static DeserializationResult duckTypeV2(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
DeserializationResult deserializationResult = null;
RuntimeException v2DeserializationException = null;
RuntimeException v3DeserializationException = null;
try {
deserializationResult = deserializeV2(record, key);
} catch (final RuntimeException e) {
v2DeserializationException = e;
}
// versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
// V2 header, so we'll try duck-typing to see if this is decodable as V3
if (deserializationResult == null) {
try {
deserializationResult = deserializeV3(record, key);
} catch (final RuntimeException e) {
v3DeserializationException = e;
}
}
if (deserializationResult == null) {
// ok, it wasn't V3 either. Throw both exceptions:
final RuntimeException exception =
new RuntimeException("Couldn't deserialize record as v2 or v3: " + record,
v2DeserializationException);
exception.addSuppressed(v3DeserializationException);
throw exception;
}
return deserializationResult;
}
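    // Version 2: the changelog value is [record context][legacy-format Change][prior value][buffer time].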
private static DeserializationResult deserializeV2(final ConsumerRecord<byte[], byte[]> record,
final Bytes key) {
final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
final ContextualRecord contextualRecord = ContextualRecord.deserialize(valueAndTime);
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
final byte[] priorValue = Utils.getNullableSizePrefixedArray(valueAndTime);
final long time = valueAndTime.getLong();
final BufferValue bufferValue = new BufferValue(priorValue, change.oldValue, change.newValue, contextualRecord.recordContext());
return new DeserializationResult(time, key, bufferValue);
}
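    // Version 3: the changelog value is a serialized BufferValue ([record context][prior value][old value][new value])
    // followed by [buffer time].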
static DeserializationResult deserializeV3(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
final long time = valueAndTime.getLong();
return new DeserializationResult(time, key, bufferValue);
}
}
\ No newline at end of file
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -26,10 +28,37 @@ import static org.hamcrest.core.Is.is;
public class FullChangeSerdeTest {
private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
/**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
* so that we can produce the legacy format to test that we can still deserialize it.
*/
private static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
if (serialChange == null) {
return null;
}
final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
buffer.putInt(oldSize);
if (serialChange.oldValue != null) {
buffer.put(serialChange.oldValue);
}
buffer.putInt(newSize);
if (serialChange.newValue != null) {
buffer.put(serialChange.newValue);
}
return buffer.array();
}
@Test
public void shouldRoundTripNull() {
assertThat(serde.serializeParts(null, null), nullValue());
assertThat(FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
assertThat(mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), nullValue());
assertThat(serde.deserializeParts(null, null), nullValue());
}
@@ -47,7 +76,7 @@ public class FullChangeSerdeTest {
is(new Change<String>(null, null))
);
final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
assertThat(
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat),
is(new Change<byte[]>(null, null))
@@ -57,7 +86,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripOldNull() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
@@ -68,7 +97,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripNewNull() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
@@ -79,7 +108,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripChange() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
......