Unverified commit 448441a3, authored by dengziming, committed by GitHub

KAFKA-13228; Ensure ApiVersionsRequest is properly handled in KRaft co-resident mode (#11784)

When brokers are co-resident with controllers in KRaft mode, we incorrectly determine the supported API versions of the controller using `NodeApiVersions.create()`. The patch fixes the problem by using the versions returned in the `ApiVersions` response even when connecting to the local node.

The patch also improves integration tests by adding support for co-resident mode.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
Parent: 277c4c2e
Showing 128 additions and 61 deletions
@@ -78,15 +78,14 @@ object AlterPartitionManager {
     config: KafkaConfig,
     metadataCache: MetadataCache,
     scheduler: KafkaScheduler,
+    controllerNodeProvider: ControllerNodeProvider,
     time: Time,
     metrics: Metrics,
     threadNamePrefix: Option[String],
     brokerEpochSupplier: () => Long,
   ): AlterPartitionManager = {
-    val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
-
     val channelManager = BrokerToControllerChannelManager(
-      controllerNodeProvider = nodeProvider,
+      controllerNodeProvider,
       time = time,
       metrics = metrics,
       config = config,
@@ -129,7 +129,7 @@ class BrokerServer(
   var forwardingManager: ForwardingManager = null
 
-  var alterIsrManager: AlterPartitionManager = null
+  var alterPartitionManager: AlterPartitionManager = null
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
@@ -241,24 +241,17 @@ class BrokerServer(
       clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
 
-      val alterIsrChannelManager = BrokerToControllerChannelManager(
-        controllerNodeProvider,
-        time,
-        metrics,
+      alterPartitionManager = AlterPartitionManager(
         config,
-        channelName = "alterIsr",
-        threadNamePrefix,
-        retryTimeoutMs = Long.MaxValue
-      )
-      alterIsrManager = new DefaultAlterPartitionManager(
-        controllerChannelManager = alterIsrChannelManager,
+        metadataCache,
         scheduler = kafkaScheduler,
+        controllerNodeProvider,
         time = time,
-        brokerId = config.nodeId,
-        brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
-        metadataVersionSupplier = () => metadataCache.metadataVersion()
+        metrics,
+        threadNamePrefix,
+        brokerEpochSupplier = () => lifecycleManager.brokerEpoch
       )
-      alterIsrManager.start()
+      alterPartitionManager.start()
 
       this._replicaManager = new ReplicaManager(
         config = config,
@@ -269,7 +262,7 @@ class BrokerServer(
         quotaManagers = quotaManagers,
         metadataCache = metadataCache,
         logDirFailureChannel = logDirFailureChannel,
-        alterPartitionManager = alterIsrManager,
+        alterPartitionManager = alterPartitionManager,
         brokerTopicStats = brokerTopicStats,
         isShuttingDown = isShuttingDown,
         zkClient = None,
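
Note that the channel-manager wiring `BrokerServer` previously did by hand now lives inside `AlterPartitionManager.apply` (first hunk above), so the KRaft and ZooKeeper server startup paths share one construction path and each only supplies a `ControllerNodeProvider`.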
@@ -343,10 +336,22 @@ class BrokerServer(
         k -> VersionRange.of(v.min, v.max)
       }.asJava
 
-      lifecycleManager.start(() => metadataListener.highestMetadataOffset,
-        BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
-          "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
-        metaProps.clusterId, networkListeners, featuresRemapped)
+      val brokerLifecycleChannelManager = BrokerToControllerChannelManager(
+        controllerNodeProvider,
+        time,
+        metrics,
+        config,
+        "heartbeat",
+        threadNamePrefix,
+        config.brokerSessionTimeoutMs.toLong
+      )
+      lifecycleManager.start(
+        () => metadataListener.highestMetadataOffset,
+        brokerLifecycleChannelManager,
+        metaProps.clusterId,
+        networkListeners,
+        featuresRemapped
+      )
 
       // Register a listener with the Raft layer to receive metadata event notifications
       raftManager.register(metadataListener)
@@ -544,8 +549,8 @@ class BrokerServer(
       if (replicaManager != null)
         CoreUtils.swallow(replicaManager.shutdown(), this)
 
-      if (alterIsrManager != null)
-        CoreUtils.swallow(alterIsrManager.shutdown(), this)
+      if (alterPartitionManager != null)
+        CoreUtils.swallow(alterPartitionManager.shutdown(), this)
 
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
@@ -165,7 +165,6 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val currentNodeApiVersions = NodeApiVersions.create()
   private val requestThread = newRequestThread
 
   def start(): Unit = {
@@ -253,10 +252,7 @@ class BrokerToControllerChannelManagerImpl(
   def controllerApiVersions(): Option[NodeApiVersions] = {
     requestThread.activeControllerAddress().flatMap { activeController =>
-      if (activeController.id == config.brokerId)
-        Some(currentNodeApiVersions)
-      else
-        Option(apiVersions.get(activeController.idString))
+      Option(apiVersions.get(activeController.idString))
     }
   }
 }
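This removed branch is the heart of the fix: `NodeApiVersions.create()` fabricates a version set from whatever APIs the local binary supports, which need not match what the co-resident controller actually advertises on its listener. After the change, the versions negotiated through the `ApiVersions` handshake are used for every active controller, local or remote; until a response arrives the method simply returns `None`. A minimal caller-side sketch (the object, method name, and the choice of `ApiKeys.ENVELOPE` are illustrative, not part of the patch):

import kafka.server.BrokerToControllerChannelManager
import org.apache.kafka.common.protocol.ApiKeys

object ControllerVersionGate {
  // True only once an ApiVersions response from the active controller has been
  // cached and it advertises ENVELOPE; local defaults are never consulted.
  def supportsEnvelope(channelManager: BrokerToControllerChannelManager): Boolean =
    channelManager.controllerApiVersions().exists(_.apiVersion(ApiKeys.ENVELOPE) != null)
}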
@@ -1637,6 +1637,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     distinctRoles
   }
 
+  def isKRaftCoResidentMode: Boolean = {
+    processRoles == Set(BrokerRole, ControllerRole)
+  }
+
   def metadataLogDir: String = {
     Option(getString(KafkaConfig.MetadataLogDirProp)) match {
       case Some(dir) => dir
@@ -2164,7 +2168,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       validateControllerQuorumVotersMustContainNodeIdForKRaftController()
       validateControllerListenerExistsForKRaftController()
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
-    } else if (processRoles == Set(BrokerRole, ControllerRole)) {
+    } else if (isKRaftCoResidentMode) {
       // KRaft colocated broker and controller
       validateNonEmptyQuorumVotersForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
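`isKRaftCoResidentMode` is true exactly when `process.roles` names both roles. A minimal sketch of such a configuration (all property values are illustrative, and a real broker needs more configuration than shown here):

import java.util.Properties
import kafka.server.KafkaConfig

object CoResidentConfigExample extends App {
  val props = new Properties()
  props.put(KafkaConfig.ProcessRolesProp, "broker,controller") // both roles on one node
  props.put(KafkaConfig.NodeIdProp, "1")
  props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
  props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
  props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
  props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")

  val config = KafkaConfig.fromProps(props)
  assert(config.isKRaftCoResidentMode) // processRoles == Set(BrokerRole, ControllerRole)
}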
@@ -140,7 +140,7 @@ class KafkaServer(
   var clientToControllerChannelManager: BrokerToControllerChannelManager = null
 
-  var alterIsrManager: AlterPartitionManager = null
+  var alterPartitionManager: AlterPartitionManager = null
 
   var kafkaScheduler: KafkaScheduler = null
@@ -263,6 +263,7 @@ class KafkaServer(
         logManager.startup(zkClient.getAllTopicsInCluster())
 
         metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures)
+        val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
 
         /* initialize feature change listener */
         _featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient)
@@ -276,13 +277,14 @@ class KafkaServer(
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
         clientToControllerChannelManager = BrokerToControllerChannelManager(
-          controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
+          controllerNodeProvider = controllerNodeProvider,
           time = time,
           metrics = metrics,
           config = config,
           channelName = "forwarding",
           threadNamePrefix = threadNamePrefix,
-          retryTimeoutMs = config.requestTimeoutMs.longValue)
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        )
         clientToControllerChannelManager.start()
 
         /* start forwarding manager */
@@ -309,11 +311,12 @@ class KafkaServer(
         socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
 
         // Start alter partition manager based on the IBP version
-        alterIsrManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
+        alterPartitionManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
           AlterPartitionManager(
             config = config,
             metadataCache = metadataCache,
             scheduler = kafkaScheduler,
+            controllerNodeProvider,
             time = time,
             metrics = metrics,
             threadNamePrefix = threadNamePrefix,
@@ -322,7 +325,7 @@ class KafkaServer(
         } else {
           AlterPartitionManager(kafkaScheduler, time, zkClient)
         }
-        alterIsrManager.start()
+        alterPartitionManager.start()
 
         // Start replica manager
         _replicaManager = createReplicaManager(isShuttingDown)
@@ -478,7 +481,7 @@ class KafkaServer(
       quotaManagers = quotaManagers,
       metadataCache = metadataCache,
       logDirFailureChannel = logDirFailureChannel,
-      alterPartitionManager = alterIsrManager,
+      alterPartitionManager = alterPartitionManager,
       brokerTopicStats = brokerTopicStats,
       isShuttingDown = isShuttingDown,
       zkClient = Some(zkClient),
@@ -755,8 +758,8 @@ class KafkaServer(
         if (replicaManager != null)
           CoreUtils.swallow(replicaManager.shutdown(), this)
 
-        if (alterIsrManager != null)
-          CoreUtils.swallow(alterIsrManager.shutdown(), this)
+        if (alterPartitionManager != null)
+          CoreUtils.swallow(alterPartitionManager.shutdown(), this)
 
         if (clientToControllerChannelManager != null)
           CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
@@ -90,6 +90,10 @@ public class ClusterTestExtensionsTest {
         @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz")
+        }),
+        @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
+            @ClusterConfigProperty(key = "foo", value = "baz"),
+            @ClusterConfigProperty(key = "spam", value = "eggz")
         })
     })
     public void testClusterTests() {
@@ -31,7 +31,13 @@ public enum Type {
     KRAFT {
         @Override
         public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
-            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false));
+        }
+    },
+    CO_KRAFT {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true));
         }
     },
     ZK {
@@ -40,10 +46,11 @@ public enum Type {
             invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
         }
     },
-    BOTH {
+    ALL {
        @Override
         public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
-            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false));
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true));
             invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
         }
     },
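With the new constant, a suite can opt into just the co-resident topology instead of running every variant. A hypothetical Scala test class (the class name, method name, and assertion are illustrative, not from the patch):

import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import org.junit.jupiter.api.extension.ExtendWith

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class CoResidentOnlyTest(cluster: ClusterInstance) {
  // Runs a single invocation against a KRaft cluster whose node carries both roles.
  @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1)
  def testAgainstCoResidentCluster(): Unit = {
    assert(cluster.bootstrapServers().nonEmpty)
  }
}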
@@ -65,18 +65,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
     private final ClusterConfig clusterConfig;
     private final AtomicReference<KafkaClusterTestKit> clusterReference;
+    private final boolean isCoResident;
 
-    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) {
         this.clusterConfig = clusterConfig;
         this.clusterReference = new AtomicReference<>();
+        this.isCoResident = isCoResident;
     }
 
     @Override
     public String getDisplayName(int invocationIndex) {
         String clusterDesc = clusterConfig.nameTags().entrySet().stream()
-            .map(Object::toString)
-            .collect(Collectors.joining(", "));
-        return String.format("[%d] Type=Raft, %s", invocationIndex, clusterDesc);
+                .map(Object::toString)
+                .collect(Collectors.joining(", "));
+        return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCoResident ? "CoReside" : "Distributed", clusterDesc);
     }
 
     @Override
@@ -86,6 +88,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
             (BeforeTestExecutionCallback) context -> {
                 TestKitNodes nodes = new TestKitNodes.Builder().
                         setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
+                        setCoResident(isCoResident).
                         setNumBrokerNodes(clusterConfig.numBrokers()).
                         setNumControllerNodes(clusterConfig.numControllers()).build();
                 nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
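The display name now distinguishes the two Raft variants, so a test run lists, for example, `[1] Type=Raft-Distributed, ...` and `[2] Type=Raft-CoReside, ...` instead of a single `[1] Type=Raft, ...` entry.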
@@ -150,15 +150,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
             for (ControllerNode node : nodes.controllerNodes().values()) {
                 Map<String, String> props = new HashMap<>(configProps);
-                props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
                 props.put(KafkaConfig$.MODULE$.NodeIdProp(),
                         Integer.toString(node.id()));
                 props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
                         node.metadataDirectory());
                 props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-                        "CONTROLLER:PLAINTEXT");
-                props.put(KafkaConfig$.MODULE$.ListenersProp(),
-                        "CONTROLLER://localhost:0");
+                        "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+                props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
+                props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
+                        nodes.interBrokerListenerName().value());
                 props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
                         "CONTROLLER");
                 // Note: we can't accurately set controller.quorum.voters yet, since we don't
@@ -203,7 +204,7 @@
             }
             for (BrokerNode node : nodes.brokerNodes().values()) {
                 Map<String, String> props = new HashMap<>(configProps);
-                props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+                props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
                 props.put(KafkaConfig$.MODULE$.BrokerIdProp(),
                         Integer.toString(node.id()));
                 props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
@@ -212,8 +213,7 @@
                         String.join(",", node.logDataDirectories()));
                 props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
                         "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-                props.put(KafkaConfig$.MODULE$.ListenersProp(),
-                        "EXTERNAL://localhost:0");
+                props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
                 props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
                         nodes.interBrokerListenerName().value());
                 props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
@@ -231,9 +231,15 @@
                 String threadNamePrefix = String.format("broker%d_", node.id());
                 MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
                 TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
-                KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                KafkaRaftManager<ApiMessageAndVersion> raftManager;
+                if (raftManagers.containsKey(node.id())) {
+                    raftManager = raftManagers.get(node.id());
+                } else {
+                    raftManager = new KafkaRaftManager<>(
                         metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
                         Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    raftManagers.put(node.id(), raftManager);
+                }
                 BrokerServer broker = new BrokerServer(
                     config,
                     nodes.brokerProperties(node.id()),
@@ -245,7 +251,6 @@
                     connectFutureManager.future
                 );
                 brokers.put(node.id(), broker);
-                raftManagers.put(node.id(), raftManager);
             }
         } catch (Exception e) {
             if (executorService != null) {
@@ -271,6 +276,26 @@
                 brokers, raftManagers, connectFutureManager, baseDirectory);
     }
 
+    private String listeners(int node) {
+        if (nodes.isCoResidentNode(node)) {
+            return "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
+        }
+        if (nodes.controllerNodes().containsKey(node)) {
+            return "CONTROLLER://localhost:0";
+        }
+        return "EXTERNAL://localhost:0";
+    }
+
+    private String roles(int node) {
+        if (nodes.isCoResidentNode(node)) {
+            return "broker,controller";
+        }
+        if (nodes.controllerNodes().containsKey(node)) {
+            return "controller";
+        }
+        return "broker";
+    }
+
     static private void setupNodeDirectories(File baseDirectory,
                                              String metadataDirectory,
                                              Collection<String> logDataDirectories) throws Exception {
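The same topology can also be driven without the JUnit extension by handing a co-resident `TestKitNodes` (see the builder additions below) to the test kit directly. A sketch, with the harness around the builder calls being illustrative:

import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}

object CoResidentTestKitExample extends App {
  val nodes = new TestKitNodes.Builder()
    .setCoResident(true)
    .setNumBrokerNodes(1)
    .setNumControllerNodes(1)
    .build()
  // With co-resident id allocation the single node gets id 0 for both roles, so
  // roles(0) is "broker,controller" and listeners(0) exposes both the EXTERNAL
  // and CONTROLLER listeners on ephemeral ports.
  val cluster = new KafkaClusterTestKit.Builder(nodes).build()
  try {
    cluster.format()
    cluster.startup()
  } finally {
    cluster.close()
  }
}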
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 
 public class TestKitNodes {
     public static class Builder {
+        private boolean coResident = false;
         private Uuid clusterId = null;
         private MetadataVersion bootstrapMetadataVersion = null;
         private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
@@ -48,6 +49,11 @@ public class TestKitNodes {
             return this;
         }
 
+        public Builder setCoResident(boolean coResident) {
+            this.coResident = coResident;
+            return this;
+        }
+
         public Builder addNodes(TestKitNode[] nodes) {
             for (TestKitNode node : nodes) {
                 addNode(node);
@@ -78,7 +84,7 @@
                 controllerNodes.pollFirstEntry();
             }
             while (controllerNodes.size() < numControllerNodes) {
-                int nextId = 3000;
+                int nextId = startControllerId();
                 if (!controllerNodes.isEmpty()) {
                     nextId = controllerNodes.lastKey() + 1;
                 }
@@ -96,7 +102,7 @@
                 brokerNodes.pollFirstEntry();
             }
             while (brokerNodes.size() < numBrokerNodes) {
-                int nextId = 0;
+                int nextId = startBrokerId();
                 if (!brokerNodes.isEmpty()) {
                     nextId = brokerNodes.lastKey() + 1;
                 }
@@ -115,6 +121,17 @@
             }
             return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
         }
+
+        private int startBrokerId() {
+            return 0;
+        }
+
+        private int startControllerId() {
+            if (coResident) {
+                return startBrokerId();
+            }
+            return startBrokerId() + 3000;
+        }
     }
 
     private final Uuid clusterId;
@@ -122,6 +139,10 @@
     private final NavigableMap<Integer, ControllerNode> controllerNodes;
     private final NavigableMap<Integer, BrokerNode> brokerNodes;
 
+    public boolean isCoResidentNode(int node) {
+        return controllerNodes.containsKey(node) && brokerNodes.containsKey(node);
+    }
+
     private TestKitNodes(Uuid clusterId,
                          MetadataVersion bootstrapMetadataVersion,
                          NavigableMap<Integer, ControllerNode> controllerNodes,
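This id allocation is what makes co-resident detection work: in distributed mode controller ids start at 3000, so the broker and controller id sets are disjoint and `isCoResidentNode` is false for every node; with `coResident = true` both sequences start at 0, the ids coincide, and `isCoResidentNode` drives the combined `roles`/`listeners` values and the shared `KafkaRaftManager` in `KafkaClusterTestKit` above.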
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{BeforeEach, Tag}
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
 @Tag("integration")
 final class LeaderElectionCommandTest(cluster: ClusterInstance) {
   import LeaderElectionCommandTest._
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 1)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
 
   @BeforeEach
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(clusterType = Type.BOTH)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class BrokerMetricNamesTest(cluster: ClusterInstance) {
   @AfterEach
@@ -37,7 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(clusterType = Type.BOTH)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @Tag("integration")
 class ClientQuotasRequestTest(cluster: ClusterInstance) {