Skip to content
代码片段 群组 项目
未验证 提交 4c6b9629 编辑于 作者: Chris Egerton's avatar Chris Egerton 提交者: GitHub
浏览文件

KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)

上级 7be8bd8c
No related branches found
No related tags found
无相关合并请求
......@@ -94,6 +94,7 @@ class WorkerSinkTask extends WorkerTask {
private int commitFailures;
private boolean pausedForRedelivery;
private boolean committing;
private boolean taskStopped;
private final WorkerErrantRecordReporter workerErrantRecordReporter;
public WorkerSinkTask(ConnectorTaskId id,
......@@ -138,6 +139,7 @@ class WorkerSinkTask extends WorkerTask {
this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
this.consumer = consumer;
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.taskStopped = false;
this.workerErrantRecordReporter = workerErrantRecordReporter;
}
......@@ -168,6 +170,7 @@ class WorkerSinkTask extends WorkerTask {
} catch (Throwable t) {
log.warn("Could not stop task", t);
}
taskStopped = true;
Utils.closeQuietly(consumer, "consumer");
Utils.closeQuietly(transformationChain, "transformation chain");
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
......@@ -712,6 +715,10 @@ class WorkerSinkTask extends WorkerTask {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (taskStopped) {
log.trace("Skipping partition revocation callback as task has already been stopped");
return;
}
log.debug("{} Partitions revoked", WorkerSinkTask.this);
try {
closePartitions();
......
......@@ -315,6 +315,56 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
@Test
public void testShutdown() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// first iteration
expectPollInitialAssignment();
// second iteration
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
EasyMock.expectLastCall();
// WorkerSinkTask::stop
consumer.wakeup();
PowerMock.expectLastCall();
sinkTask.stop();
PowerMock.expectLastCall();
// WorkerSinkTask::close
consumer.close();
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
rebalanceListener.getValue().onPartitionsRevoked(
asList(TOPIC_PARTITION, TOPIC_PARTITION2)
);
return null;
}
});
transformationChain.close();
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
sinkTaskContext.getValue().requestCommit(); // Force an offset commit
workerTask.iteration();
workerTask.stop();
workerTask.close();
PowerMock.verifyAll();
}
@Test
public void testPollRedelivery() throws Exception {
createTask(initialState);
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册