Skip to content
代码片段 群组 项目
未验证 提交 08101e04 编辑于 作者: Alok Thatikunta's avatar Alok Thatikunta 提交者: GitHub
浏览文件

KSTORAGE-973: Check for invalid topic partitions within the fencing tool (#2472)

* Validate partitions before fencing

* Fix build failure and tests

* Add integration test

* Fix indentation issues

* Add more integration tests and address comments

* PR comments

* Address PR comments

* PR comments
上级 3c9c6be0
No related branches found
No related tags found
无相关合并请求
......@@ -12,11 +12,16 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.TierPartitionFence;
......@@ -24,9 +29,13 @@ import kafka.tier.tools.common.FenceEventInfo;
import kafka.tier.topic.TierTopic;
import kafka.utils.CoreUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.utils.Utils;
import net.sourceforge.argparse4j.ArgumentParsers;
......@@ -81,10 +90,10 @@ public class TierPartitionStateFencingTrigger {
.required(true)
.help(TierPartitionStateFencingTrigger.FILE_FENCE_TARGET_PARTITIONS_DOC);
parser.addArgument(RecoveryUtils.makeArgument(TierPartitionStateFencingTrigger.OUTPUT_CONFIG))
.dest(TierPartitionStateFencingTrigger.OUTPUT_CONFIG)
.type(String.class)
.required(true)
.help(TierPartitionStateFencingTrigger.OUTPUT_CONFIG_DOC);
.dest(TierPartitionStateFencingTrigger.OUTPUT_CONFIG)
.type(String.class)
.required(true)
.help(TierPartitionStateFencingTrigger.OUTPUT_CONFIG_DOC);
return parser;
}
......@@ -155,6 +164,8 @@ public class TierPartitionStateFencingTrigger {
if (!file.createNewFile())
throw new IOException("Could not create output file at path " + outputFile);
validatePartitions(parser, props, tieredTopicIdPartitions);
try (FileOutputStream fos = new FileOutputStream(file)) {
List<FenceEventInfo> events = injectFencingEvents(props, tierTopicNamespace,
tieredTopicIdPartitions);
......@@ -162,6 +173,46 @@ public class TierPartitionStateFencingTrigger {
}
}
// Gets the list of valid partitions and checks if the partitions to be fenced are valid
public static void validatePartitions(
ArgumentParser parser,
Properties properties,
List<TopicIdPartition> tieredTopicIdPartitions
) throws ArgumentParserException, CancellationException {
List<String> checkTopics = tieredTopicIdPartitions.stream()
.map(TopicIdPartition::topic)
.collect(Collectors.toList());
Map<String, TopicDescription> results;
try (final Admin admin = Admin.create(properties)) {
results = admin.describeTopics(checkTopics).all().get();
} catch (Exception e) {
throw new RuntimeException("Could not validate fencing input user topics", e);
}
Set<TopicPartition> inputPartitions = tieredTopicIdPartitions.stream()
.map(TopicIdPartition::topicPartition)
.collect(Collectors.toSet());
Set<TopicPartition> validPartitions = new HashSet<>();
for (TopicPartition inpputPartition : inputPartitions) {
TopicDescription topicDescription = results.get(inpputPartition.topic());
if (topicDescription.isInternal()) {
throw new ArgumentParserException(String.format(
"Internal topic: '%s' can not be fenced", inpputPartition.topic()), parser);
}
List<TopicPartitionInfo> partitionInfoList = topicDescription.partitions();
for (TopicPartitionInfo partitionInfo: partitionInfoList) {
validPartitions.add(
new TopicPartition(inpputPartition.topic(), partitionInfo.partition()));
}
}
inputPartitions.removeAll(validPartitions);
if (!inputPartitions.isEmpty()) {
throw new ArgumentParserException(String.format(
"Found invalid partitions: %s", inputPartitions), parser);
}
}
public static List<FenceEventInfo> injectFencingEvents(
Properties properties,
String tierTopicNamespace,
......
......@@ -26,6 +26,7 @@ import kafka.tier.TopicIdPartition
import kafka.tier.domain.TierPartitionForceRestore
import kafka.tier.fetcher.TierStateFetcher
import kafka.tier.state.TierPartitionState.RestoreResult
import kafka.tier.tools.RecoveryTestUtils.writeFencingFile
import kafka.tier.tools.common.FenceEventInfo
import kafka.tier.topic.TierTopicManagerConfig
import kafka.tier.topic.TierTopic
......@@ -37,6 +38,7 @@ import kafka.zk.AdminZkClient
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.junit.{After, Before, Test}
import org.junit.Assert.{assertEquals, assertTrue}
......@@ -55,9 +57,9 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
private val logDir = TestUtils.tempDir().getAbsolutePath
private var tierPartitionStateFiles: Array[FileTierPartitionState] = Array()
private val tpidsToBeFenced: Array[TopicIdPartition] = Array(
new TopicIdPartition("the_dark_knight", UUID.randomUUID(),123),
new TopicIdPartition("mummy_returns", UUID.randomUUID(), 456),
new TopicIdPartition("mission_impossible", UUID.randomUUID(), 789)
new TopicIdPartition("the_dark_knight", UUID.randomUUID(),2),
new TopicIdPartition("mummy_returns", UUID.randomUUID(), 3),
new TopicIdPartition("mission_impossible", UUID.randomUUID(), 5)
)
private var topicIdPartitionsFile: File = _
private var outputJsonFile: File = _
......@@ -89,6 +91,9 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
super.setUp()
topicIdPartitionsFile = TestUtils.tempFile()
outputJsonFile = TestUtils.tempFile()
for (tpid <- tpidsToBeFenced) {
createTopic(tpid.topic(), 10, 1, TierTopicAdmin.topicConfig)
}
RecoveryTestUtils.writeFencingFile(topicIdPartitionsFile, tpidsToBeFenced.toList)
propertiesConfFile = TestUtils.tempFile()
}
......@@ -150,7 +155,7 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
fenceOutFile))
val fenceEvents = FenceEventInfo.jsonToList(Paths.get(fenceOutFile)).asScala
assertEquals(tpidsToBeFenced.size, fenceEvents.size)
assertEquals(tpidsToBeFenced.length, fenceEvents.size)
val partitionToFenceEventInfoMap = Map[Int, FenceEventInfo]()
(fenceEvents zip tpidsToBeFenced).map {
......@@ -179,11 +184,11 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
Defaults.TierMetadataRequestTimeoutMs,
Defaults.TierPartitionStateCommitInterval,
Collections.singletonList(logDir))
val primaryConsumerSupplier = new TierTopicConsumerSupplier(config, "primary");
val primaryConsumerSupplier = new TierTopicConsumerSupplier(config, "primary")
val verificationConsumer = primaryConsumerSupplier.get()
val tierTopicPartitions = TierTopicManager.partitions(
tierTopic.topicName(), tierTopic.numPartitions().getAsInt());
verificationConsumer.assign(tierTopicPartitions);
tierTopic.topicName(), tierTopic.numPartitions().getAsInt())
verificationConsumer.assign(tierTopicPartitions)
tierTopicPartitions.forEach(
tp => verificationConsumer.seekToBeginning(Collections.singletonList(tp)))
val records = new ListBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
......@@ -193,14 +198,14 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
while (batchIterator.hasNext) {
records += batchIterator.next
}
records.size == tpidsToBeFenced.size
records.size == tpidsToBeFenced.length
}, "Timed out trying to fetch TierTopic records")
val allFencedTpids = collection.mutable.Set[TopicIdPartition]() ++ tpidsToBeFenced
records.foreach(record => {
// Ensure each event is a TierPartitionFence event.
val eventOpt: Optional[AbstractTierMetadata]
= AbstractTierMetadata.deserialize(record.key(), record.value());
= AbstractTierMetadata.deserialize(record.key(), record.value())
assertTrue(eventOpt.isPresent)
assertEquals(TierRecordType.PartitionFence, eventOpt.get.`type`())
......@@ -240,7 +245,7 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
}
@Test
def testFencingWithBadTopicIdPartitionFile(): Unit = {
def testFencingWithEmptyTopicIdPartitionFile(): Unit = {
Utils.mkProperties(
new HashMap[String, String] {
put(KafkaConfig.TierMetadataBootstrapServersProp, brokerList)
......@@ -248,7 +253,7 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
}
).store(new PrintWriter(propertiesConfFile), "")
// 1. Empty TopicIdPartition file should cause ArgumentParserException to be raised.
// Empty TopicIdPartition file should cause ArgumentParserException to be raised.
val emptyTopicIdPartitionsFile = TestUtils.tempFile()
assertThrows[ArgumentParserException] {
kafka.tier.tools.TierPartitionStateFencingTrigger.main(Array(
......@@ -262,14 +267,18 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
kafka.tier.tools.TierPartitionStateFencingTrigger.OUTPUT_CONFIG),
outputJsonFile.getPath))
}
}
// 2. Badly formatted TopicIdPartition file should cause ArgumentParserException to be raised.
@Test
def testFencingWithBadlyFormattedTopicIdPartitionFile(): Unit = {
// Badly formatted TopicIdPartition file should cause ArgumentParserException to be raised.
val badTopicIdPartitionsFile = TestUtils.tempFile()
val pw = new PrintWriter(badTopicIdPartitionsFile)
// Third field is intentionally missing in the printed line
pw.write("%s,%s".format("abc", "def"))
pw.println()
pw.close
assertThrows[ArgumentParserException] {
kafka.tier.tools.TierPartitionStateFencingTrigger.main(Array(
kafka.tier.tools.RecoveryUtils.makeArgument(
......@@ -284,6 +293,72 @@ class TierPartitionStateFencingTriggerTest extends IntegrationTestHarness {
}
}
@Test
def testFencingWithInvalidTopicInput(): Unit = {
// Invalid topic should cause ArgumentParsesException to be raised.
val invalidTopicIdPartitionsFile = TestUtils.tempFile()
val invalidTpids: List[TopicIdPartition] = List(
new TopicIdPartition("joker", UUID.randomUUID(),7))
writeFencingFile(invalidTopicIdPartitionsFile, invalidTpids)
assertThrows[ArgumentParserException] {
kafka.tier.tools.TierPartitionStateFencingTrigger.main(Array(
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.RecoveryUtils.TIER_PROPERTIES_CONF_FILE_CONFIG),
propertiesConfFile.getPath,
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.TierPartitionStateFencingTrigger.FILE_FENCE_TARGET_PARTITIONS_CONFIG),
invalidTopicIdPartitionsFile.getPath,
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.TierPartitionStateFencingTrigger.OUTPUT_CONFIG),
outputJsonFile.getPath))
}
}
@Test
def testFencingWithInvalidPartitionInput(): Unit = {
// Valid topic but invalid partition should cause ArgumentParsesException to be raised.
val invalidTopicIdPartitionsFile = TestUtils.tempFile()
val invalidTpids: List[TopicIdPartition] = List(
new TopicIdPartition(tpidsToBeFenced(0).topic, tpidsToBeFenced(0).topicId(),13))
writeFencingFile(invalidTopicIdPartitionsFile, invalidTpids)
assertThrows[ArgumentParserException] {
kafka.tier.tools.TierPartitionStateFencingTrigger.main(Array(
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.RecoveryUtils.TIER_PROPERTIES_CONF_FILE_CONFIG),
propertiesConfFile.getPath,
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.TierPartitionStateFencingTrigger.FILE_FENCE_TARGET_PARTITIONS_CONFIG),
invalidTopicIdPartitionsFile.getPath,
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.TierPartitionStateFencingTrigger.OUTPUT_CONFIG),
outputJsonFile.getPath))
}
}
@Test
def testFencingWithInternalTopicInput(): Unit = {
// Internal topic should cause ArgumentParsesException to be raised.
val internalTopicIdPartitionsFile = TestUtils.tempFile()
val internalTpids: List[TopicIdPartition] = List(
new TopicIdPartition(Topic.GROUP_METADATA_TOPIC_NAME, UUID.randomUUID(),0))
writeFencingFile(internalTopicIdPartitionsFile, internalTpids)
assertThrows[ArgumentParserException] {
kafka.tier.tools.TierPartitionStateFencingTrigger.main(Array(
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.RecoveryUtils.TIER_PROPERTIES_CONF_FILE_CONFIG),
propertiesConfFile.getPath,
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.TierPartitionStateFencingTrigger.FILE_FENCE_TARGET_PARTITIONS_CONFIG),
internalTopicIdPartitionsFile.getPath,
kafka.tier.tools.RecoveryUtils.makeArgument(
kafka.tier.tools.TierPartitionStateFencingTrigger.OUTPUT_CONFIG),
outputJsonFile.getPath))
}
}
@Test
def testFencingWithBadPropertiesFile(): Unit = {
// 1. Bad properties file path should cause ArgumentParserException to be raised.
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册