Skip to content
代码片段 群组 项目
提交 fd10640a 编辑于 作者: Stanislav Kozlovski's avatar Stanislav Kozlovski
浏览文件

Merge remote-tracking branch 'origin/master' into june-30-ce-kafka-merge-based-off-june-15-branch

Remove extra test file
No related branches found
No related tags found
无相关合并请求
......@@ -258,7 +258,7 @@ public class KafkaChannelTest {
@Test
public void testUnreadableSocket() throws IOException {
SelectionKey key = Mockito.mock(SelectionKey.class);
Mockito.when(key.readyOps()).thenReturn(0).thenReturn(SelectionKey.OP_READ);
Mockito.when(key.isReadable()).thenReturn(false).thenReturn(true);
SocketChannel socketChannel = Mockito.mock(SocketChannel.class);
Mockito.when(socketChannel.read(Mockito.any(ByteBuffer.class))).thenReturn(-1);
KafkaChannel channel = createKafkaChannel(new ProxyProtocolEngineFactory(ProxyProtocol.V1), key, socketChannel);
......@@ -269,7 +269,7 @@ public class KafkaChannelTest {
assertEquals(KafkaChannel.ChannelProxyState.PROXY_PENDING, channel.proxyState());
assertFalse(channel.hasBytesBuffered());
// Second prepare() call should throw and EOFException when reading from the socket channel
// Second prepare() call should throw an EOFException when reading from the socket channel
assertThrows(EOFException.class, channel::prepare);
}
......@@ -297,7 +297,7 @@ public class KafkaChannelTest {
if (key == null) {
key = Mockito.mock(SelectionKey.class);
Mockito.when(key.readyOps()).thenReturn(SelectionKey.OP_READ);
Mockito.when(key.isReadable()).thenReturn(true);
}
if (channel == null) {
channel = Mockito.mock(SocketChannel.class);
......
......@@ -244,7 +244,7 @@ public class SaslServerAuthenticatorTest {
mockRequest(saslHandshakeRequest(mechanism), transportLayer);
authenticator.authenticate();
when(saslServer.isComplete()).thenReturn(false).thenReturn(true);
when(saslServer.isComplete()).thenReturn(true);
mockRequest(saslAuthenticateRequest(), transportLayer);
authenticator.authenticate();
......@@ -277,7 +277,7 @@ public class SaslServerAuthenticatorTest {
mockRequest(saslHandshakeRequest(mechanism), transportLayer);
authenticator.authenticate();
when(saslServer.isComplete()).thenReturn(false).thenReturn(true);
when(saslServer.isComplete()).thenReturn(true);
mockRequest(saslAuthenticateRequest(), transportLayer);
authenticator.authenticate();
......@@ -310,7 +310,7 @@ public class SaslServerAuthenticatorTest {
mockRequest(saslHandshakeRequest(mechanism), transportLayer);
authenticator.authenticate();
when(saslServer.isComplete()).thenReturn(false).thenReturn(true);
when(saslServer.isComplete()).thenReturn(true);
mockRequest(saslAuthenticateRequest(), transportLayer);
authenticator.authenticate();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.MockAsyncAuthExecutor;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.MessageContext;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.security.DefaultRequestCallbackManager;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class SaslServerAuthenticatorTestTwo {
private final String clientId = "clientId";
@Test
public void testSessionExpiresAtTokenExpiry() throws IOException {
String mechanism = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
SaslServer saslServer = mock(SaslServer.class);
MockTime time = new MockTime(0, 1, 1000);
Duration tokenExpiryShorterThanMaxReauth = Duration.ofSeconds(2);
long maxReauthMs = tokenExpiryShorterThanMaxReauth.multipliedBy(2).toMillis();
try (
MockedStatic<?> ignored = mockSaslServer(saslServer, mechanism, time, tokenExpiryShorterThanMaxReauth);
MockedStatic<?> ignored2 = mockKafkaPrincipal("[principal-type]", "[principal-name");
TransportLayer transportLayer = mockTransportLayer()
) {
SaslServerAuthenticator authenticator = getSaslServerAuthenticatorForOAuth(mechanism, transportLayer, time, maxReauthMs);
mockRequest(saslHandshakeRequest(mechanism), transportLayer);
authenticator.authenticate();
when(saslServer.isComplete()).thenReturn(false).thenReturn(true);
mockRequest(saslAuthenticateRequest(), transportLayer);
authenticator.authenticate();
long atTokenExpiryNanos = time.nanoseconds() + tokenExpiryShorterThanMaxReauth.toNanos();
assertEquals(atTokenExpiryNanos, authenticator.serverSessionExpirationTimeNanos());
ByteBuffer secondResponseSent = getResponses(transportLayer).get(1);
consumeSizeAndHeader(secondResponseSent);
SaslAuthenticateResponse response = SaslAuthenticateResponse.parse(secondResponseSent, (short) 2, MessageContext.IDENTITY);
assertEquals(tokenExpiryShorterThanMaxReauth.toMillis(), response.sessionLifetimeMs());
}
}
/**
* [2022-08-05 08:51:07,177] TRACE connections.max.reauth.ms for mechanism=OAUTHBEARER: 4000 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator:185)
* [2022-08-05 08:51:07,178] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator:378)
* [2022-08-05 08:51:07,178] DEBUG Handling Kafka request SASL_HANDSHAKE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator:511)
* [2022-08-05 08:51:07,178] DEBUG Using SASL mechanism 'OAUTHBEARER' provided by client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator:557)
* [2022-08-05 08:51:07,180] DEBUG Set SASL server state to AUTHENTICATE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator:378)
* [2022-08-05 08:51:07,182] DEBUG Authentication complete; session max lifetime from broker config=4000 ms, credential expiration=Thu Jan 01 01:00:02 CET 1970 (2000 ms); session expiration = Thu Jan 01 01:00:02 CET 1970 (2000 ms), sending 2000 ms to client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator:690)
* [2022-08-05 08:51:07,182] DEBUG Set SASL server state to COMPLETE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator:378)
* @param mechanism
* @param transportLayer
* @param time
* @param maxReauth
* @return
*/
private SaslServerAuthenticator getSaslServerAuthenticatorForOAuth(String mechanism, TransportLayer transportLayer, Time time, Long maxReauth) {
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(mechanism));
ChannelMetadataRegistry metadataRegistry = new DefaultChannelMetadataRegistry();
return setupAuthenticator(configs, transportLayer, mechanism, metadataRegistry, time, maxReauth);
}
private MockedStatic<?> mockSaslServer(SaslServer saslServer, String mechanism, Time time, Duration tokenExpirationDuration) throws SaslException {
when(saslServer.getMechanismName()).thenReturn(mechanism);
when(saslServer.evaluateResponse(any())).thenReturn(new byte[]{});
long millisToExpiration = tokenExpirationDuration.toMillis();
when(saslServer.getNegotiatedProperty(eq(SaslInternalConfigs.CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY)))
.thenReturn(time.milliseconds() + millisToExpiration);
return Mockito.mockStatic(Sasl.class, (Answer<SaslServer>) invocation -> saslServer);
}
private MockedStatic<?> mockKafkaPrincipal(String principalType, String name) {
KafkaPrincipalBuilder kafkaPrincipalBuilder = mock(KafkaPrincipalBuilder.class);
when(kafkaPrincipalBuilder.build(any())).thenReturn(new KafkaPrincipal(principalType, name));
MockedStatic<ChannelBuilders> channelBuilders = Mockito.mockStatic(ChannelBuilders.class, Answers.RETURNS_MOCKS);
channelBuilders.when(() ->
ChannelBuilders.createPrincipalBuilder(anyMap(), any(KerberosShortNamer.class), any(SslPrincipalMapper.class))
).thenReturn(kafkaPrincipalBuilder);
return channelBuilders;
}
private void consumeSizeAndHeader(ByteBuffer responseBuffer) {
responseBuffer.getInt();
ResponseHeader.parse(responseBuffer, (short) 1);
}
private List<ByteBuffer> getResponses(TransportLayer transportLayer) throws IOException {
ArgumentCaptor<ByteBuffer[]> buffersCaptor = ArgumentCaptor.forClass(ByteBuffer[].class);
verify(transportLayer, times(2)).write(buffersCaptor.capture());
return buffersCaptor.getAllValues().stream()
.map(this::concatBuffers)
.collect(Collectors.toList());
}
private ByteBuffer concatBuffers(ByteBuffer[] buffers) {
int combinedCapacity = 0;
for (ByteBuffer buffer : buffers) {
combinedCapacity += buffer.capacity();
}
if (combinedCapacity > 0) {
ByteBuffer concat = ByteBuffer.allocate(combinedCapacity);
for (ByteBuffer buffer : buffers) {
concat.put(buffer);
}
return safeFlip(concat);
} else {
return ByteBuffer.allocate(0);
}
}
private ByteBuffer safeFlip(ByteBuffer buffer) {
return (ByteBuffer) ((Buffer) buffer).flip();
}
private SaslAuthenticateRequest saslAuthenticateRequest() {
SaslAuthenticateRequestData authenticateRequestData = new SaslAuthenticateRequestData();
return new SaslAuthenticateRequest.Builder(authenticateRequestData).build(ApiKeys.SASL_AUTHENTICATE.latestVersion());
}
private SaslHandshakeRequest saslHandshakeRequest(String mechanism) {
SaslHandshakeRequestData handshakeRequestData = new SaslHandshakeRequestData();
handshakeRequestData.setMechanism(mechanism);
return new SaslHandshakeRequest.Builder(handshakeRequestData).build(ApiKeys.SASL_HANDSHAKE.latestVersion());
}
private TransportLayer mockTransportLayer() throws IOException {
TransportLayer transportLayer = mock(TransportLayer.class, Answers.RETURNS_DEEP_STUBS);
when(transportLayer.socketChannel().socket().getInetAddress()).thenReturn(InetAddress.getLoopbackAddress());
when(transportLayer.write(any(ByteBuffer[].class))).thenReturn(Long.MAX_VALUE);
return transportLayer;
}
private void mockRequest(AbstractRequest request, TransportLayer transportLayer) throws IOException {
mockRequest(new RequestHeader(request.apiKey(), request.apiKey().latestVersion(), clientId, 0), request, transportLayer);
}
private void mockRequest(RequestHeader header, AbstractRequest request, TransportLayer transportLayer) throws IOException {
ByteBuffer headerBuffer = RequestTestUtils.serializeRequestHeader(header);
ByteBuffer requestBuffer = request.serialize();
requestBuffer.rewind();
when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
invocation.<ByteBuffer>getArgument(0).putInt(headerBuffer.remaining() + requestBuffer.remaining());
return 4;
}).then(invocation -> {
invocation.<ByteBuffer>getArgument(0)
.put(headerBuffer.duplicate())
.put(requestBuffer.duplicate());
return headerBuffer.remaining() + requestBuffer.remaining();
});
}
private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer,
String mechanism, ChannelMetadataRegistry metadataRegistry) {
return setupAuthenticator(configs, transportLayer, mechanism, metadataRegistry, new MockTime(), null);
}
private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer,
String mechanism, ChannelMetadataRegistry metadataRegistry, Time time, Long maxReauth) {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<>());
Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.singletonMap(
mechanism, new SaslServerCallbackHandler());
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
ApiMessageType.ListenerType.ZK_BROKER);
Map<String, Long> connectionsMaxReauthMsByMechanism = maxReauth != null ?
Collections.singletonMap(mechanism, maxReauth) : Collections.emptyMap();
// TODO: This?
Map<String, Integer> saslServerMaxReceiveSizeByMechanism = Collections.singletonMap(mechanism, Integer.MAX_VALUE);
// TODO: This?
boolean isInterBrokerListener = false;
return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null,
new ListenerName("ssl"), isInterBrokerListener, SecurityProtocol.SASL_SSL, transportLayer, connectionsMaxReauthMsByMechanism,
Collections.emptyMap(), Collections.emptyMap(), saslServerMaxReceiveSizeByMechanism, new MockAsyncAuthExecutor(), metadataRegistry, time, () -> apiVersionsResponse, new DefaultRequestCallbackManager());
}
}
\ No newline at end of file
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="94" height="20"><linearGradient id="b" x2="0" y2="100%"><stop offset="0" stop-color="#bbb" stop-opacity=".1"/><stop offset="1" stop-opacity=".1"/></linearGradient><clipPath id="a"><rect width="94" height="20" rx="3" fill="#fff"/></clipPath><g clip-path="url(#a)"><path fill="#555" d="M0 0h49v20H0z"/><path fill="#007ec6" d="M49 0h45v20H49z"/><path fill="url(#b)" d="M0 0h94v20H0z"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="110"><text x="255" y="150" fill="#010101" fill-opacity=".3" transform="scale(.1)" textLength="390">release</text><text x="255" y="140" transform="scale(.1)" textLength="390">release</text><text x="705" y="150" fill="#010101" fill-opacity=".3" transform="scale(.1)" textLength="350">v0.3861.0-7.3.0-0-ce</text><text x="705" y="140" transform="scale(.1)" textLength="350">v0.3861.0-7.3.0-0-ce</text></g> </svg>
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="94" height="20"><linearGradient id="b" x2="0" y2="100%"><stop offset="0" stop-color="#bbb" stop-opacity=".1"/><stop offset="1" stop-opacity=".1"/></linearGradient><clipPath id="a"><rect width="94" height="20" rx="3" fill="#fff"/></clipPath><g clip-path="url(#a)"><path fill="#555" d="M0 0h49v20H0z"/><path fill="#007ec6" d="M49 0h45v20H49z"/><path fill="url(#b)" d="M0 0h94v20H0z"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="110"><text x="255" y="150" fill="#010101" fill-opacity=".3" transform="scale(.1)" textLength="390">release</text><text x="255" y="140" transform="scale(.1)" textLength="390">release</text><text x="705" y="150" fill="#010101" fill-opacity=".3" transform="scale(.1)" textLength="350">v0.3863.0-7.3.0-0-ce</text><text x="705" y="140" transform="scale(.1)" textLength="350">v0.3863.0-7.3.0-0-ce</text></g> </svg>
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册