Skip to content
代码片段 群组 项目
未验证 提交 901bf57c 编辑于 作者: Gonzalo Muñoz's avatar Gonzalo Muñoz 提交者: GitHub
浏览文件

KAFKA-10503: MockProducer doesn't throw ClassCastException when no partition...

KAFKA-10503: MockProducer doesn't throw ClassCastException when no partition for topic exists (#9309)

Reviewer: Matthias J. Sax <matthias@confluent.io>
上级 b8090add
No related branches found
No related tags found
无相关合并请求
......@@ -302,6 +302,12 @@ public class MockProducer<K, V> implements Producer<K, V> {
int partition = 0;
if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
partition = partition(record, this.cluster);
else {
//just to throw ClassCastException if serializers are not the proper ones to serialize key/value
keySerializer.serialize(record.topic(), record.key());
valueSerializer.serialize(record.topic(), record.value());
}
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP,
......
......@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.test.MockSerializer;
import org.junit.After;
......@@ -43,6 +44,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -765,6 +767,14 @@ public class MockProducerTest {
fail("Should have thrown as producer is already closed");
} catch (IllegalStateException e) { }
}
@Test
@SuppressWarnings("unchecked")
public void shouldThrowClassCastException() {
try (MockProducer<Integer, String> customProducer = new MockProducer<>(true, new IntegerSerializer(), new StringSerializer());) {
assertThrows(ClassCastException.class, () -> customProducer.send(new ProducerRecord(topic, "key1", "value1")));
}
}
@Test
public void shouldBeFlushedIfNoBufferedRecords() {
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册