Skip to content
代码片段 群组 项目
未验证 提交 6e0a10b4 编辑于 作者: JoelWee's avatar JoelWee 提交者: GitHub
浏览文件

KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join (#9186)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
上级 3669cde0
No related branches found
No related tags found
无相关合并请求
......@@ -58,23 +58,23 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
@Override
public void process(final K1 key, final V1 value) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
// If {@code keyMapper} returns {@code null} it implies there is no match,
// so ignore unless it is a left join
// we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is the same as having a null key
// since keyMapper just returns the key, but for GlobalKTables we can have other keyMappers
//
// we also ignore the record if value is null, because in a key-value data model a null-value indicates
// an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
if (key == null || value == null) {
final K2 mappedKey = keyMapper.apply(key, value);
if (mappedKey == null || value == null) {
LOG.warn(
"Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(), context().offset()
"Skipping record due to null join key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(), context().offset()
);
droppedRecordsSensor.record();
} else {
final K2 mappedKey = keyMapper.apply(key, value);
final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
if (leftJoin || value2 != null) {
context().forward(key, joiner.apply(value, value2));
}
......
......@@ -87,7 +87,7 @@ public class KStreamGlobalKTableJoinTest {
driver.close();
}
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey, final boolean includeNullKey) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
......@@ -95,7 +95,11 @@ public class KStreamGlobalKTableJoinTest {
if (includeForeignKey) {
value = value + ",FKey" + expectedKeys[i];
}
inputTopic.pipeInput(expectedKeys[i], value);
Integer key = expectedKeys[i];
if (includeNullKey && i == 0) {
key = null;
}
inputTopic.pipeInput(key, value);
}
}
......@@ -128,7 +132,7 @@ public class KStreamGlobalKTableJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true);
pushToStream(2, "X", true, false);
processor.checkAndClearProcessResult(EMPTY);
}
......@@ -137,7 +141,7 @@ public class KStreamGlobalKTableJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true);
pushToStream(2, "X", true, false);
processor.checkAndClearProcessResult(EMPTY);
// push two items to the globalTable. this should not produce any item.
......@@ -147,7 +151,7 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X", true);
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
......@@ -158,7 +162,7 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
......@@ -180,7 +184,7 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X", true);
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
......@@ -196,7 +200,7 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
......@@ -209,7 +213,7 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "XX", true);
pushToStream(4, "XX", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 2),
new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
}
......@@ -225,8 +229,21 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
// this should not produce any item.
pushToStream(4, "XXX", false);
pushToStream(4, "XXX", false, false);
processor.checkAndClearProcessResult(EMPTY);
}
@Test
public void shouldJoinOnNullKeyWithNonNullKeyMapperValues() {
// push two items to the globalTable. this should not produce any item.
pushToGlobalTable(2, "Y");
processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X", true, true);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
}
}
......@@ -87,7 +87,7 @@ public class KStreamGlobalKTableLeftJoinTest {
driver.close();
}
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) {
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey, final boolean includeNullKey) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
......@@ -95,7 +95,11 @@ public class KStreamGlobalKTableLeftJoinTest {
if (includeForeignKey) {
value = value + ",FKey" + expectedKeys[i];
}
inputTopic.pipeInput(expectedKeys[i], value);
Integer key = expectedKeys[i];
if (includeNullKey && i == 0) {
key = null;
}
inputTopic.pipeInput(key, value);
}
}
......@@ -128,7 +132,7 @@ public class KStreamGlobalKTableLeftJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true);
pushToStream(2, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+null", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+null", 1));
}
......@@ -138,7 +142,7 @@ public class KStreamGlobalKTableLeftJoinTest {
// push two items to the primary stream. the globalTable is empty
pushToStream(2, "X", true);
pushToStream(2, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+null", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+null", 1));
......@@ -149,7 +153,7 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
......@@ -162,7 +166,7 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
......@@ -184,7 +188,7 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
......@@ -202,7 +206,7 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true);
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
......@@ -215,7 +219,7 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "XX", true);
pushToStream(4, "XX", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "XX0,FKey0+null", 0),
new KeyValueTimestamp<>(1, "XX1,FKey1+null", 1),
new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 2),
......@@ -223,7 +227,7 @@ public class KStreamGlobalKTableLeftJoinTest {
}
@Test
public void shouldJoinOnNullKeyMapperValues() {
public void shouldNotJoinOnNullKeyMapperValues() {
// push all items to the globalTable. this should not produce any item
......@@ -231,13 +235,25 @@ public class KStreamGlobalKTableLeftJoinTest {
processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream with no foreign key, resulting in null keyMapper values.
// this should produce four items.
// this should not produce any item.
pushToStream(4, "XXX", false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "XXX0+null", 0),
new KeyValueTimestamp<>(1, "XXX1+null", 1),
new KeyValueTimestamp<>(2, "XXX2+null", 2),
new KeyValueTimestamp<>(3, "XXX3+null", 3));
pushToStream(4, "XXX", false, false);
processor.checkAndClearProcessResult(EMPTY);
}
@Test
public void shouldJoinOnNullKeyWithNonNullKeyMapperValues() {
// push four items to the globalTable. this should not produce any item.
pushToGlobalTable(4, "Y");
processor.checkAndClearProcessResult(EMPTY);
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true, true);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
}
}
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册