Skip to content
代码片段 群组 项目
未验证 提交 add7cd85 编辑于 作者: David Arthur's avatar David Arthur 提交者: GitHub
浏览文件

KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)

上级 5ceaa588
No related branches found
No related tags found
无相关合并请求
......@@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
......@@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
@Test // TODO KAFKA-14126 add KRaft support
def testKeyStoreAlter(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testKeyStoreAlter(quorum: String): Unit = {
val topic2 = "testtopic2"
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
......@@ -421,8 +423,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test // TODO KAFKA-14126 add KRaft support
def testTrustStoreAlter(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTrustStoreAlter(quorum: String): Unit = {
val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
// Producer with new keystore should fail to connect before truststore update
......@@ -469,9 +472,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
}
val group_id = new AtomicInteger(1)
def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}"
// Produce/consume should work with old as well as new client keystore
verifySslProduceConsume(sslProperties1, "alter-truststore-1")
verifySslProduceConsume(sslProperties2, "alter-truststore-2")
verifySslProduceConsume(sslProperties1, next_group_name())
verifySslProduceConsume(sslProperties2, next_group_name())
// Revert to old truststore with only one certificate and update. Clients should connect only with old keystore.
val oldTruststoreProps = new Properties
......@@ -480,7 +486,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
reconfigureServers(oldTruststoreProps, perBrokerConfig = true,
(s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
verifySslProduceConsume(sslProperties1, "alter-truststore-3")
verifySslProduceConsume(sslProperties1, next_group_name())
// Update same truststore file to contain both certificates without changing any configs.
// Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
......@@ -488,8 +494,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
StandardCopyOption.REPLACE_EXISTING)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
verifySslProduceConsume(sslProperties1, "alter-truststore-4")
verifySslProduceConsume(sslProperties2, "alter-truststore-5")
TestUtils.retry(30000) {
try {
verifySslProduceConsume(sslProperties1, next_group_name())
verifySslProduceConsume(sslProperties2, next_group_name())
} catch {
case t: Throwable => throw new AssertionError(t)
}
}
// Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
// Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
......@@ -497,21 +509,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, "alter-truststore-6")
verifySslProduceConsume(sslProperties2, next_group_name())
props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, "alter-truststore-7")
verifySslProduceConsume(sslProperties2, next_group_name())
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
brokerStateInfo(0).networkClient.disconnect("0")
TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
// validate that the brokerToController request works fine
verifyBrokerToControllerCall(controller)
if (!isKRaftTest()) {
val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
brokerStateInfo(0).networkClient.disconnect("0")
TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
// validate that the brokerToController request works fine
verifyBrokerToControllerCall(controller)
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
......
......@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
......@@ -228,7 +229,8 @@ public class ConfigurationControlManager {
newValue = String.join(",", oldValueList);
break;
}
if (!Objects.equals(currentValue, newValue)) {
if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
// KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
......@@ -317,7 +319,8 @@ public class ConfigurationControlManager {
String key = entry.getKey();
String newValue = entry.getValue();
String currentValue = currentConfigs.get(key);
if (!Objects.equals(newValue, currentValue)) {
if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
// KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
......@@ -381,7 +384,11 @@ public class ConfigurationControlManager {
if (configs.isEmpty()) {
configData.remove(configResource);
}
log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
if (configSchema.isSensitive(record)) {
log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN);
} else {
log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
}
}
// VisibleForTesting
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册