Skip to content
代码片段 群组 项目
  1. 8月 10, 2022
    • ConfluentSemaphore's avatar
    • David Mao's avatar
      KCFUN-138: ClientRequestQuotaManager jmh benchmark (#4594) · bce06174
      David Mao 创作于
      What
      Quota manager code has non-insignificant overhead on clusters with high request rate, we should benchmark and fix this~. Profiles of both prod clusters and profiles of local benchmarks show that most of the overhead is in string generation to retrieve sensors.
      
      Approach:
      ThreadUsageSensors
      We currently generate a sensor name to look up/create ThreadUsageSensors when recording values. This adds some amount of overhead in both the request handler thread and the network threads. These sensors are also created with an expiration time, which doesn't make sense because the number of these sensors is fixed and usually small (3 * (1 for request threads + # of listeners). Instead of generating these sensors lazily, we can just keep the thread usage sensors in-sync with the configured listeners.
      
      NetworkThreadCallback
      If we change the ordering in ClientRequestQuotaManager.record() and grab a reference to the recorded sensors, we can avoid performing the expensive sensor name creation in the network thread callback.
      
      Before:
      Benchmark                                                                                  Mode  Cnt     Score     Error   Units
      ClientRequestQuotaManagerBench.testNetworkThreadCallback                                   avgt   15   940.726 ±  15.790   ns/op
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·async                            avgt            NaN               ---
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·gc.alloc.rate                    avgt   15  2764.859 ±  75.832  MB/sec
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·gc.alloc.rate.norm               avgt   15  2880.003 ±   0.010    B/op
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·gc.count                         avgt   15   997.000            counts
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·gc.time                          avgt   15  1253.000                ms
      ClientRequestQuotaManagerBench.testRecord                                                  avgt   15  1240.281 ±  36.303   ns/op
      ClientRequestQuotaManagerBench.testRecord:·async                                           avgt            NaN               ---
      ClientRequestQuotaManagerBench.testRecord:·gc.alloc.rate                                   avgt   15  2452.718 ± 104.847  MB/sec
      ClientRequestQuotaManagerBench.testRecord:·gc.alloc.rate.norm                              avgt   15  3368.004 ±   0.014    B/op B/op
      ClientRequestQuotaManagerBench.testRecord:·gc.count                                        avgt   15  1022.000            counts
      ClientRequestQuotaManagerBench.testRecord:·gc.time                                         avgt   15  1224.000                ms
      JMH benchmarks done
      
      After:
      Benchmark                                                                     Mode  Cnt     Score     Error   Units
      ClientRequestQuotaManagerBench.testNetworkThreadCallback                      avgt   15    54.259 ±  12.669   ns/op
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·async               avgt            NaN               ---
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·gc.alloc.rate       avgt   15     0.004 ±   0.014  MB/sec
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·gc.alloc.rate.norm  avgt   15    ≈ 10⁻⁴              B/op
      ClientRequestQuotaManagerBench.testNetworkThreadCallback:·gc.count            avgt   15       ≈ 0            counts
      ClientRequestQuotaManagerBench.testRecord                                     avgt   15   548.622 ±  40.003   ns/op
      ClientRequestQuotaManagerBench.testRecord:·async                              avgt            NaN               ---
      ClientRequestQuotaManagerBench.testRecord:·gc.alloc.rate                      avgt   15  1489.022 ± 106.025  MB/sec
      ClientRequestQuotaManagerBench.testRecord:·gc.alloc.rate.norm                 avgt   15   896.077 ±   0.009    B/op
      ClientRequestQuotaManagerBench.testRecord:·gc.count                           avgt   15   864.000            counts
      ClientRequestQuotaManagerBench.testRecord:·gc.time                            avgt   15   855.000                ms
      JMH benchmarks done
      bce06174
    • ConfluentSemaphore's avatar
    • Eric Wu's avatar
      MINOR: Use milliseconds for topic catalog event timestamps (#7079) · 157c79a0
      Eric Wu 创作于
      Use milliseconds for topic catalog event timestamps, instead of nanoseconds, which cannot be used to reflect system or wall-clock time.
      157c79a0
    • ConfluentSemaphore's avatar
    • Scott Hendricks's avatar
  2. 8月 09, 2022
  3. 8月 08, 2022
  4. 8月 07, 2022
  5. 8月 06, 2022
  6. 8月 05, 2022
    • ConfluentSemaphore's avatar
    • Stanislav Kozlovski's avatar
      Merge commit '265b8815' into june-30-merge · 3566ad97
      Stanislav Kozlovski 创作于
      Make ConfluentV1MetadataResource implement ConnectResource
      3566ad97
    • Stanislav Kozlovski's avatar
      4b068fe5
    • Stanislav Kozlovski's avatar
      5e65dc7a
    • Stanislav Kozlovski's avatar
      fec7371b
    • Stanislav Kozlovski's avatar
      9086577a
    • Stanislav Kozlovski's avatar
    • Stanislav Kozlovski's avatar
      Merge remote-tracking branch 'origin/master' into june-15-ce-kafka-merge · e3d7b344
      Stanislav Kozlovski 创作于
       Conflicts:
      	core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
      	core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
      Pretty simple conflicts
      
      Properly mock KafkaChannelTest proxy protocol unit tests
      
      Fix mocking in SaslServerAuthenticator test
      
      Fix nits
      e3d7b344
    • Stanislav Kozlovski's avatar
      Merge branch 'confluentinc-kafka-june-15' · fd8677fe
      Stanislav Kozlovski 创作于
      Conflicts:
      Unmerged paths:
        (use "git add/rm <file>..." as appropriate to mark resolution)
      	both modified:   build.gradle
      	both modified:   checkstyle/import-control.xml
      	both modified:   checkstyle/suppressions.xml
      	both modified:   clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
      	both modified:   clients/src/main/java/org/apache/kafka/common/internals/Topic.java
      	both modified:   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
      	both modified:   clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
      	both modified:   clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
      	both modified:   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
      	both modified:   clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
      	both modified:   clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
      
      	both modified:   core/src/main/scala/kafka/cluster/Partition.scala
      	both modified:   core/src/main/scala/kafka/controller/KafkaController.scala
      	both modified:   core/src/main/scala/kafka/server/AlterPartitionManager.scala
      	both modified:   core/src/main/scala/kafka/server/ControllerApis.scala
      	both modified:   core/src/main/scala/kafka/server/ReplicaManager.scala
      	both modified:   core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
      	both modified:   core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
      	both modified:   core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
      	both modified:   core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
      	both modified:   core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
              both modified:   core/src/test/scala/unit/kafka/utils/TestUtils.scala
      	both modified:   core/src/main/scala/kafka/server/AbstractFetcherManager.scala
      	both modified:   core/src/main/scala/kafka/server/AbstractFetcherThread.scala
      	added by them:   core/src/main/scala/kafka/server/BrokerBlockingSender.scala
      	both modified:   core/src/main/scala/kafka/server/BrokerFeatures.scala
      	both modified:   core/src/main/scala/kafka/server/KafkaConfig.scala
      	both modified:   core/src/main/scala/kafka/server/KafkaRaftServer.scala
      	both added:      core/src/main/scala/kafka/server/LeaderEndPoint.scala
      	both added:      core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
      
      	both added:      core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
      
      	both modified:   core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
      	both modified:   core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
      	both modified:   core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
      	both modified:   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
      	both modified:   core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
      	both modified:   core/src/main/scala/kafka/tools/StorageTool.scala
      	both modified:   core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
      	both modified:   core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
      	both modified:   core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
      	both modified:   core/src/test/scala/unit/kafka/log/LogTestUtils.scala
      	both modified:   core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
      	deleted by us:   core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
      	both modified:   core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
      	both modified:   core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
      
      	both modified:   docs/upgrade.html
      	both modified:   gradle.properties
      	both modified:   gradle/dependencies.gradle
      	both modified:   jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
      	both modified:   metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
      	both modified:   metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
      
      	both modified:   metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
      	both modified:   metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
      	both modified:   metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
      	both modified:   metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
      	both modified:   metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
      
      	both modified:   metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
      	both modified:   metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
      	both modified:   metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
      	both modified:   metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
      	both modified:   metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
      	both modified:   metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
      	both modified:   metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
      	both modified:   server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
      	both modified:   server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
      	both modified:   tests/kafkatest/services/kafka/kafka.py
      
      --
      	both modified:   connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
      	both modified:   connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
      	both modified:   connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
      	both modified:   connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
      	both modified:   connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
      	both modified:   connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
      	both modified:   connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
      	both modified:   connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
      	both modified:   connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
      	both modified:   connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
      
      As for the connect files, we had a lot of messy conflicts so what was done was rm -rf ./connect/* and copy the folder recursively from the latest ce-kafka/master. The reason is that the big commits causing this (KAFKA-10000) were not skipped as part of the merge, and they will be cherry-picked separately from another PR, so we rely on that. Any other connect commits that were inadvertently deleted through this will be cherry-picked again
      The next three commit messages here are the commits from connect I had to cherry-pick:
      
      KAFKA-13780: Generate OpenAPI file for Connect REST API (#12067)
      
      New gradle task `connect:runtime:genConnectOpenAPIDocs` that generates `connect_rest.yaml` under `docs/generated`.
      This task is executed when `siteDocsTar` runs.
      
      KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121)
      
      Reviewers: David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
      
      KAFKA-13846: Use the new addMetricsIfAbsent API (#12287)
      
      Use the newly added function to replace the old addMetric function that may throw illegal argument exceptions.
      
      Although in some cases concurrency should not be possible they do not necessarily remain always true in the future, so it's better to use the new API just to be less error-prone.
      
      Reviewers: Bruno Cadonna <cadonna@apache.org>
      
      And these are the extra commits I did to fix failing tests/conflicts from the merge:
      MINOR: Adapt ReassignPartitionsIntegrationTest to account for static default values of throttles
      
      Import mockitoInline for clients test package and fix proxy selector unit test
      
      Fix SaslServerAuthenticatorTest - make map modifiable (proxy protocol adds to it) and pass down max receive bytes for mechanism being tested
      
      Disable testNoCleanShutdownAfterFailedStartupDueToCorruptLogs due to CPKAFKA-8947 consistent with master
      
      Re-align ReassignPartitionsIntegrationTest with ce-kafka/master
      
      Fix failing BrokerInterceptor unit test due to MockitoInline import
      fd8677fe
    • ConfluentSemaphore's avatar
加载中