Skip to content
代码片段 群组 项目
未验证 提交 5e4c8f70 编辑于 作者: Divij Vaidya's avatar Divij Vaidya 提交者: GitHub
浏览文件

KAFKA-13943; Make `LocalLogManager` implementation consistent with the...

KAFKA-13943; Make `LocalLogManager` implementation consistent with the `RaftClient` contract (#12224)

Fixes two issues in the implementation of `LocalLogManager`:

- As per the interface contract for `RaftClient.scheduleAtomicAppend()`, it should throw a `NotLeaderException` when the provided epoch does not match the current leader epoch. However, the current `LocalLogManager` implementation of the API returns `Long.MAX_VALUE` instead of throwing an exception. This change fixes the behaviour and makes it consistent with the interface contract.
- As per the interface contract for `RaftClient.resign(epoch)`, if the parameter epoch does not match the current epoch, the call will be ignored. But in the current `LocalLogManager` implementation, the leader epoch might change while the thread is waiting to acquire the lock on `shared.tryAppend()` (note that `tryAppend()` is a synchronized method). In such a case, if a `NotLeaderException` is thrown (as per the code change above), then the resign should be ignored.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Tom Bentley <tbentley@redhat.com>, Jason Gustafson <jason@confluent.io>
上级 3ae1afa4
No related branches found
No related tags found
无相关合并请求
...@@ -31,6 +31,7 @@ import org.apache.kafka.raft.Batch; ...@@ -31,6 +31,7 @@ import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient; import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.MemoryBatchReader; import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.MockRawSnapshotReader; import org.apache.kafka.snapshot.MockRawSnapshotReader;
...@@ -226,16 +227,20 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, ...@@ -226,16 +227,20 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
} }
synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) { synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) {
if (epoch != leader.epoch()) {
log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
"match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
return Long.MAX_VALUE;
}
if (!leader.isLeader(nodeId)) { if (!leader.isLeader(nodeId)) {
log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " + log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not " +
"match the current leader id of {}.", nodeId, epoch, leader.leaderId()); "match the current leader id of {}.", nodeId, epoch, leader.leaderId());
return Long.MAX_VALUE; throw new NotLeaderException("Append failed because the replication is not the current leader");
}
if (epoch < leader.epoch()) {
throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. " +
"Current leader epoch = " + leader.epoch());
} else if (epoch > leader.epoch()) {
throw new IllegalArgumentException("Attempt to append from epoch " + epoch +
" which is larger than the current epoch " + leader.epoch());
} }
log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch); log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
long offset = append(batch); long offset = append(batch);
electLeaderIfNeeded(); electLeaderIfNeeded();
...@@ -723,9 +728,35 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, ...@@ -723,9 +728,35 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
@Override @Override
public void resign(int epoch) { public void resign(int epoch) {
LeaderAndEpoch curLeader = leader; if (epoch < 0) {
LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch() + 1); throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader)); }
LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
int currentEpoch = leaderAndEpoch.epoch();
if (epoch > currentEpoch) {
throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
" which is larger than the current epoch " + currentEpoch);
} else if (epoch < currentEpoch) {
// If the passed epoch is smaller than the current epoch, then it might mean
// that the listener has not been notified about a leader change that already
// took place. In this case, we consider the call as already fulfilled and
// take no further action.
log.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
"current epoch {}", epoch, currentEpoch);
return;
}
LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1);
try {
shared.tryAppend(nodeId, currentEpoch, new LeaderChangeBatch(nextLeader));
} catch (NotLeaderException exp) {
// the leader epoch has already advanced. resign is a no op.
log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is " +
"smaller than the current epoch {}", epoch, currentEpoch);
return;
}
} }
@Override @Override
......
...@@ -205,8 +205,11 @@ public interface RaftClient<T> extends AutoCloseable { ...@@ -205,8 +205,11 @@ public interface RaftClient<T> extends AutoCloseable {
* Notification of successful resignation can be observed through * Notification of successful resignation can be observed through
* {@link Listener#handleLeaderChange(LeaderAndEpoch)}. * {@link Listener#handleLeaderChange(LeaderAndEpoch)}.
* *
* @param epoch the epoch to resign from. If this does not match the current epoch, this * @param epoch the epoch to resign from. If this epoch is smaller than the current epoch, this
* call will be ignored. * call will be ignored.
*
* @throws IllegalArgumentException - if the passed epoch is invalid (negative or greater than current) or
* if the listener is not the leader associated with this epoch.
*/ */
void resign(int epoch); void resign(int epoch);
......
...@@ -104,7 +104,8 @@ public class BatchAccumulator<T> implements Closeable { ...@@ -104,7 +104,8 @@ public class BatchAccumulator<T> implements Closeable {
* @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
* batch size; if this exception is throw some of the elements in records may have * batch size; if this exception is throw some of the elements in records may have
* been committed * been committed
* @throws NotLeaderException if the epoch doesn't match the leader epoch * @throws NotLeaderException if the epoch is less than the leader epoch
* @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
* @throws BufferAllocationException if we failed to allocate memory for the records * @throws BufferAllocationException if we failed to allocate memory for the records
*/ */
public long append(int epoch, List<T> records) { public long append(int epoch, List<T> records) {
...@@ -123,7 +124,8 @@ public class BatchAccumulator<T> implements Closeable { ...@@ -123,7 +124,8 @@ public class BatchAccumulator<T> implements Closeable {
* @throws RecordBatchTooLargeException if the size of the records is greater than the maximum * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
* batch size; if this exception is throw none of the elements in records were * batch size; if this exception is throw none of the elements in records were
* committed * committed
* @throws NotLeaderException if the epoch doesn't match the leader epoch * @throws NotLeaderException if the epoch is less than the leader epoch
* @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
* @throws BufferAllocationException if we failed to allocate memory for the records * @throws BufferAllocationException if we failed to allocate memory for the records
*/ */
public long appendAtomic(int epoch, List<T> records) { public long appendAtomic(int epoch, List<T> records) {
...@@ -132,7 +134,8 @@ public class BatchAccumulator<T> implements Closeable { ...@@ -132,7 +134,8 @@ public class BatchAccumulator<T> implements Closeable {
private long append(int epoch, List<T> records, boolean isAtomic) { private long append(int epoch, List<T> records, boolean isAtomic) {
if (epoch < this.epoch) { if (epoch < this.epoch) {
throw new NotLeaderException("Append failed because the epoch doesn't match"); throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. " +
"Current leader epoch = " + this.epoch());
} else if (epoch > this.epoch) { } else if (epoch > this.epoch) {
throw new IllegalArgumentException("Attempt to append from epoch " + epoch + throw new IllegalArgumentException("Attempt to append from epoch " + epoch +
" which is larger than the current epoch " + this.epoch); " which is larger than the current epoch " + this.epoch);
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册