Skip to content
代码片段 群组 项目
未验证 提交 1914418c 编辑于 作者: Colin Patrick McCabe's avatar Colin Patrick McCabe 提交者: GitHub
浏览文件

MINOR: Convert some junit tests to kraft (#12443)

Convert ProducerCompressionTest, MirrorMakerIntegrationTest, EdgeCaseRequestTest to kraft.

Make it explicit that ServerShutdownTest#testControllerShutdownDuringSend is ZK-only.

Reviewers: David Arthur <mumrah@gmail.com>
上级 160a6ab4
无相关合并请求
......@@ -17,19 +17,19 @@
package kafka.api.test
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.junit.jupiter.params.provider.CsvSource
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._
class ProducerCompressionTest extends QuorumTestHarness {
......@@ -37,18 +37,18 @@ class ProducerCompressionTest extends QuorumTestHarness {
private val topic = "topic"
private val numRecords = 2000
private var server: KafkaServer = null
private var broker: KafkaBroker = null
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
server = TestUtils.createServer(KafkaConfig.fromProps(props))
val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull)
broker = createBroker(new KafkaConfig(props))
}
@AfterEach
override def tearDown(): Unit = {
TestUtils.shutdownServers(Seq(server))
TestUtils.shutdownServers(Seq(broker))
super.tearDown()
}
......@@ -58,11 +58,18 @@ class ProducerCompressionTest extends QuorumTestHarness {
* Compressed messages should be able to sent and consumed correctly
*/
@ParameterizedTest
@MethodSource(Array("parameters"))
def testCompression(compression: String): Unit = {
@CsvSource(value = Array(
"kraft,none",
"kraft,gzip",
"kraft,snappy",
"kraft,lz4",
"kraft,zstd",
"zk,gzip"
))
def testCompression(quorum: String, compression: String): Unit = {
val producerProps = new Properties()
val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(server))
val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
......@@ -72,7 +79,13 @@ class ProducerCompressionTest extends QuorumTestHarness {
try {
// create topic
TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
val admin = TestUtils.createAdminClient(Seq(broker),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
try {
TestUtils.createTopicWithAdmin(admin, topic, Seq(broker))
} finally {
admin.close()
}
val partition = 0
// prepare the messages
......@@ -103,15 +116,3 @@ class ProducerCompressionTest extends QuorumTestHarness {
}
}
}
object ProducerCompressionTest {
def parameters: java.util.stream.Stream[Arguments] = {
Seq(
Arguments.of("none"),
Arguments.of("gzip"),
Arguments.of("snappy"),
Arguments.of("lz4"),
Arguments.of("zstd")
).asJava.stream()
}
}
......@@ -18,26 +18,27 @@ package kafka.tools
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.Seq
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer, NoRecordsException}
import kafka.utils.TestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.Exit
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
override def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, new Properties()))
val exited = new AtomicBoolean(false)
......@@ -57,8 +58,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
}
}
@Test
def testCommitOffsetsThrowTimeoutException(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCommitOffsetsThrowTimeoutException(quorum: String): Unit = {
val consumerProps = new Properties
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
......@@ -70,8 +72,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
assertThrows(classOf[TimeoutException], () => mirrorMakerConsumer.commit())
}
@Test
def testCommitOffsetsRemoveNonExistentTopics(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCommitOffsetsRemoveNonExistentTopics(quorum: String): Unit = {
val consumerProps = new Properties
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
......@@ -85,8 +88,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
assertTrue(mirrorMakerConsumer.offsets.isEmpty, "Offsets for non-existent topics should be removed")
}
@Test
def testCommaSeparatedRegex(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCommaSeparatedRegex(quorum: String): Unit = {
val topic = "new-topic"
val msg = "a test message"
......
......@@ -35,19 +35,20 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.ByteUtils
import org.apache.kafka.common.{TopicPartition, requests}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
class EdgeCaseRequestTest extends KafkaServerTestHarness {
def generateConfigs = {
val props = TestUtils.createBrokerConfig(1, zkConnect)
val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
List(KafkaConfig.fromProps(props))
}
private def socketServer = servers.head.socketServer
private def socketServer = brokers.head.socketServer
private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)))
......@@ -116,8 +117,9 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
}
}
@Test
def testProduceRequestWithNullClientId(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testProduceRequestWithNullClientId(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
val correlationId = -1
......@@ -161,23 +163,27 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode), "There should be no error")
}
@Test
def testHeaderOnlyRequest(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testHeaderOnlyRequest(quorum: String): Unit = {
verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, 1))
}
@Test
def testInvalidApiKeyRequest(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testInvalidApiKeyRequest(quorum: String): Unit = {
verifyDisconnect(requestHeaderBytes(-1, 0))
}
@Test
def testInvalidApiVersionRequest(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testInvalidApiVersionRequest(quorum: String): Unit = {
verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, -1))
}
@Test
def testMalformedHeaderRequest(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testMalformedHeaderRequest(quorum: String): Unit = {
val serializedBytes = {
// Only send apiKey and apiVersion
val buffer = ByteBuffer.allocate(
......
......@@ -38,7 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout}
import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
......@@ -251,9 +251,11 @@ class ServerShutdownTest extends KafkaServerTestHarness {
}
// Verify that if controller is in the midst of processing a request, shutdown completes
// without waiting for request timeout.
@Test
def testControllerShutdownDuringSend(): Unit = {
// without waiting for request timeout. Since this involves LeaderAndIsr request, it is
// ZK-only for now.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
def testControllerShutdownDuringSend(quorum: String): Unit = {
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册