Unverified commit 95e788de, authored by Raman Verma, committed by GitHub

KSTORAGE-724 Have TierTasks writing to online tier partitions make progress when few tier partitions are offline (#2374)

When one or more tier topic partitions are offline, TierArchiver and TierDeletionManager tasks that
read from or write to those partitions cannot make progress; the corresponding transition methods
stay stuck until the tier topic partitions come back online. With the current scheduling, the scheduler
will not call TierTasks#transition on any other TierTask until the stuck tasks make progress.
This commit raises the limit on the number of tasks that may be scheduled at any given time from
numThreads to Int.MaxValue, effectively removing it. As a result, the thread pool can make progress
on any TierTask that is not reading from or writing to the offline tier topic partitions.
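
For illustration only, a minimal self-contained sketch of the new waiting behaviour (this is not the actual TierTasks code; awaitSomeProgress, futures and updateIntervalMs are stand-ins for the real members): with the saturation cap gone, the main loop waits at most one update interval for any in-flight task future to complete, so a stuck task can no longer hold up scheduling of the rest.

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}

object TierTasksLoopSketch {
  // Wait up to updateIntervalMs for at least one in-flight task future to
  // complete, then return so the caller can re-check the task queues. A task
  // stuck on an offline tier topic partition now costs at most one timeout
  // per iteration instead of blocking the loop indefinitely.
  def awaitSomeProgress(futures: Seq[Future[Unit]], updateIntervalMs: Long)
                       (implicit ec: ExecutionContext): Unit = {
    if (futures.nonEmpty) {
      try {
        Await.ready(Future.firstCompletedOf(futures), updateIntervalMs.milliseconds)
      } catch {
        case _: TimeoutException => // nothing finished in time; just re-check the queues
      }
    }
  }
}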

Note that this will reduce the effectiveness of the existing archive "min lag" strategy (https://confluentinc.atlassian.net/wiki/spaces/KSTORAGE/pages/878117031/One+Pager+Tier+Archiver+Strategy), which attempts to maintain 0 lag on as many partitions as possible, even if it means effectively giving up on others. Given how severe it is for some partitions to stop making progress, we have deemed this tradeoff to be worthwhile.

Co-authored-by: Lucas Bradstreet <lucas@confluent.io>
Parent 255e0713
@@ -142,7 +142,7 @@ public class TierTopicConsumer implements Runnable {
             "TierTopicConsumer",
             "Number of metadata listeners awaiting materialization.",
             new HashMap<>());
-    private final MetricName maxListeningMsMetricName = new MetricName("MaxListeningMs",
+    final MetricName maxListeningMsMetricName = new MetricName("MaxListeningMs",
             "TierTopicConsumer",
             "The time that the oldest metadata listener has been waiting in milliseconds.",
             new HashMap<>());
...
@@ -73,9 +73,9 @@ class TierTasks(config: TierTasksConfig,
   private implicit val pool: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
   private val tierArchiver = new TierArchiver(config, replicaManager, tierTopicAppender, tierObjectStore,
-    ctx.subContext(), maxTasks = config.numThreads, time)
+    ctx.subContext(), maxTasks = Int.MaxValue, time)
   private val tierDeletionManager = new TierDeletionManager(replicaManager, tierTopicAppender, tierObjectStore,
-    ctx.subContext(), maxTasks = config.numThreads, config, time)
+    ctx.subContext(), maxTasks = Int.MaxValue, config, time)
   private val changeManager = new ChangeManager(ctx.subContext(),
     Seq(tierArchiver.taskQueue, tierDeletionManager.taskQueue),
@@ -152,20 +152,13 @@ class TierTasks(config: TierTasksConfig,
         // Task queues are not empty but we aren't processing any futures yet; backoff and retry
         Thread.sleep(config.mainLoopBackoffMs)
       } else {
-        if (futures.size >= config.numThreads) {
-          // We are processing as many futures to saturate the thread pool; wait until at least one of the futures is
-          // completed
-          debug("working set is full, blocking until a task completes")
-          Await.ready(Future.firstCompletedOf(futures), Int.MaxValue.seconds)
-        } else {
-          // We are processing futures but our thread pool has not been saturated; wait for the update interval and go
-          // back early to see if we have more work to do
-          debug("working set is not full, attempting to complete at least one future")
-          try {
-            Await.ready(Future.firstCompletedOf(futures), config.updateIntervalMs.milliseconds)
-          } catch {
-            case _: TimeoutException =>
-          }
-        }
+        // Wait for the update interval and go back early to see if we have more work to do
+        debug("Waiting up to " + config.updateIntervalMs.milliseconds + " ms " +
+          "to complete at least one from the " + futures.size + " futures")
+        try {
+          Await.ready(Future.firstCompletedOf(futures), config.updateIntervalMs.milliseconds)
+        } catch {
+          case _: TimeoutException =>
+        }
       }
     }
...
@@ -142,7 +142,7 @@ final class ArchiveTask(override val ctx: CancellationContext,
     state match {
       case s: BeforeLeader => ArchiveTask.establishLeadership(s, topicIdPartition, tierTopicAppender, brokerId)
       case s: BeforeUpload => ArchiveTask.maybeInitiateUpload(s, topicIdPartition, time, tierTopicAppender, tierObjectStore, replicaManager)
-      case s: Upload => ArchiveTask.upload(s, topicIdPartition, time, tierObjectStore)
+      case s: Upload => ArchiveTask.upload(s, topicIdPartition, time, ctx, tierObjectStore)
       case s: AfterUpload => ArchiveTask.finalizeUpload(s, topicIdPartition, time, tierTopicAppender, archiverMetrics.byteRateOpt)
       case s: FailedState => ArchiveTask.checkFailedState(s, topicIdPartition, replicaManager)
     }
@@ -337,6 +337,7 @@ object ArchiveTask extends Logging {
   private[archive] def upload(state: Upload,
                               topicIdPartition: TopicIdPartition,
                               time: Time,
+                              ctx: CancellationContext,
                               tierObjectStore: TierObjectStore)
                              (implicit ec: ExecutionContext): Future[AfterUpload] = {
     Future {
@@ -352,6 +353,9 @@
         uploadableSegment.leaderEpochStateOpt.isDefined)
       blocking {
+        if (ctx.isCancelled)
+          throw new TierArchiverFencedException(topicIdPartition)
+
         val startTimeMs = time.milliseconds
         try {
...
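As a rough sketch of the cancellation check added above (CancelFlag, FencedException and doUpload below are hypothetical stand-ins for CancellationContext, TierArchiverFencedException and the object-store call), the idea is to re-check the task's cancellation flag right before the long blocking upload, so a cancelled task fails fast as fenced instead of doing wasted work:

import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{ExecutionContext, Future, blocking}

// Hypothetical stand-in for kafka.tier.fetcher.CancellationContext.
final class CancelFlag {
  private val cancelled = new AtomicBoolean(false)
  def cancel(): Unit = cancelled.set(true)
  def isCancelled: Boolean = cancelled.get
}

// Hypothetical stand-in for TierArchiverFencedException.
final class FencedException(msg: String) extends RuntimeException(msg)

object UploadSketch {
  // Run the potentially long upload on the pool, but bail out with a "fenced"
  // error if the task's context was cancelled before the work starts.
  def upload(flag: CancelFlag, doUpload: () => Unit)
            (implicit ec: ExecutionContext): Future[Unit] = Future {
    blocking {
      if (flag.isCancelled)
        throw new FencedException("task cancelled before upload")
      doUpload()
    }
  }
}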
@@ -18,7 +18,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.tier.domain.{TierObjectMetadata, TierSegmentUploadComplete, TierSegmentUploadInitiate, TierTopicInitLeader}
-import kafka.tier.exceptions.{NotTierablePartitionException, TierMetadataRetriableException, TierObjectStoreRetriableException}
+import kafka.tier.exceptions.{NotTierablePartitionException, TierArchiverFencedException, TierMetadataRetriableException, TierObjectStoreRetriableException}
 import kafka.tier.fetcher.CancellationContext
 import kafka.tier.state.TierPartitionState
 import kafka.tier.state.TierPartitionState.AppendResult
@@ -155,7 +155,7 @@ class ArchiveTaskTest extends KafkaMetricsGroup {
     val uploadableSegment = UploadableSegment(log, logSegment, logSegment.readNextOffset, producerStateOpt, epochStateOpt, abortedTxnsOpt)
     val upload = Upload(leaderEpoch, uploadInitiate, uploadableSegment)
-    val uploadResult = ArchiveTask.upload(upload, topicIdPartition, time, tierObjectStore)
+    val uploadResult = ArchiveTask.upload(upload, topicIdPartition, time, ctx, tierObjectStore)
     val afterUpload = Await.result(uploadResult, 1 second)
     assertEquals("metadata size of AfterUpload object is incorrect value",
@@ -280,6 +280,34 @@ class ArchiveTaskTest extends KafkaMetricsGroup {
     }
   }

+  @Test
+  def testArchiveTaskCancelledBeforeUpload(): Unit = {
+    val leaderEpoch = 0
+    val logSegment = mockLogSegment(tmpFile)
+    val log = mockAbstractLog(logSegment)
+    val uploadInitiate = new TierSegmentUploadInitiate(topicIdPartition,
+      leaderEpoch,
+      UUID.randomUUID,
+      logSegment.baseOffset,
+      logSegment.readNextOffset - 1,
+      logSegment.maxTimestampSoFar,
+      logSegment.size,
+      false,
+      false,
+      false,
+      new OffsetAndEpoch(0L, Optional.empty()))
+    val uploadableSegment = UploadableSegment(log, logSegment, logSegment.readNextOffset, None, None, None)
+    val upload = Upload(leaderEpoch, uploadInitiate, uploadableSegment)
+
+    ctx.cancel()
+    val nextState = ArchiveTask.upload(upload, topicIdPartition, time, ctx, tierObjectStore)
+    assertThrows[TierArchiverFencedException] {
+      Await.result(nextState, 1 second)
+    }
+  }
+
   @Test
   def testUnknownExceptionDuringUpload(): Unit = {
     val nextState = testExceptionHandlingDuringUpload(new IllegalStateException("illegal state"), deleteSegment = false)
@@ -582,7 +610,7 @@ class ArchiveTaskTest extends KafkaMetricsGroup {
     when(tierObjectStore.putSegment(any(), any(), any(), any(), any(), any(), any())).thenThrow(e)
-    ArchiveTask.upload(upload, topicIdPartition, time, tierObjectStore)
+    ArchiveTask.upload(upload, topicIdPartition, time, ctx, tierObjectStore)
   }
   /**
@@ -633,7 +661,7 @@ class ArchiveTaskTest extends KafkaMetricsGroup {
       replicaManager)
     val upload = Await.result(beforeUploadResult, 1 second).asInstanceOf[Upload]
-    val uploadResult = ArchiveTask.upload(upload, topicIdPartition, time, tierObjectStore)
+    val uploadResult = ArchiveTask.upload(upload, topicIdPartition, time, ctx, tierObjectStore)
     val afterUpload = Await.result(uploadResult, 1 second)
     val afterUploadResult = ArchiveTask.finalizeUpload(afterUpload, topicIdPartition, time, tierTopicManager, None)
...
package kafka.tier.topic
import java.util.concurrent.TimeUnit
import java.util.{Collections, Optional, Properties}
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.tier.TopicIdPartition
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfluentTopicConfig, TopicConfig}
import org.junit.{Before, Test}
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
class TierTopicPartitionOfflineTest extends BaseRequestTest {
  override def brokerCount: Int = 6 // Need 6 servers to keep the two tier state partitions on non overlapping assignment

  val topic1: String = "foo"
  val topic2: String = "bar"
  val tierTopic = "_confluent-tier-state"
  val props = new Properties

  serverConfig.put(KafkaConfig.TierFeatureProp, "true")
  serverConfig.put(KafkaConfig.TierBackendProp, "mock")
  serverConfig.put(KafkaConfig.TierS3BucketProp, "mybucket")
  serverConfig.put(KafkaConfig.TierLocalHotsetBytesProp, "0")
  serverConfig.put(KafkaConfig.TierMetadataNumPartitionsProp, "2")
  serverConfig.put(KafkaConfig.MinInSyncReplicasProp, "2")
  serverConfig.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "10")
  serverConfig.put(KafkaConfig.TierPartitionStateCommitIntervalProp, "10")
  serverConfig.put(KafkaConfig.TierMetadataMaxPollMsProp, "10")
  serverConfig.put(KafkaConfig.TierArchiverNumThreadsProp, "1")

  override def brokerPropertyOverrides(properties: Properties): Unit = {
    serverConfig.stringPropertyNames().forEach(key => properties.put(key, serverConfig.get(key)))
  }
  @Before
  def prepareForTest(): Unit = {
    props.clear()
    props.put(ConfluentTopicConfig.TIER_ENABLE_CONFIG, "true")
    props.put(TopicConfig.SEGMENT_BYTES_CONFIG, "2048")
    props.put(ConfluentTopicConfig.TIER_LOCAL_HOTSET_BYTES_CONFIG, "1")
    props.put(ConfluentTopicConfig.TIER_LOCAL_HOTSET_MS_CONFIG, "-1")
    props.put(TopicConfig.RETENTION_BYTES_CONFIG, "-1")
  }
  @Test
  def testArchiverWhenTierPartitionIsOffline(): Unit = {
    // this test asserts that if a tier topic partition goes offline, tiered topic partitions appending their tier
    // metadata to other tier topic partitions must still be able to archive.
    TestUtils.waitUntilControllerElected(zkClient)
    // wait till metadata for tier topic partitions is propagated to all brokers
    TestUtils.waitUntilMetadataIsPropagated(servers, tierTopic, 0)
    TestUtils.waitUntilMetadataIsPropagated(servers, tierTopic, 1)
    val admin = createAdminClient()

    // reassign tier topic partitions such that they have minimum overlap. we will later shut down the brokers that
    // host tier partition 1
    val (replicasTierStatePartition0, replicasTierStatePartition1) = reassignTierPartition(admin)

    // create two tiered partitions with their replicas assigned to the same brokers as the tier topic partition we
    // want to keep online. also, the two test topics must append their tier metadata to different tier topic partitions
    val tp1 = new TopicPartition(topic1, 0)
    val leader1 = createTopic(topic1, Map(0 -> replicasTierStatePartition0), props)(0)
    val log1 = serverForId(leader1).get.replicaManager.getLog(tp1).get
    val tierStatePartitionForTopic1 = getTierTopicPartition(log1.topicIdPartition.get)
    val tierStatePartitionForTopic2 = if (tierStatePartitionForTopic1 == 0) 1 else 0
    val (leader2, tp2) = createSecondTieredTopic(admin = admin, expectedTierPartition = tierStatePartitionForTopic2, assignment = replicasTierStatePartition0)

    // shutdown the brokers that host tier state topic partition 1
    replicasTierStatePartition1.foreach(id => if (!replicasTierStatePartition0.contains(id)) serverForId(id).get.shutdown())
    debug(s"tier topic partition 1 is now offline")

    // append messages to both tiered topics. the one writing its metadata to tier partition 0 (online) should continue
    // to tier. the one writing its metadata to tier partition 1 (offline due to being under min ISR) should throw an
    // exception while trying to archive segments
    if (tierStatePartitionForTopic1 == 0) {
      TestUtils.generateAndProduceMessages(servers.toSeq, tp1.topic(), 100)
      TestUtils.generateAndProduceMessages(servers.toSeq, tp2.topic(), 100)
      // leader writing to the offline tier state partition shall see increased materialization lag
      verifyMaterializationLag(leader2)
      // only verify tiering for the partition whose tier state partition is online. this should not be blocked by the
      // other tier state partition being offline
      appendMessagesAndVerifyTier(leader1, tp1)
    } else {
      TestUtils.generateAndProduceMessages(servers.toSeq, tp1.topic(), 100)
      TestUtils.generateAndProduceMessages(servers.toSeq, tp2.topic(), 100)
      // leader writing to the offline tier state partition shall see increased materialization lag
      verifyMaterializationLag(leader1)
      // only verify tiering for the partition whose tier state partition is online. this should not be blocked by the
      // other tier state partition being offline
      appendMessagesAndVerifyTier(leader2, tp2)
    }
  }
  // verify the trend of materialization lag for the given broker
  private def verifyMaterializationLag(brokerId: Int): Unit = {
    val tierTopicConsumer = serverForId(brokerId).get.replicaManager.tierReplicaComponents.logComponents.topicConsumerOpt.get
    val maxListeningMs = tierTopicConsumer.maxListeningMsMetricName
    var curr = 0.toDouble
    var prev = 0.toDouble
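    // sample the MaxListeningMs metric five times, five seconds apart; while the tier state partition stays offline,
    // the oldest waiting listener only gets older, so each reading should be at least as large as the previous one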
    (1 to 5).foreach(_ => {
      Thread.sleep(5000)
      prev = curr
      curr = serverForId(brokerId).get.metrics.metric(maxListeningMs).metricValue().asInstanceOf[Double]
      info(s"curr: $curr prev $prev")
      assert(curr >= prev)
    })
  }
  // create a tiered topic with a given assignment such that it will append its metadata to the supplied tier topic partition
  // @return a tuple containing the leader id and the TopicPartition object for the topic created
  private def createSecondTieredTopic(admin: Admin, expectedTierPartition: Int, assignment: List[Int]): (Int, TopicPartition) = {
    var done = false
    var idx = -1
    var leader = 0
    var name = ""
    while (!done) {
      idx += 1
      name = topic2 + "-" + idx.toString
      leader = createTopic(name, Map(0 -> assignment), props)(0)
      val log = serverForId(leader).get.replicaManager.getLog(new TopicPartition(name, 0)).get
      val toTierPart = getTierTopicPartition(log.topicIdPartition.get)
      info(s"Created TopicPartition $name that will append metadata to tier partition $toTierPart. Expected tier partition: $expectedTierPartition")
      if (toTierPart != expectedTierPartition)
        admin.deleteTopics(List(name).asJavaCollection).all().get(500, TimeUnit.MILLISECONDS)
      else
        done = true
    }
    (leader, new TopicPartition(name, 0))
  }
  // get the tier topic partition for a given tiered partition
  private def getTierTopicPartition(topicIdPartition: TopicIdPartition): Int = {
    val tierTopicPartitioner = new TierTopicPartitioner(2) // must match KafkaConfig.TierMetadataNumPartitionsProp, set to "2" above
    tierTopicPartitioner.partitionId(topicIdPartition)
  }
  private def appendMessagesAndVerifyTier(leaderId: Int, tp: TopicPartition, numSegments: Int = 5): Unit = {
    val log = serverForId(leaderId).get.replicaManager.getLog(tp).get
    val numMessages = 50
    var totalMessages = log.logEndOffset
    val initTotalSegments = log.numberOfSegments
    while (log.numberOfSegments <= initTotalSegments + numSegments) {
      TestUtils.generateAndProduceMessages(servers.toSeq, tp.topic(), numMessages)
      totalMessages += numMessages
    }
    TestUtils.waitUntilTrue(() => {
      log.logEndOffset == totalMessages &&
        log.tierPartitionState.numSegments() >= log.numberOfSegments - 1
    }, "Timeout waiting for all messages to be written and tiered", 90000)
  }
  // reassign partition 1 of the tier topic such that there is a maximum overlap of one broker between its assignment
  // and that of partition 0.
  // NOTE: this method assumes two partitions in the tier topic with a replication factor of 3 and a total of 6 brokers
  // in the kafka cluster.
  // @return a tuple of the assignments for the two tier topic partitions respectively
  private def reassignTierPartition(admin: Admin): (List[Int], List[Int]) = {
    val tierTopicDesc = admin.describeTopics(Collections.singletonList(tierTopic)).all().get(500, TimeUnit.MILLISECONDS)
    // alter the assignment of one of the partitions such that when we take it offline, the other tier topic partition
    // stays online
    val replicasForTierStatePartition0 = tierTopicDesc.get(tierTopic).partitions.get(0).replicas().asScala.toList.map(node => node.id())
    val replicasForTierStatePartition1 = new ListBuffer[Int]() ++
      servers.map(server => server.config.brokerId).toList.filterNot(id => replicasForTierStatePartition0.contains(id))
    debug(s"replicasForTierStatePartition0 $replicasForTierStatePartition0 replicasForTierStatePartition1 $replicasForTierStatePartition1")

    val tierPartition1 = new TopicPartition(tierTopic, 1)
    val reassignments = Map(tierPartition1 -> Optional.of(new NewPartitionReassignment(replicasForTierStatePartition1.toList.map(id => id.asInstanceOf[Integer]).asJava))).asJava
    admin.alterPartitionReassignments(reassignments).all().get(500, TimeUnit.MILLISECONDS)
    TestUtils.waitUntilTrue(() => admin.listPartitionReassignments().reassignments().get().isEmpty, "Timed out waiting for reassignment to complete", 30000)

    // verify reassignment
    TestUtils.waitUntilTrue(() => {
      val tierTopicDesc = admin.describeTopics(Collections.singletonList(tierTopic)).all().get(500, TimeUnit.MILLISECONDS)
      replicasForTierStatePartition1.toList == tierTopicDesc.get(tierTopic).partitions.get(1).replicas().asScala.toList.map(node => node.id())
    }, "Unexpected assignment")

    (replicasForTierStatePartition0, replicasForTierStatePartition1.toList)
  }
}
\ No newline at end of file