Skip to content
代码片段 群组 项目
未验证 提交 d654bc1b 编辑于 作者: Jason Gustafson's avatar Jason Gustafson 提交者: GitHub
浏览文件

MINOR: Support KRaft in GroupAuthorizerIntegrationTest (#12336)

Support KRaft in `GroupAuthorizerIntegrationTest`. 

Reviewers: David Arthur <mumrah@gmail.com>
上级 a3c7017f
No related branches found
No related tags found
无相关合并请求
......@@ -85,9 +85,8 @@ object AuthorizerIntegrationTest {
class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
/**
 * Maps an authentication context to a test principal based on the listener
 * the connection arrived on.
 *
 * NOTE(review): the scraped diff had interleaved pre/post-change lines here,
 * leaving an unreachable duplicate `case BrokerListenerName` and a dead
 * `case ControllerListenerName`; only the merged post-change case is kept.
 */
override def build(context: AuthenticationContext): KafkaPrincipal = {
  context.listenerName match {
    // Broker and controller listeners share the broker principal so that
    // inter-broker/controller traffic is authorized as a super user.
    case BrokerListenerName | ControllerListenerName => BrokerPrincipal
    case ClientListenerName => ClientPrincipal
    case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
  }
}
......@@ -152,32 +151,32 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
/**
 * Per-broker config overrides: assigns this node's broker id, then layers
 * on the node-level settings shared with the KRaft controllers.
 */
override def brokerPropertyOverrides(properties: Properties): Unit = {
  val id = brokerId.toString
  properties.put(KafkaConfig.BrokerIdProp, id)
  addNodeProperties(properties)
}
/**
 * Applies the shared node-level settings to every KRaft controller config.
 *
 * NOTE(review): the scraped diff retained both this post-change override and
 * the removed pre-change `kraftControllerConfigs` (a conflicting duplicate
 * definition); only the post-change version is kept.
 */
override def kraftControllerConfigs(): collection.Seq[Properties] = {
  val controllerConfigs = super.kraftControllerConfigs()
  controllerConfigs.foreach(addNodeProperties)
  controllerConfigs
}

/**
 * Node-level configuration shared by brokers and KRaft controllers.
 *
 * Selects the authorizer implementation for the quorum under test and shrinks
 * the internal offsets/transactions topics to a single partition and replica
 * so the test cluster starts quickly. Duplicated `put` calls left over from
 * the interleaved diff (SUPER_USERS_CONFIG, PRINCIPAL_BUILDER_CLASS_CONFIG,
 * and the relocated BrokerIdProp) have been removed.
 */
private def addNodeProperties(properties: Properties): Unit = {
  if (isKRaftTest()) {
    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
    // Make the broker principal a super user so inter-node traffic is
    // always authorized regardless of the ACLs each test installs.
    properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
  } else {
    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
  }
  properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
  properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
  properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
  properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
  properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
  properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[PrincipalBuilder].getName)
}
val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors](
ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
......@@ -2574,14 +2573,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}
// Thin wrapper over TestUtils.addAndVerifyAcls for this cluster's brokers
// and controller servers.
private def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit =
  TestUtils.addAndVerifyAcls(brokers, acls, resource, controllerServers)
// Thin wrapper over TestUtils.removeAndVerifyAcls for this cluster's brokers
// and controller servers.
private def removeAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit =
  TestUtils.removeAndVerifyAcls(brokers, acls, resource, controllerServers)
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
numRecords: Int = 1,
startingOffset: Int = 0,
......
......@@ -14,12 +14,11 @@ package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.api.GroupAuthorizerIntegrationTest._
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
......@@ -30,8 +29,11 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
......@@ -41,11 +43,12 @@ object GroupAuthorizerIntegrationTest {
val BrokerListenerName = "BROKER"
val ClientListenerName = "CLIENT"
val ControllerListenerName = "CONTROLLER"
class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
case BrokerListenerName => BrokerPrincipal
case BrokerListenerName | ControllerListenerName => BrokerPrincipal
case ClientListenerName => ClientPrincipal
case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
}
......@@ -64,9 +67,25 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
/** Principal used for broker/controller listener connections; configured as a super user under KRaft. */
def brokerPrincipal: KafkaPrincipal = BrokerPrincipal
/** Principal used for client listener connections; subject to the ACLs each test installs. */
def clientPrincipal: KafkaPrincipal = ClientPrincipal
/**
 * Starts from the default KRaft controller configs and applies the shared
 * node-level settings to each of them.
 */
