Skip to content
代码片段 群组 项目
未验证 提交 732bba0b 编辑于 作者: Tim Fox's avatar Tim Fox 提交者: GitHub
浏览文件

CNKAF-1154: Add load metrics for logical cluster metadata (#2495)

上级 eb93a518
No related branches found
No related tags found
无相关合并请求
......@@ -2,46 +2,51 @@
package io.confluent.kafka.multitenant;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.server.multitenant.MultiTenantMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.StandardWatchEventKinds;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.multitenant.MultiTenantMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This holds metadata passed from CCloud related to this physical cluster
......@@ -54,7 +59,19 @@ public class PhysicalClusterMetadata implements MultiTenantMetadata {
static final String DATA_DIR_NAME = "..data";
private static final String LOGICAL_CLUSTER_FILE_EXT_WITH_DOT = ".json";
private static final Long CLOSE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
private static final String LKC_LOAD_METRICS_GROUP_NAME = "confluent-lkc-load-metrics";
private static final String LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_SENSOR_NAME = "lkc-metadata-load-time-from-fs-update";
static final String LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MIN_METRIC_NAME = LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_SENSOR_NAME + "-min";
static final String LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MAX_METRIC_NAME = LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_SENSOR_NAME + "-max";
static final String LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_AVG_METRIC_NAME = LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_SENSOR_NAME + "-avg";
private static final String LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME = "lkc-metadata-end-to-end-load-time";
static final String LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME = LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME + "-min";
static final String LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME = LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME + "-max";
static final String LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME = LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME + "-avg";
private final Sensor lkcTimeToLoadFromFilesytemUpdateSensor;
private final Sensor lkcTimeToLoadEndToEndSensor;
private final Time time;
private String logicalClustersDir;
private String sslCertsDir;
private List<String> watchDirs = new ArrayList<>();
......@@ -81,7 +98,29 @@ public class PhysicalClusterMetadata implements MultiTenantMetadata {
public TenantLifecycleManager tenantLifecycleManager;
public SslCertificateManager sslCertificateManager;
public PhysicalClusterMetadata() {
public PhysicalClusterMetadata(Metrics metrics) {
this(metrics, Time.SYSTEM);
}
public PhysicalClusterMetadata(Metrics metrics, Time time) {
this.lkcTimeToLoadFromFilesytemUpdateSensor = metrics.sensor(LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_SENSOR_NAME);
lkcTimeToLoadFromFilesytemUpdateSensor.add(metrics.metricName(LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MIN_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME,
"The minimum time to load logical cluster metadata from file system update in ms"), new Min());
lkcTimeToLoadFromFilesytemUpdateSensor.add(metrics.metricName(LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MAX_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME,
"The maximum time to load logical cluster metadata from file system update in ms"), new Max());
lkcTimeToLoadFromFilesytemUpdateSensor.add(metrics.metricName(
LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_AVG_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME,
"The mean time to load logical cluster metadata from file system update in ms"), new Avg());
this.lkcTimeToLoadEndToEndSensor = metrics.sensor(LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME);
lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME,
"The minimum end to end load time of logical cluster metadata in ms"), new Min());
lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME,
"The maximum end to end load time of logical cluster metadata in ms"), new Max());
lkcTimeToLoadEndToEndSensor.add(metrics.metricName(
LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME,
"The mean end to end load time of logical cluster metadata in ms"), new Avg());
this.time = time;
this.state = new AtomicReference<>(State.NOT_READY);
this.cacheLock = new ReentrantReadWriteLock();
this.logicalClusterMap = new ConcurrentHashMap<>();
......@@ -469,6 +508,28 @@ public class PhysicalClusterMetadata implements MultiTenantMetadata {
if (!lcMeta.equals(oldMeta)) {
LOG.info("Added/Updated logical cluster {}", lcMeta);
/*
We record values for two sensors:
1. Time to actually load the logical cluster meta data since it was updated
or created on the file system
2. End-end time to load logical cluster meta data for new clusters as measured by time
since creation date in the actual metadata
*/
long now = time.milliseconds();
long lastModified = lcFile.toFile().lastModified();
if (lastModified == 0) {
throw new KafkaException("File " + lcFile.toFile() + " invalid");
}
long timeSinceFsUpdate = now - lastModified;
lkcTimeToLoadFromFilesytemUpdateSensor.record((double) timeSinceFsUpdate);
if (oldMeta == null && lcMeta.lifecycleMetadata() != null) {
Date creationDate = lcMeta.lifecycleMetadata().creationDate();
if (creationDate != null) {
long endToEndLoadTime = now - creationDate.getTime();
lkcTimeToLoadEndToEndSensor.record((double) endToEndLoadTime);
}
}
}
} catch (Exception e) {
......
......@@ -2,43 +2,32 @@
package io.confluent.kafka.multitenant;
import com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import static io.confluent.kafka.multitenant.Utils.LC_META_DED;
import static io.confluent.kafka.multitenant.Utils.CREATION_DATE_1;
import static io.confluent.kafka.multitenant.Utils.CREATION_DATE_2;
import static io.confluent.kafka.multitenant.Utils.LC_META_ABC;
import static io.confluent.kafka.multitenant.Utils.LC_META_XYZ;
import static io.confluent.kafka.multitenant.Utils.LC_META_DED;
import static io.confluent.kafka.multitenant.Utils.LC_META_HEALTHCHECK;
import static io.confluent.kafka.multitenant.Utils.LC_META_XYZ;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import com.google.common.collect.ImmutableSet;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.multitenant.quota.TestCluster;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
......@@ -47,10 +36,21 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.multitenant.quota.TestCluster;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class PhysicalClusterMetadataTest {
......@@ -66,6 +66,8 @@ public class PhysicalClusterMetadataTest {
private static final URL TEST_SSL_CERTS_MAY = PhysicalClusterMetadataTest.class.getResource("/cert_exp_may");
private AdminClient mockAdminClient;
private MockTime time;
private Metrics metrics;
private PhysicalClusterMetadata lcCache;
private String sslCertsPath;
private String endpoint;
......@@ -75,7 +77,9 @@ public class PhysicalClusterMetadataTest {
@Before
public void setUp() throws Exception {
lcCache = new PhysicalClusterMetadata();
metrics = new Metrics();
time = new MockTime();
lcCache = new PhysicalClusterMetadata(metrics, time);
Node node = new Node(0, "localhost", 9092);
endpoint = node.host() + ":" + node.port();
mockAdminClient = spy(new MockAdminClient.Builder()
......@@ -92,6 +96,48 @@ public class PhysicalClusterMetadataTest {
lcCache.shutdown();
}
@Test
public void testLoadFromFileSystemUpdateMetric() throws Exception {
// We round up to seconds as some OS only have second precision for last modified time
long now = 1000 + 1000 * (System.currentTimeMillis() / 1000);
time.setCurrentTimeMs(now);
long delay1 = 5000;
long delay2 = 8000;
long lastMod1 = now - delay1;
Utils.createOrUpdateLogicalClusterFile(LC_META_ABC, tempFolder, lastMod1);
long lastMod2 = now - delay2;
Utils.createOrUpdateLogicalClusterFile(LC_META_XYZ, tempFolder, lastMod2);
lcCache.start();
assertTrue("Expected cache to be initialized", lcCache.isUp());
assertEquals(delay1, TestUtils.getMetricValue(metrics,
PhysicalClusterMetadata.LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MIN_METRIC_NAME), 0);
assertEquals(delay2, TestUtils.getMetricValue(metrics,
PhysicalClusterMetadata.LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MAX_METRIC_NAME), 0);
assertEquals(0.5 * (double) (delay1 + delay2), TestUtils.getMetricValue(metrics,
PhysicalClusterMetadata.LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_AVG_METRIC_NAME), 0);
}
@Test
public void testEndToEndLoadTimeMetric() throws Exception {
Utils.createLogicalClusterFile(LC_META_ABC, tempFolder);
Utils.createLogicalClusterFile(LC_META_XYZ, tempFolder);
long delay1 = time.milliseconds() - CREATION_DATE_1.getTime();
long delay2 = time.milliseconds() - CREATION_DATE_2.getTime();
lcCache.start();
assertTrue("Expected cache to be initialized", lcCache.isUp());
assertEquals(delay1, TestUtils.getMetricValue(metrics,
PhysicalClusterMetadata.LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME), 0);
assertEquals(delay2, TestUtils.getMetricValue(metrics,
PhysicalClusterMetadata.LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME), 0);
assertEquals(0.5 * (double) (delay1 + delay2), TestUtils.getMetricValue(metrics,
PhysicalClusterMetadata.LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME), 0);
}
@Test
public void testCreateAndRemoveInstance() throws Exception {
final String brokerUUID = "test-uuid";
......@@ -105,7 +151,7 @@ public class PhysicalClusterMetadataTest {
// get instance does not create instance
assertNull(PhysicalClusterMetadata.getInstance(brokerUUID));
final PhysicalClusterMetadata metadata = new PhysicalClusterMetadata();
final PhysicalClusterMetadata metadata = new PhysicalClusterMetadata(metrics, time);
metadata.configure(configs);
assertTrue("Expected cache to be initialized", metadata.isUp());
assertEquals(metadata, PhysicalClusterMetadata.getInstance(brokerUUID));
......@@ -117,7 +163,7 @@ public class PhysicalClusterMetadataTest {
public void testConfigureInstanceWithoutDirConfigThrowsException() {
Map<String, Object> configs = new HashMap<>();
configs.put("broker.session.uuid", "test-uuid-1");
final PhysicalClusterMetadata metadata = new PhysicalClusterMetadata();
final PhysicalClusterMetadata metadata = new PhysicalClusterMetadata(metrics, time);
metadata.configure(configs);
}
......@@ -129,14 +175,14 @@ public class PhysicalClusterMetadataTest {
configs.put(ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG,
tempFolder.getRoot().getCanonicalPath());
final PhysicalClusterMetadata meta1 = new PhysicalClusterMetadata();
final PhysicalClusterMetadata meta1 = new PhysicalClusterMetadata(metrics, time);
meta1.configure(configs);
assertEquals(meta1, PhysicalClusterMetadata.getInstance(brokerUUID));
// configure() on the same instance and broker UUId does nothing
meta1.configure(configs);
assertEquals(meta1, PhysicalClusterMetadata.getInstance(brokerUUID));
final PhysicalClusterMetadata meta2 = new PhysicalClusterMetadata();
final PhysicalClusterMetadata meta2 = new PhysicalClusterMetadata(metrics, time);
// configuring another instance with the same broker uuid should fail
try {
meta2.configure(configs);
......@@ -1079,7 +1125,7 @@ public class PhysicalClusterMetadataTest {
tempFolder.getRoot().getCanonicalPath());
configs.put(ConfluentConfigs.MULTITENANT_METADATA_SSL_CERTS_SPEC_CONFIG,
tempFolder.getRoot().getCanonicalPath() + "/mnt/sslcerts/");
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata();
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata(metrics, time);
metadata.configure(configs);
metadata.handleSocketServerInitialized(endpoint);
assertNotNull(metadata.sslCertificateManager.getAdminClient());
......@@ -1094,7 +1140,7 @@ public class PhysicalClusterMetadataTest {
tempFolder.getRoot().getCanonicalPath());
configs.put(ConfluentConfigs.MULTITENANT_METADATA_SSL_CERTS_SPEC_CONFIG,
tempFolder.getRoot().getCanonicalPath() + "/mnt/sslcerts/");
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata();
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata(metrics, time);
metadata.configure(configs);
metadata.handleSocketServerInitialized(endpoint);
assertNull(metadata.sslCertificateManager.getAdminClient());
......@@ -1108,7 +1154,7 @@ public class PhysicalClusterMetadataTest {
configs.put("broker.session.uuid", BROKER_UUID);
configs.put(ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG,
tempFolder.getRoot().getCanonicalPath());
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata();
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata(metrics, time);
metadata.configure(configs);
metadata.handleSocketServerInitialized(endpoint);
assertNull(metadata.sslCertificateManager.getAdminClient());
......@@ -1123,7 +1169,7 @@ public class PhysicalClusterMetadataTest {
configs.put(ConfluentConfigs.MULTITENANT_METADATA_DIR_CONFIG,
tempFolder.getRoot().getCanonicalPath());
configs.put(ConfluentConfigs.MULTITENANT_METADATA_SSL_CERTS_SPEC_CONFIG, "tempfolderpathmntsslcerts");
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata();
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata(metrics, time);
metadata.configure(configs);
metadata.handleSocketServerInitialized(endpoint);
assertNull(metadata.sslCertificateManager.getAdminClient());
......@@ -1171,4 +1217,5 @@ public class PhysicalClusterMetadataTest {
assertFalse(Files.exists(Paths.get(logicalClustersDir)));
verify(mockAdminClient, timeout(TEST_MAX_WAIT_MS).times(2)).incrementalAlterConfigs(any(), any());
}
}
......@@ -5,6 +5,8 @@ package io.confluent.kafka.multitenant;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.junit.rules.TemporaryFolder;
import java.io.File;
......@@ -22,20 +24,22 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
public class Utils {
public static final Date CREATION_DATE_1 = new Date(System.currentTimeMillis() - 124356);
public static final Date CREATION_DATE_2 = new Date(System.currentTimeMillis() - 654321);
public static final LogicalClusterMetadata LC_META_XYZ =
new LogicalClusterMetadata("lkc-xyz", "pkc-xyz", "xyz",
"my-account", "k8s-abc", LogicalClusterMetadata.KAFKA_LOGICAL_CLUSTER_TYPE,
104857600L, 10240000L, 2048L, null, null,
LogicalClusterMetadata.DEFAULT_REQUEST_PERCENTAGE_PER_BROKER.longValue(),
LogicalClusterMetadata.DEFAULT_NETWORK_QUOTA_OVERHEAD_PERCENTAGE,
new LogicalClusterMetadata.LifecycleMetadata("xyz", "pkc-xyz", null, null));
new LogicalClusterMetadata.LifecycleMetadata("xyz", "pkc-xyz", CREATION_DATE_2, null));
public static final LogicalClusterMetadata LC_META_ABC =
new LogicalClusterMetadata("lkc-abc", "pkc-abc", "abc",
"my-account", "k8s-abc", LogicalClusterMetadata.KAFKA_LOGICAL_CLUSTER_TYPE,
10485760L, 102000000L, 204800L, null, null,
LogicalClusterMetadata.DEFAULT_REQUEST_PERCENTAGE_PER_BROKER.longValue(),
LogicalClusterMetadata.DEFAULT_NETWORK_QUOTA_OVERHEAD_PERCENTAGE,
new LogicalClusterMetadata.LifecycleMetadata("abc", "pkc-abc", null, null));
new LogicalClusterMetadata.LifecycleMetadata("abc", "pkc-abc", CREATION_DATE_1, null));
// Note that this cluster will be deactivated on arrival, but maybe not deleted yet
public static final LogicalClusterMetadata LC_META_DED =
new LogicalClusterMetadata("lkc-ded", "pkc-ded", "ded",
......@@ -79,7 +83,7 @@ public class Utils {
public static PhysicalClusterMetadata initiatePhysicalClusterMetadata(Map<String, Object> configs, long reloadDelay) throws IOException {
configs.put(ConfluentConfigs.MULTITENANT_METADATA_RELOAD_DELAY_MS_CONFIG, reloadDelay);
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata();
PhysicalClusterMetadata metadata = new PhysicalClusterMetadata(new Metrics(), Time.SYSTEM);
metadata.configure(configs);
return metadata;
......@@ -88,13 +92,13 @@ public class Utils {
public static void createLogicalClusterFile(LogicalClusterMetadata lcMeta, TemporaryFolder
tempFolder)
throws IOException {
updateLogicalClusterFile(lcMeta, false, true, tempFolder);
updateLogicalClusterFile(lcMeta, false, true, tempFolder, null);
}
public static void createInvalidLogicalClusterFile(LogicalClusterMetadata lcMeta, TemporaryFolder
tempFolder)
throws IOException {
updateLogicalClusterFile(lcMeta, false, false, tempFolder);
updateLogicalClusterFile(lcMeta, false, false, tempFolder, null);
}
/**
......@@ -105,19 +109,19 @@ public class Utils {
public static void updateLogicalClusterFile(LogicalClusterMetadata lcMeta, TemporaryFolder
tempFolder)
throws IOException {
updateLogicalClusterFile(lcMeta, false, true, tempFolder);
updateLogicalClusterFile(lcMeta, false, true, tempFolder, null);
}
public static void updateInvalidLogicalClusterFile(LogicalClusterMetadata lcMeta, TemporaryFolder
tempFolder)
throws IOException {
updateLogicalClusterFile(lcMeta, false, false, tempFolder);
updateLogicalClusterFile(lcMeta, false, false, tempFolder, null);
}
public static void deleteLogicalClusterFile(LogicalClusterMetadata lcMeta, TemporaryFolder tempFolder)
throws IOException {
updateLogicalClusterFile(lcMeta, true, true, tempFolder);
updateLogicalClusterFile(lcMeta, true, true, tempFolder, null);
}
public static void setPosixFilePermissions(LogicalClusterMetadata lcMeta,
......@@ -141,19 +145,34 @@ public class Utils {
lcMeta.networkQuotaOverhead(), lcMeta.lifecycleMetadata());
}
public static void createOrUpdateLogicalClusterFile(LogicalClusterMetadata lcMeta,
TemporaryFolder tempFolder,
long lastModified) throws IOException {
final String lcFilename = lcMeta.logicalClusterId() + ".json";
updateJsonFile(lcFilename, logicalClusterJsonString(lcMeta, true), false, tempFolder, lastModified);
}
private static void updateLogicalClusterFile(LogicalClusterMetadata lcMeta,
boolean remove,
boolean valid,
TemporaryFolder tempFolder) throws IOException {
TemporaryFolder tempFolder,
Long lastModified) throws IOException {
final String lcFilename = lcMeta.logicalClusterId() + ".json";
updateJsonFile(lcFilename, logicalClusterJsonString(lcMeta, valid), remove, tempFolder);
updateJsonFile(lcFilename, logicalClusterJsonString(lcMeta, valid), remove, tempFolder, lastModified);
}
public static Path updateJsonFile(String jsonFilename,
String jsonString,
boolean remove,
TemporaryFolder tempFolder) throws IOException {
return updateJsonFile(jsonFilename, jsonString, remove, tempFolder, null);
}
public static Path updateJsonFile(String jsonFilename,
String jsonString,
boolean remove,
TemporaryFolder tempFolder)
TemporaryFolder tempFolder,
Long lastModified)
throws IOException {
// create logical cluster file in tempFolder/<newDir>
final Path newDir = tempFolder.newFolder().toPath();
......@@ -161,6 +180,9 @@ public class Utils {
if (!remove) {
lcFile = Paths.get(newDir.toString(), jsonFilename);
Files.write(lcFile, jsonString.getBytes());
if (lastModified != null) {
lcFile.toFile().setLastModified(lastModified);
}
}
// this is ..data symbolic link
......
......@@ -30,8 +30,10 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.requests.SamplingRequestLogFilter;
......@@ -531,13 +533,17 @@ public class ConfluentConfigs {
return interceptor;
}
public static MultiTenantMetadata buildMultitenantMetadata(Map<String, ?> configs) {
public static MultiTenantMetadata buildMultitenantMetadata(Map<String, ?> configs, Metrics metrics) {
MultiTenantMetadata meta = null;
if (configs.get(MULTITENANT_METADATA_CLASS_CONFIG) != null) {
@SuppressWarnings("unchecked")
Class<? extends MultiTenantMetadata> multitenantMetadataClass =
(Class<? extends MultiTenantMetadata>) configs.get(MULTITENANT_METADATA_CLASS_CONFIG);
meta = Utils.newInstance(multitenantMetadataClass);
try {
meta = Utils.newParameterizedInstance(multitenantMetadataClass.getName(), Metrics.class, metrics);
} catch (ClassNotFoundException e) {
throw new KafkaException(e);
}
meta.configure(configs);
}
return meta;
......
......@@ -464,7 +464,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// multi-tenant metadata watcher should be initialized after dynamic config manager is
// initialized and before socket server processors
multitenantMetadata = ConfluentConfigs.buildMultitenantMetadata(config.values)
multitenantMetadata = ConfluentConfigs.buildMultitenantMetadata(config.values, metrics)
//auditLogProvider.start() will called as part of authorizer.start()
auditLogProvider = AuditLogProviderFactory.create(config.originals, clusterId)
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册