diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index a904f9dfbf5757cf233e591ce8f2a4a3bfa34f83..f799c6ddb1832377bc0a0f4cd7c6e8ddc7770544 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -6,11 +6,15 @@ package com.microsoft.signalr; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -39,9 +43,36 @@ public class HubConnection { private ConnectionState connectionState = null; private HttpClient httpClient; private String stopError; + private Timer pingTimer = null; + private AtomicLong nextServerTimeout = new AtomicLong(); + private AtomicLong nextPingActivation = new AtomicLong(); + private Duration keepAliveInterval = Duration.ofSeconds(15); + private Duration serverTimeout = Duration.ofSeconds(30); + private Duration tickRate = Duration.ofSeconds(1); private CompletableFuture<Void> handshakeResponseFuture; private Duration handshakeResponseTimeout = Duration.ofSeconds(15); + public void setServerTimeout(Duration serverTimeout) { + this.serverTimeout = serverTimeout; + } + + public Duration getServerTimeout() { + return this.serverTimeout; + } + + public void setKeepAliveInterval(Duration keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + } + + public Duration getKeepAliveInterval() { + return this.keepAliveInterval; + } + + // For testing purposes + void setTickRate(Duration tickRate) { + this.tickRate = tickRate; + } + HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Single<String> accessTokenProvider, Duration handshakeResponseTimeout) { if (url == null || url.isEmpty()) { throw new IllegalArgumentException("A valid url is required."); @@ -79,6 +110,7 @@ public class HubConnection { this.skipNegotiate = skipNegotiate; this.callback = (payload) -> { + resetServerTimeout(); if (!handshakeReceived) { int handshakeLength = payload.indexOf(RECORD_SEPARATOR) + 1; String handshakeResponseString = payload.substring(0, handshakeLength - 1); @@ -245,6 +277,29 @@ public class HubConnection { hubConnectionState = HubConnectionState.CONNECTED; connectionState = new ConnectionState(this); logger.log(LogLevel.Information, "HubConnection started."); + + resetServerTimeout(); + this.pingTimer = new Timer(); + this.pingTimer.schedule(new TimerTask() { + @Override + public void run() { + try { + if (System.currentTimeMillis() > nextServerTimeout.get()) { + stop("Server timeout elapsed without receiving a message from the server."); + return; + } + + if (System.currentTimeMillis() > nextPingActivation.get()) { + sendHubMessage(PingMessage.getInstance()); + } + } catch (Exception e) { + logger.log(LogLevel.Warning, String.format("Error sending ping: %s", e.getMessage())); + // The connection is probably in a bad or closed state now, cleanup the timer so + // it stops triggering + pingTimer.cancel(); + } + } + }, new Date(0), tickRate.toMillis()); } finally { hubConnectionStateLock.unlock(); } @@ -399,11 +454,21 @@ public class HubConnection { private void sendHubMessage(HubMessage message) throws Exception { String serializedMessage = protocol.writeMessage(message); if (message.getMessageType() == HubMessageType.INVOCATION) { - logger.log(LogLevel.Debug, "Sending %d message '%s'.", message.getMessageType().value, ((InvocationMessage)message).getInvocationId()); + logger.log(LogLevel.Debug, "Sending %s message '%s'.", message.getMessageType().name(), ((InvocationMessage)message).getInvocationId()); } else { - logger.log(LogLevel.Debug, "Sending %d message.", message.getMessageType().value); + logger.log(LogLevel.Debug, "Sending %s message.", message.getMessageType().name()); } transport.send(serializedMessage); + + resetKeepAlive(); + } + + private void resetServerTimeout() { + this.nextServerTimeout.set(System.currentTimeMillis() + serverTimeout.toMillis()); + } + + private void resetKeepAlive() { + this.nextPingActivation.set(System.currentTimeMillis() + keepAliveInterval.toMillis()); } /** diff --git a/clients/java/signalr/src/main/java/com/microsoft/signalr/PingMessage.java b/clients/java/signalr/src/main/java/com/microsoft/signalr/PingMessage.java index 6e6e04d5dc5c5d79037a3f6d68e2e643bb2767d3..318de059ddda7f925babce623959fe4431ccd00f 100644 --- a/clients/java/signalr/src/main/java/com/microsoft/signalr/PingMessage.java +++ b/clients/java/signalr/src/main/java/com/microsoft/signalr/PingMessage.java @@ -5,6 +5,8 @@ package com.microsoft.signalr; class PingMessage extends HubMessage { + int type = HubMessageType.PING.value; + private static PingMessage instance = new PingMessage(); private PingMessage() diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index 1cc7550a3c3a591524cc4214b79051839421a2f3..ba11f88ce1f9f6ba9a58045baedd3854cb7b43a5 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -35,7 +35,7 @@ class HubConnectionTest { @Test public void transportCloseTriggersStopInHubConnection() throws Exception { - MockTransport mockTransport = new MockTransport(true); + MockTransport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); @@ -46,7 +46,7 @@ class HubConnectionTest { @Test public void transportCloseWithErrorTriggersStopInHubConnection() throws Exception { - MockTransport mockTransport = new MockTransport(true); + MockTransport mockTransport = new MockTransport(); AtomicReference<String> message = new AtomicReference<>(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); String errorMessage = "Example transport error."; @@ -63,7 +63,7 @@ class HubConnectionTest { @Test public void checkHubConnectionStateNoHandShakeResponse() { - MockTransport mockTransport = new MockTransport(); + MockTransport mockTransport = new MockTransport(false); HubConnection hubConnection = HubConnectionBuilder.create("http://example.com") .withTransport(mockTransport) .withHttpClient(new TestHttpClient()) @@ -79,7 +79,7 @@ class HubConnectionTest { @Test public void constructHubConnectionWithHttpConnectionOptions() { - Transport mockTransport = new MockTransport(true); + Transport mockTransport = new MockTransport(); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); @@ -95,7 +95,6 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); @@ -106,7 +105,7 @@ class HubConnectionTest { @Test public void invalidHandShakeResponse() throws Exception { - MockTransport mockTransport = new MockTransport(); + MockTransport mockTransport = new MockTransport(false); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); @@ -118,7 +117,7 @@ class HubConnectionTest { @Test public void hubConnectionReceiveHandshakeResponseWithError() { - MockTransport mockTransport = new MockTransport(); + MockTransport mockTransport = new MockTransport(false); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); @@ -145,7 +144,6 @@ class HubConnectionTest { assertEquals(expectedHanshakeRequest, message); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); // Confirming that our handler was called and that the counter property was incremented. @@ -169,7 +167,6 @@ class HubConnectionTest { assertEquals(expectedHanshakeRequest, message); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); // Confirming that our handler was called and that the counter property was incremented. @@ -197,7 +194,6 @@ class HubConnectionTest { assertEquals(expectedHanshakeRequest, message); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); // Confirming that the handler was removed. @@ -223,7 +219,6 @@ class HubConnectionTest { assertEquals(expectedHanshakeRequest, message); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); assertEquals(Double.valueOf(3), value.get()); @@ -253,7 +248,6 @@ class HubConnectionTest { assertEquals(expectedHanshakeRequest, message); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); // Confirming that our handler was called and that the counter property was incremented. @@ -286,7 +280,6 @@ class HubConnectionTest { assertEquals(expectedHanshakeRequest, message); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); // Confirming that our handler was called and that the counter property was incremented. @@ -322,7 +315,6 @@ class HubConnectionTest { assertEquals(expectedHanshakeRequest, message); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); // Confirming that our handler was called and that the counter property was incremented. assertEquals(Double.valueOf(3), value.get()); @@ -346,7 +338,6 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); try { mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); @@ -371,7 +362,6 @@ class HubConnectionTest { assertEquals(Double.valueOf(0), value.get()); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[12]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -384,7 +374,6 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); AtomicBoolean done = new AtomicBoolean(); Single<Integer> result = hubConnection.invoke(Integer.class, "echo", "message"); @@ -403,7 +392,6 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); AtomicBoolean doneFirst = new AtomicBoolean(); AtomicBoolean doneSecond = new AtomicBoolean(); @@ -430,7 +418,6 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); AtomicBoolean done = new AtomicBoolean(); // int.class is a primitive type and since we use Class.cast to cast an Object to the expected return type @@ -450,7 +437,6 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); AtomicBoolean done = new AtomicBoolean(); Single<Integer> result = hubConnection.invoke(int.class, "echo", "message"); @@ -476,7 +462,6 @@ class HubConnectionTest { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); AtomicBoolean done = new AtomicBoolean(); Single<Integer> result = hubConnection.invoke(int.class, "echo", "message"); @@ -508,7 +493,6 @@ class HubConnectionTest { }); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}" + RECORD_SEPARATOR); // Confirming that our handler was called and that the counter property was incremented. @@ -527,7 +511,6 @@ class HubConnectionTest { }, String.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\"]}" + RECORD_SEPARATOR); hubConnection.send("inc", "Hello World"); @@ -552,7 +535,6 @@ class HubConnectionTest { }, String.class, Double.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\", 12]}" + RECORD_SEPARATOR); hubConnection.send("inc", "Hello World", 12); @@ -581,7 +563,6 @@ class HubConnectionTest { }, String.class, String.class, String.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\"]}" + RECORD_SEPARATOR); hubConnection.send("inc", "A", "B", "C"); @@ -614,7 +595,6 @@ class HubConnectionTest { }, String.class, String.class, String.class, String.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\", \"D\"]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -650,7 +630,6 @@ class HubConnectionTest { }, String.class, String.class, String.class, Boolean.class, Double.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12 ]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -690,7 +669,6 @@ class HubConnectionTest { }, String.class, String.class, String.class, Boolean.class, Double.class, String.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\"]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -734,7 +712,6 @@ class HubConnectionTest { }, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\"]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -782,7 +759,6 @@ class HubConnectionTest { }, String.class, String.class, String.class, Boolean.class, Double.class, String.class, String.class, String.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"A\", \"B\", \"C\",true,12,\"D\",\"E\",\"F\"]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. assertEquals("A", value1.get()); @@ -815,7 +791,6 @@ class HubConnectionTest { }, Custom.class); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[{\"number\":1,\"str\":\"A\",\"bools\":[true,false]}]}" + RECORD_SEPARATOR); // Confirming that our handler was called and the correct message was passed in. @@ -830,7 +805,7 @@ class HubConnectionTest { @Test public void receiveHandshakeResponseAndMessage() throws Exception { AtomicReference<Double> value = new AtomicReference<Double>(0.0); - MockTransport mockTransport = new MockTransport(); + MockTransport mockTransport = new MockTransport(false); HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); hubConnection.on("inc", () ->{ @@ -898,7 +873,6 @@ class HubConnectionTest { assertEquals(ex.getMessage(), "There was an error"); }); hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); @@ -938,7 +912,6 @@ class HubConnectionTest { }, String.class); Completable startFuture = hubConnection.start(); - mockTransport.receiveMessage("{}" + RECORD_SEPARATOR); startFuture.blockingAwait(1000, TimeUnit.MILLISECONDS); RuntimeException exception = assertThrows(RuntimeException.class, () -> mockTransport.receiveMessage("{\"type\":1,\"target\":\"Send\",\"arguments\":[]}" + RECORD_SEPARATOR)); @@ -1091,9 +1064,43 @@ class HubConnectionTest { assertEquals("Bearer newToken", token.get()); } + @Test + public void connectionTimesOutIfServerDoesNotSendMessage() throws Exception { + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); + hubConnection.setServerTimeout(Duration.ofMillis(1)); + hubConnection.setTickRate(Duration.ofMillis(1)); + CompletableFuture<Exception> closedFuture = new CompletableFuture<>(); + hubConnection.onClosed((e) -> { + closedFuture.complete(e); + }); + + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + + assertEquals("Server timeout elapsed without receiving a message from the server.", closedFuture.get(1000, TimeUnit.MILLISECONDS).getMessage()); + } + + @Test + public void connectionSendsPingsRegularly() throws InterruptedException, ExecutionException, TimeoutException, Exception { + MockTransport mockTransport = new MockTransport(true, false); + HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport); + hubConnection.setKeepAliveInterval(Duration.ofMillis(1)); + hubConnection.setTickRate(Duration.ofMillis(1)); + + hubConnection.start().blockingAwait(1000, TimeUnit.MILLISECONDS); + + TimeUnit.MILLISECONDS.sleep(100); + hubConnection.stop(); + + String[] sentMessages = mockTransport.getSentMessages(); + assertTrue(sentMessages.length > 1); + for (int i = 1; i < sentMessages.length; i++) { + assertEquals("{\"type\":6}" + RECORD_SEPARATOR, sentMessages[i]); + } + } + @Test public void hubConnectionCanBeStartedAfterBeingStopped() throws Exception { - MockTransport transport = new MockTransport(true); + MockTransport transport = new MockTransport(); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") .withTransport(transport) @@ -1112,7 +1119,7 @@ class HubConnectionTest { @Test public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() throws Exception { - MockTransport mockTransport = new MockTransport(true); + MockTransport mockTransport = new MockTransport(); TestHttpClient client = new TestHttpClient() .on("POST", "http://example.com/negotiate", (req) -> CompletableFuture .completedFuture(new HttpResponse(200, "", "{\"url\":\"http://testexample.com/\"}"))) diff --git a/clients/java/signalr/src/test/java/com/microsoft/signalr/MockTransport.java b/clients/java/signalr/src/test/java/com/microsoft/signalr/MockTransport.java index 10677d56f4340e3f76b8e42289ce516b597e97b6..b77e0f79991170d5c375cacc3c3d077822c04866 100644 --- a/clients/java/signalr/src/test/java/com/microsoft/signalr/MockTransport.java +++ b/clients/java/signalr/src/test/java/com/microsoft/signalr/MockTransport.java @@ -12,15 +12,22 @@ class MockTransport implements Transport { private ArrayList<String> sentMessages = new ArrayList<>(); private String url; private Consumer<String> onClose; - private boolean autoHandshake; + final private boolean ignorePings; + final private boolean autoHandshake; private static final String RECORD_SEPARATOR = "\u001e"; public MockTransport() { + this(true, true); } public MockTransport(boolean autoHandshake) { + this(autoHandshake, true); + } + + public MockTransport(boolean autoHandshake, boolean ignorePings) { this.autoHandshake = autoHandshake; + this.ignorePings = ignorePings; } @Override @@ -38,7 +45,9 @@ class MockTransport implements Transport { @Override public CompletableFuture send(String message) { - sentMessages.add(message); + if (!(ignorePings && message.equals("{\"type\":6}" + RECORD_SEPARATOR))) { + sentMessages.add(message); + } return CompletableFuture.completedFuture(null); }