Skip to content
代码片段 群组 项目
未验证 提交 34a4b1a2 编辑于 作者: Manikumar Reddy's avatar Manikumar Reddy 提交者: GitHub
浏览文件

KSECURITY-477: Avoid sending partial updates during LDAP group manager startup (#6781)

上级 ea9d3f9d
No related branches found
No related tags found
无相关合并请求
......@@ -182,7 +182,13 @@ public class LdapGroupManager {
boolean done = false;
do {
try {
searchAndProcessResults();
if (listener != null) {
Set<String> currentSearchEntries = searchAndProcessResults(false);
listener.start();
removeDeletedEntries(currentSearchEntries);
} else {
searchAndProcessResults(false);
}
done = true;
} catch (Throwable e) {
try {
......@@ -342,7 +348,12 @@ public class LdapGroupManager {
}
// Visibility to override for testing
protected void searchAndProcessResults() throws NamingException, IOException {
protected Set<String> searchAndProcessResults() throws NamingException, IOException {
return searchAndProcessResults(true);
}
// Visibility to override for testing
protected Set<String> searchAndProcessResults(boolean removeDeletedEntries) throws NamingException, IOException {
if (context == null) {
context = contextCreator.createLdapContext();
maybeSetPagingControl(null);
......@@ -369,8 +380,12 @@ public class LdapGroupManager {
maybeSetPagingControl(cookie);
} while (cookie != null);
removeDeletedEntries(currentSearchEntries);
if (removeDeletedEntries) {
removeDeletedEntries(currentSearchEntries);
}
log.debug("Search completed, group cache is {}", userGroupCache);
return currentSearchEntries;
}
private void removeDeletedEntries(Set<String> currentSearchEntries) {
......
......@@ -38,7 +38,6 @@ public class LdapStore implements ExternalStore {
}
listener.generationId = generationId;
ldapGroupManager = createLdapGroupManager(listener);
listener.start();
ldapGroupManager.start();
}
......@@ -75,11 +74,13 @@ public class LdapStore implements ExternalStore {
this.writer = writer;
}
void start() {
@Override
public void start() {
this.active = true;
}
void stop() {
@Override
public void stop() {
this.active = false;
}
......
......@@ -50,4 +50,8 @@ public interface ExternalStoreListener<K, V> {
* Indicates that external store is functioning after an earlier failure.
*/
void resetFailure();
void start();
void stop();
}
......@@ -6,12 +6,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.confluent.security.auth.provider.ldap.LdapConfig.SearchMode;
import io.confluent.security.auth.store.data.UserKey;
import io.confluent.security.auth.store.data.UserValue;
import io.confluent.security.auth.store.external.ExternalStoreListener;
import io.confluent.security.minikdc.MiniKdcWithLdapService;
import io.confluent.security.test.utils.LdapTestUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
......@@ -263,6 +273,41 @@ public class LdapGroupManagerTest {
verifyGroupMappingChangesFromUsers();
}
@Test
public void testGroupManagerStartupWithListener() throws Exception {
miniKdcWithLdapService.createGroup("grp1", "userA");
miniKdcWithLdapService.createGroup("grp2", "userA", "userB", "userC");
miniKdcWithLdapService.createGroup("grp3", "userB");
miniKdcWithLdapService.createGroup("grp4", "userA", "userB", "userD");
//create existing cache entries to MockUserStoreListener
MockUserStoreListener mockUserStoreListener = new MockUserStoreListener();
mockUserStoreListener.listenerCache.put("userA", Collections.singleton("grp1"));
mockUserStoreListener.listenerCache.put("userB", new HashSet<>(Arrays.asList("grp1", "grp2")));
ldapGroupManager = createGroupManagerWithInverseMapping(0, 10, SearchMode.GROUPS, mockUserStoreListener);
Map<String, Set<String>> userGroupCache = new HashMap<>();
TestUtils.setFieldValue(ldapGroupManager, "userGroupCache", userGroupCache);
ldapGroupManager.start();
//verify userGroupCache in ldapGroupManager
assertEquals(new HashSet<>(Arrays.asList("userA", "userB", "userC", "userD")), userGroupCache.keySet());
assertEquals(new HashSet<>(Arrays.asList("grp1", "grp2", "grp4")), userGroupCache.get("userA"));
assertEquals(new HashSet<>(Arrays.asList("grp2", "grp3", "grp4")), userGroupCache.get("userB"));
assertEquals(new HashSet<>(Collections.singletonList("grp2")), userGroupCache.get("userC"));
assertEquals(new HashSet<>(Collections.singletonList("grp4")), userGroupCache.get("userD"));
//verify listener cache
System.out.println("MANI :" + mockUserStoreListener.listenerCache.keySet());
assertEquals(new HashSet<>(Arrays.asList("userA", "userB", "userC", "userD")), mockUserStoreListener.listenerCache.keySet());
assertEquals(new HashSet<>(Arrays.asList("grp1", "grp2", "grp4")), mockUserStoreListener.listenerCache.get("userA"));
assertEquals(new HashSet<>(Arrays.asList("grp2", "grp3", "grp4")), mockUserStoreListener.listenerCache.get("userB"));
assertEquals(new HashSet<>(Collections.singletonList("grp2")), mockUserStoreListener.listenerCache.get("userC"));
assertEquals(new HashSet<>(Collections.singletonList("grp4")), mockUserStoreListener.listenerCache.get("userD"));
}
/**
* MiniKdc (Apache DirectoryServer doesn't currently have a virtual 'memberOf'
* attribute to provide user->group mapping. Hence this test uses mock user/groups
......@@ -311,11 +356,13 @@ public class LdapGroupManagerTest {
* mapping.
*/
private LdapGroupManager createGroupManagerWithInverseMapping(int refreshIntervalMs,
int pageSize) {
int pageSize,
SearchMode searchMode,
ExternalStoreListener<UserKey, UserValue> listener) {
Properties props = new Properties();
props.putAll(LdapTestUtils.ldapAuthorizerConfigs(miniKdcWithLdapService, refreshIntervalMs));
props.setProperty(LdapConfig.SEARCH_PAGE_SIZE_PROP, String.valueOf(pageSize));
props.setProperty(LdapConfig.SEARCH_MODE_PROP, SearchMode.USERS.name());
props.setProperty(LdapConfig.SEARCH_MODE_PROP, searchMode.name());
props.setProperty(LdapConfig.USER_SEARCH_BASE_PROP, "ou=groups");
props.setProperty(LdapConfig.USER_OBJECT_CLASS_PROP, "groupOfNames");
props.setProperty(LdapConfig.USER_NAME_ATTRIBUTE_PROP, "cn");
......@@ -323,7 +370,60 @@ public class LdapGroupManagerTest {
props.setProperty(LdapConfig.USER_MEMBEROF_ATTRIBUTE_PATTERN_PROP,
"uid=(.*),ou=users,dc=example,dc=com");
LdapConfig ldapConfig = new LdapConfig(props);
return new LdapGroupManager(ldapConfig, time);
return new LdapGroupManager(ldapConfig, time, listener);
}
private LdapGroupManager createGroupManagerWithInverseMapping(int refreshIntervalMs,
int pageSize) {
return createGroupManagerWithInverseMapping(refreshIntervalMs, pageSize, SearchMode.USERS, null);
}
private static class MockUserStoreListener implements ExternalStoreListener<UserKey, UserValue> {
private boolean active;
private Map<String, Set<String>> listenerCache;
MockUserStoreListener() {
this.listenerCache = new ConcurrentHashMap<>();
}
@Override
public void start() {
this.active = true;
}
@Override
public void stop() {
this.active = false;
}
@Override
public void initialize(Map<UserKey, UserValue> initialValues) {
initialValues.forEach(this::update);
}
@Override
public void update(UserKey key, UserValue value) {
if (active) {
listenerCache.put(key.principal().getName(),
value.groups().stream().map(g -> g.getName()).collect(Collectors.toSet()));
}
}
@Override
public void delete(UserKey key) {
if (active) {
listenerCache.remove(key.principal().getName());
}
}
@Override
public void fail(String errorMessage) {
}
@Override
public void resetFailure() {
}
}
}
......@@ -30,6 +30,8 @@ import io.confluent.security.store.kafka.coordinator.MetadataServiceAssignment.A
import io.confluent.security.store.kafka.coordinator.MetadataServiceCoordinator;
import io.confluent.security.store.kafka.coordinator.NodeMetadata;
import io.confluent.security.test.utils.RbacTestUtils;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
......@@ -386,17 +388,20 @@ public class MockAuthStore extends KafkaAuthStore {
ExternalStoreListener<UserKey, UserValue> listener) {
return new LdapGroupManager(new LdapConfig(configs), time, listener) {
@Override
protected void searchAndProcessResults() throws NamingException, IOException {
protected Set<String> searchAndProcessResults(boolean removeDeletedEntries) throws NamingException, IOException {
if (configs.get("ldap." + Context.PROVIDER_URL).equals(MOCK_LDAP_URL)) {
Set<String> users = new HashSet<>();
ldapGroups.get().forEach((k, v) -> {
KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, k);
Set<KafkaPrincipal> groups = v.stream()
.map(name -> new KafkaPrincipal("Group", name))
.collect(Collectors.toSet());
listener.update(new UserKey(user), new UserValue(groups));
users.add(user.getName());
});
return users;
} else
super.searchAndProcessResults();
return super.searchAndProcessResults(removeDeletedEntries);
}
};
}
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册