Skip to content
代码片段 群组 项目
  1. 8月 23, 2022
  2. 8月 22, 2022
  3. 8月 20, 2022
    • Confluent Jenkins Bot's avatar
    • Confluent Jenkins Bot's avatar
    • dengziming's avatar
      KAFKA-13914: Add command line tool kafka-metadata-quorum.sh (#12469) · 150fd5b0
      dengziming 创作于
       Add `MetadataQuorumCommand` to describe quorum status, I'm trying to use arg4j style command format, currently, we only support one sub-command which is "describe" and we can specify 2 arguments which are --status and --replication.
      
      ```
      # describe quorum status
      kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication
      
      ReplicaId	LogEndOffset	Lag	LastFetchTimeMs	LastCaughtUpTimeMs	Status  	
      0        	10          	        0  	-1             	        -1                	                 Leader  	
      1        	10          	        0  	-1             	        -1                	                 Follower	
      2        	10          	        0  	-1             	        -1                	                 Follower	
      
      kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
      ClusterId:                             fMCL8kv1SWm87L_Md-I2hg
      LeaderId:                             3002
      LeaderEpoch:                      2
      HighWatermark:                  10
      MaxFollowerLag:                 0
      MaxFollowerLagTimeMs:   -1
      CurrentVoters:                    [3000,3001,3002]
      CurrentObservers:              [0,1,2]
      
      # specify AdminClient properties
      kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config config.properties describe --status
      ```
      
      Reviewers: Jason Gustafson <jason@confluent.io>
      未验证
      150fd5b0
    • Confluent Jenkins Bot's avatar
    • Confluent Jenkins Bot's avatar
    • Niket's avatar
      KAFKA-13888; Implement `LastFetchTimestamp` and in `LastCaughtUpTimestamp`... · c7f05191
      Niket 创作于
      KAFKA-13888; Implement `LastFetchTimestamp` and  in `LastCaughtUpTimestamp` for DescribeQuorumResponse [KIP-836] (#12508)
      
      This commit implements the newly added fields `LastFetchTimestamp` and `LastCaughtUpTimestamp` for KIP-836: https://cwiki.apache.org/confluence/display/KAFKA/KIP-836:+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag.
      
      Reviewers: Jason Gustafson <jason@confluent.io>
      未验证
      c7f05191
    • Confluent Jenkins Bot's avatar
    • Confluent Jenkins Bot's avatar
    • Jason Gustafson's avatar
      MINOR: Fix unexpected request error in kraft shutdown (#12538) · a724166f
      Jason Gustafson 创作于
      We have been seeing a few exceptions like the following when running integration tests:
      ```
      [2022-08-18 13:02:59,470] ERROR [ControllerApis nodeId=3000] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7) -- FetchRequestData(clusterId='txpo87ZUSbGSeV2v7H0n_w', replicaId=0, maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='__cluster_metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=1, fetchOffset=6, lastFetchedEpoch=1, logStartOffset=-1, partitionMaxBytes=0)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7), connectionId='127.0.0.1:63113-127.0.0.1:63114-0', clientAddress=/127.0.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=unknown), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@201038c3]) (kafka.server.ControllerApis:76)
      java.util.concurrent.CompletionException: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
      	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
      	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
      	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936)
      	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
      	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
      	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
      	at org.apache.kafka.raft.KafkaRaftClient.lambda$handleRequest$19(KafkaRaftClient.java:1666)
      	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
      	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
      	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
      	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
      	at kafka.raft.TimingWheelExpirationService$TimerTaskCompletableFuture.run(TimingWheelExpirationService.scala:32)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      	at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
      ```
      There are two causes for this error that I found. First, we were not shutting down the timer services in `RaftManager` which are used in the purgatory implementation. This meant that operations remaining in purgatory could be completed even after `RaftManager` was shutdown. Second, the shutdown order in `KafkaClusterTestKit` was problematic. The `RaftManager` instance depends on the `SocketServer` in `ControllerServer`, but it was the latter that was shutdown first. Instead, we should shutdown `RaftManager` first as we do in `KafkaRaftServer`. 
      
      Reviewers: Ismael Juma <ismael@juma.me.uk>
      未验证
      a724166f
    • Guozhang Wang's avatar
      MINOR: Improve KafkaProducer Javadocs (#12537) · 5d32f24c
      Guozhang Wang 创作于
      While reviewing KIP-588 and KIP-691 I went through the exception throwing behavior and wanted to improve the related javadocs a little bit.
      
      Reviewers: John Roesler <vvcephei@apache.org>
      未验证
      5d32f24c
  4. 8月 19, 2022
  5. 8月 18, 2022
    • Matthew de Detrich's avatar
      MINOR: Use underscore for variable initialization in BrokerServer (#12471) · 2ff4c0a3
      Matthew de Detrich 创作于
      In Scala its standard practice to use _ whenever you are initializing variables. In regards to implementation, for object references _ initialization maps to null so there is no change in behaviour.
      
      Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
      未验证
      2ff4c0a3
    • Chris Egerton's avatar
      MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest class (#12410) · 73e8d5dd
      Chris Egerton 创作于
      Reviewers: Mickael Maison <mickael.maison@gmail.com>, Christo Lolov <christo_lolov@yahoo.com>
      未验证
      73e8d5dd
    • David Jacot's avatar
      MINOR: Log error when storing assignment fails (#12526) · 04fce135
      David Jacot 创作于
      Reviewers: Jason Gustafson <jason@confluent.io>
      未验证
      04fce135
    • Jason Gustafson's avatar
      KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518) · bc90c29f
      Jason Gustafson 创作于
      There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds` where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `UpdateFeatures` APIs were affected by this bug, but it is difficult to root out all cases. 
      
      Interestingly, `DeleteTopics` is not affected by this bug as I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch duplicates this logic from `ApiError.fromThrowable` into `Errors.forException` to be sure that we handle all cases where exceptions are converted to error codes.
      
      Reviewers: David Arthur <mumrah@gmail.com>
      未验证
      bc90c29f
    • Jason Gustafson's avatar
      KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517) · e5b865d6
      Jason Gustafson 创作于
      Currently the server will return `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. In addition to being inconsistent with all of the other leader APIs in the raft layer, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This error is retriable and we can rely on the admin client to retry it after seeing this error.
      
      Reviewers: David Jacot <djacot@confluent.io>
      未验证
      e5b865d6
    • Jason Gustafson's avatar
      HOTFIX: Revert KAFKA-10199 which is causing compilation failures (#12532) · 0243bb98
      Jason Gustafson 创作于
      Compilation is failing after these two commits:
      ```
      > Task :streams:compileJava
      /Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852: error: cannot find symbol
                              tasks.addPendingTaskToClose(restoringTask.id());
                                   ^
        symbol:   method addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
        location: variable tasks of type org.apache.kafka.streams.processor.internals.Tasks
      1 error
      ```
      
      Also here:
      ```
      
      [2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava
      
      [2022-08-17T20:58:20.912Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822: error: method setupForRevocation(Set<Task>,Set<Task>) is already defined in class TaskManagerTest
      
      [2022-08-17T20:58:20.912Z]     private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
      ```
       This patch reverts them.
      
      Reviewers: Ismael Juma <ismael@juma.me.uk>
      未验证
      0243bb98
    • Bruno Cadonna's avatar
      KAFKA-10199: Remove tasks from state updater on revocation (#12520) · b47c4d85
      Bruno Cadonna 创作于
      Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance.
      
      Reviewers: Guozhang Wang <wangguoz@gmail.com>
      未验证
      b47c4d85
    • Bruno Cadonna's avatar
      KAFKA-10199: Remove tasks from state updater on partition lost (#12521) · 9f20f899
      Bruno Cadonna 创作于
      Removes tasks from the state updater when the input partitions of the tasks are lost during a rebalance.
      
      Reviewers: Guozhang Wang <wangguoz@gmail.com>
      未验证
      9f20f899
    • Janik Dotzel's avatar
      MINOR: Fix invalid link to plugin.path property docs in quickstart (#12523) · ae3f48b6
      Janik Dotzel 创作于
      Reviewers: Chris Egerton <fearthecellos@gmail.com>
      未验证
      ae3f48b6
加载中