override def kraftControllerConfigs(): collection.Seq[Properties] = {
  val configs = super.kraftControllerConfigs()
  configs.foreach(props => addNodeProperties(props))
  configs
}
/**
 * Per-broker config overrides: assigns the broker id and applies the shared
 * node-level settings.
 *
 * NOTE(review): the scraped diff left a stale unconditional
 * `AuthorizerClassNameProp -> AclAuthorizer` put here (a removed pre-change
 * line); the authorizer is chosen per quorum mode inside addNodeProperties,
 * so that stale line has been dropped.
 */
override def brokerPropertyOverrides(properties: Properties): Unit = {
  properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
  addNodeProperties(properties)
}
private def addNodeProperties(properties: Properties): Unit = {
if (isKRaftTest()) {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
} else {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
}
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
......@@ -80,11 +99,12 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
doSetup(testInfo, createOffsetsTopic = false)
// Allow inter-broker communication
TestUtils.addAndVerifyAcls(brokers,
addAndVerifyAcls(
Set(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, principal = BrokerPrincipal)),
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL))
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
)
TestUtils.createOffsetsTopic(zkClient, servers)
createOffsetsTopic(interBrokerListenerName)
}
private def createAcl(aclOperation: AclOperation,
......@@ -93,12 +113,13 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
new AccessControlEntry(principal.toString, WildcardHost, aclOperation, aclPermissionType)
}
@Test
def testUnauthorizedProduceAndConsume(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testUnauthorizedProduceAndConsume(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
createTopic(topic)
createTopic(topic, listenerName = interBrokerListenerName)
val producer = createProducer()
val produceException = assertThrows(classOf[ExecutionException],
......@@ -113,22 +134,25 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}
@Test
def testAuthorizedProduceAndConsume(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAuthorizedProduceAndConsume(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
createTopic(topic)
createTopic(topic, listenerName = interBrokerListenerName)
TestUtils.addAndVerifyAcls(brokers,
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL))
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
TestUtils.addAndVerifyAcls(brokers,
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL))
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
consumer.assign(List(topicPartition).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
......
......@@ -43,6 +43,7 @@ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, T
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait QuorumImplementation {
......@@ -123,9 +124,12 @@ abstract class QuorumTestHarness extends Logging {
val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
private var testInfo: TestInfo = null
private var implementation: QuorumImplementation = null
/**
 * Whether the current test runs against a KRaft quorum, derived from the
 * test's TestInfo. Because `testInfo` is assigned at the start of setUp —
 * before the quorum implementation is created — this can be queried from
 * config-building hooks that run during setup.
 *
 * NOTE(review): the scraped diff kept both the removed implementation-based
 * one-liner and this TestInfo-based definition (a duplicate); only the
 * post-change version is kept.
 */
def isKRaftTest(): Boolean = {
  TestInfoUtils.isKRaft(testInfo)
}
def checkIsZKTest(): Unit = {
if (isKRaftTest()) {
......@@ -182,6 +186,7 @@ abstract class QuorumTestHarness extends Logging {
// That way you control the initialization order.
@BeforeEach
def setUp(testInfo: TestInfo): Unit = {
this.testInfo = testInfo
Exit.setExitProcedure((code, message) => {
try {
throw new RuntimeException(s"exit(${code}, ${message}) called!")
......@@ -202,16 +207,14 @@ abstract class QuorumTestHarness extends Logging {
tearDown()
}
})
val name = if (testInfo.getTestMethod().isPresent()) {
testInfo.getTestMethod().get().toString()
} else {
"[unspecified]"
}
val name = testInfo.getTestMethod.asScala
.map(_.toString)
.getOrElse("[unspecified]")
if (TestInfoUtils.isKRaft(testInfo)) {
info(s"Running KRAFT test ${name}")
info(s"Running KRAFT test $name")
implementation = newKRaftQuorum(testInfo)
} else {
info(s"Running ZK test ${name}")
info(s"Running ZK test $name")
implementation = newZooKeeperQuorum()
}
}
......
......@@ -30,8 +30,10 @@ import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import java.util.Properties
import kafka.utils.TestUtils.{createAdminClient, resource}
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.utils.Time
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
......@@ -237,6 +239,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
}
/** Installs the given ACLs on the resource via TestUtils.addAndVerifyAcls, using this harness's brokers and controller servers. */
def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit =
  TestUtils.addAndVerifyAcls(brokers, acls, resource, controllerServers)
/** Removes the given ACLs from the resource via TestUtils.removeAndVerifyAcls, using this harness's brokers and controller servers. */
def removeAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit =
  TestUtils.removeAndVerifyAcls(brokers, acls, resource, controllerServers)
/**
* Pick a broker at random and kill it if it isn't already dead
* Return the id of the broker killed
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册