Skip to content
代码片段 群组 项目
未验证 提交 0c4da230 编辑于 作者: Colin Patrick McCabe's avatar Colin Patrick McCabe 提交者: GitHub
浏览文件

KAFKA-14129: KRaft must check manual assignments for createTopics are contiguous (#12467)

KRaft should validate that manual assignments given to createTopics are contiguous. In other words,
they must start with partition 0, and progress through 1, 2, 3, etc. ZK mode does this, but KRaft
mode previously did not. Also fix a null pointer exception when the placement for partition 0
was not specified.

Convert over AddPartitionsTest to use KRaft. This PR converts all of the test except for some of
the placement logic tests, which will need to be redone for KRaft mode in a future change.

Fix null pointer exception in KRaftMetadataCache#getPartitionInfo.  Specifically, we should not
assume that the partition will be found in the hash map. This is another case where we had
"Some(x)" but it should be "Option(x)."

Fix a potential null pointer exception in BrokerServer#state.

Reviewers: dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
上级 a02c8d33
No related branches found
No related tags found
无相关合并请求
......@@ -81,7 +81,8 @@ class BrokerServer(
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
) extends KafkaBroker {
override def brokerState: BrokerState = lifecycleManager.state
override def brokerState: BrokerState = Option(lifecycleManager).
flatMap(m => Some(m.state)).getOrElse(BrokerState.NOT_RUNNING)
import kafka.server.Server._
......@@ -89,7 +90,7 @@ class BrokerServer(
this.logIdent = logContext.logPrefix
@volatile private var lifecycleManager: BrokerLifecycleManager = null
@volatile var lifecycleManager: BrokerLifecycleManager = null
private val isShuttingDown = new AtomicBoolean(false)
......
......@@ -229,7 +229,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
override def getPartitionInfo(topicName: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
Option(_currentImage.topics().getTopic(topicName)).
flatMap(topic => Some(topic.partitions().get(partitionId))).
flatMap(topic => Option(topic.partitions().get(partitionId))).
flatMap(partition => Some(new UpdateMetadataPartitionState().
setTopicName(topicName).
setPartitionIndex(partitionId).
......
......@@ -17,18 +17,24 @@
package kafka.admin
import java.util.Optional
import java.util.{Collections, Optional}
import kafka.controller.ReplicaAssignment
import kafka.server.BaseRequestTest
import kafka.utils.TestUtils
import kafka.server.{BaseRequestTest, BrokerServer}
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
import java.util.Arrays.asList
import java.util.Collections.singletonList
import java.util.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
class AddPartitionsTest extends BaseRequestTest {
......@@ -47,44 +53,97 @@ class AddPartitionsTest extends BaseRequestTest {
val topic4Assignment = Map(0 -> ReplicaAssignment(Seq(0,3), List(), List()))
val topic5 = "new-topic5"
val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
var admin: Admin = null
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
if (isKRaftTest()) {
brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
}
createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic4, partitionReplicaAssignment = topic4Assignment.map { case (k, v) => k -> v.replicas })
admin = createAdminClient()
}
@Test
def testWrongReplicaCount(): Unit = {
assertThrows(classOf[InvalidReplicaAssignmentException], () => adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 2,
Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2)))))
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testWrongReplicaCount(quorum: String): Unit = {
assertEquals(classOf[InvalidReplicaAssignmentException], assertThrows(classOf[ExecutionException], () => {
admin.createPartitions(Collections.singletonMap(topic1,
NewPartitions.increaseTo(2, singletonList(asList(0, 1, 2))))).all().get()
}).getCause.getClass)
}
@Test
def testMissingPartition0(): Unit = {
val e = assertThrows(classOf[AdminOperationException], () => adminZkClient.addPartitions(topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
assertTrue(e.getMessage.contains("Unexpected existing replica assignment for topic 'new-topic5', partition id 0 is missing"))
/**
* Test that when we supply a manual partition assignment to createTopics, it must be 0-based
* and consecutive.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testMissingPartitionsInCreateTopics(quorum: String): Unit = {
val topic6Placements = new util.HashMap[Integer, util.List[Integer]]
topic6Placements.put(1, asList(0, 1))
topic6Placements.put(2, asList(1, 0))
val topic7Placements = new util.HashMap[Integer, util.List[Integer]]
topic7Placements.put(2, asList(0, 1))
topic7Placements.put(3, asList(1, 0))
val futures = admin.createTopics(asList(
new NewTopic("new-topic6", topic6Placements),
new NewTopic("new-topic7", topic7Placements))).values()
val topic6Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic6").get()).getCause
assertEquals(classOf[InvalidReplicaAssignmentException], topic6Cause.getClass)
assertTrue(topic6Cause.getMessage.contains("partitions should be a consecutive 0-based integer sequence"),
"Unexpected error message: " + topic6Cause.getMessage)
val topic7Cause = assertThrows(classOf[ExecutionException], () => futures.get("new-topic7").get()).getCause
assertEquals(classOf[InvalidReplicaAssignmentException], topic7Cause.getClass)
assertTrue(topic7Cause.getMessage.contains("partitions should be a consecutive 0-based integer sequence"),
"Unexpected error message: " + topic7Cause.getMessage)
}
@Test
def testIncrementPartitions(): Unit = {
adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 3)
/**
* Test that when we supply a manual partition assignment to createPartitions, it must contain
* enough partitions.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testMissingPartitionsInCreatePartitions(quorum: String): Unit = {
val cause = assertThrows(classOf[ExecutionException], () =>
admin.createPartitions(Collections.singletonMap(topic1,
NewPartitions.increaseTo(3, singletonList(asList(0, 1, 2))))).all().get()).getCause
assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
if (isKRaftTest()) {
assertTrue(cause.getMessage.contains("Attempted to add 2 additional partition(s), but only 1 assignment(s) " +
"were specified."), "Unexpected error message: " + cause.getMessage)
} else {
assertTrue(cause.getMessage.contains("Increasing the number of partitions by 2 but 1 assignments provided."),
"Unexpected error message: " + cause.getMessage)
}
if (!isKRaftTest()) {
// In ZK mode, test the raw AdminZkClient method as well.
val e = assertThrows(classOf[AdminOperationException], () => adminZkClient.addPartitions(
topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
assertTrue(e.getMessage.contains("Unexpected existing replica assignment for topic 'new-topic5', partition " +
"id 0 is missing"))
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testIncrementPartitions(quorum: String): Unit = {
admin.createPartitions(Collections.singletonMap(topic1, NewPartitions.increaseTo(3))).all().get()
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1)
val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2)
val leader1FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic1, 1)).get
val leader2FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic1, 2)).get
assertEquals(leader1, leader1FromZk)
assertEquals(leader2, leader2FromZk)
waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 1)
waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic1, 2)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitForPartitionMetadata(servers, topic1, 1)
TestUtils.waitForPartitionMetadata(servers, topic1, 2)
TestUtils.waitForPartitionMetadata(brokers, topic1, 1)
TestUtils.waitForPartitionMetadata(brokers, topic1, 2)
val response = connectAndReceive[MetadataResponse](
new MetadataRequest.Builder(Seq(topic1).asJava, false).build)
assertEquals(1, response.topicMetadata.size)
......@@ -102,22 +161,21 @@ class AddPartitionsTest extends BaseRequestTest {
}
}
@Test
def testManualAssignmentOfReplicas(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testManualAssignmentOfReplicas(quorum: String): Unit = {
// Add 2 partitions
adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3,
Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3,
asList(asList(0, 1), asList(2, 3))))).all().get()
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1)
val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2)
val leader1FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic2, 1)).get
val leader2FromZk = zkClient.getLeaderForPartition(new TopicPartition(topic2, 2)).get
assertEquals(leader1, leader1FromZk)
assertEquals(leader2, leader2FromZk)
val leader1 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 1)
val leader2 = waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic2, 2)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitForPartitionMetadata(servers, topic2, 1)
TestUtils.waitForPartitionMetadata(servers, topic2, 2)
val partition1Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
assertEquals(leader1, partition1Metadata.leader())
val partition2Metadata = TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
assertEquals(leader2, partition2Metadata.leader())
val response = connectAndReceive[MetadataResponse](
new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
assertEquals(1, response.topicMetadata.size)
......@@ -132,17 +190,18 @@ class AddPartitionsTest extends BaseRequestTest {
assertEquals(Set(0, 1), replicas.asScala.toSet)
}
@Test
def testReplicaPlacementAllServers(): Unit = {
adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk")) // TODO: add kraft support
def testReplicaPlacementAllServers(quorum: String): Unit = {
admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get()
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitForPartitionMetadata(servers, topic3, 1)
TestUtils.waitForPartitionMetadata(servers, topic3, 2)
TestUtils.waitForPartitionMetadata(servers, topic3, 3)
TestUtils.waitForPartitionMetadata(servers, topic3, 4)
TestUtils.waitForPartitionMetadata(servers, topic3, 5)
TestUtils.waitForPartitionMetadata(servers, topic3, 6)
TestUtils.waitForPartitionMetadata(brokers, topic3, 1)
TestUtils.waitForPartitionMetadata(brokers, topic3, 2)
TestUtils.waitForPartitionMetadata(brokers, topic3, 3)
TestUtils.waitForPartitionMetadata(brokers, topic3, 4)
TestUtils.waitForPartitionMetadata(brokers, topic3, 5)
TestUtils.waitForPartitionMetadata(brokers, topic3, 6)
val response = connectAndReceive[MetadataResponse](
new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
......@@ -157,13 +216,14 @@ class AddPartitionsTest extends BaseRequestTest {
validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
}
@Test
def testReplicaPlacementPartialServers(): Unit = {
adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3)
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk")) // TODO: add kraft support
def testReplicaPlacementPartialServers(quorum: String): Unit = {
admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get()
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitForPartitionMetadata(servers, topic2, 1)
TestUtils.waitForPartitionMetadata(servers, topic2, 2)
TestUtils.waitForPartitionMetadata(brokers, topic2, 1)
TestUtils.waitForPartitionMetadata(brokers, topic2, 2)
val response = connectAndReceive[MetadataResponse](
new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
......
......@@ -666,6 +666,12 @@ public class ReplicationControlManager {
Replicas.toArray(assignment.brokerIds()), Replicas.toArray(isr),
Replicas.NONE, Replicas.NONE, isr.get(0), LeaderRecoveryState.RECOVERED, 0, 0));
}
for (int i = 0; i < newParts.size(); i++) {
if (!newParts.containsKey(i)) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"partitions should be a consecutive 0-based integer sequence");
}
}
ApiError error = maybeCheckCreateTopicPolicy(() -> {
Map<Integer, List<Integer>> assignments = new HashMap<>();
newParts.entrySet().forEach(e -> assignments.put(e.getKey(),
......@@ -744,7 +750,7 @@ public class ReplicationControlManager {
setIsSensitive(entry.isSensitive()));
}
result.setNumPartitions(newParts.size());
result.setReplicationFactor((short) newParts.get(0).replicas.length);
result.setReplicationFactor((short) newParts.values().iterator().next().replicas.length);
result.setTopicConfigErrorCode(NONE.code());
} else {
result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册