Skip to content
代码片段 群组 项目
未验证 提交 e15ea556 编辑于 作者: Brennan's avatar Brennan 提交者: GitHub
浏览文件

Cleanup when SendAsync with Upload stream (#13783)

上级 158d3f1c
No related branches found
No related tags found
无相关合并请求
......@@ -316,15 +316,9 @@ namespace Microsoft.AspNetCore.SignalR.Internal
_ = StreamResultsAsync(hubMethodInvocationMessage.InvocationId, connection, enumerable, scope, hubActivator, hub, cts, hubMethodInvocationMessage);
}
else if (string.IsNullOrEmpty(hubMethodInvocationMessage.InvocationId))
{
// Send Async, no response expected
invocation = ExecuteHubMethod(methodExecutor, hub, arguments);
}
else
{
// Invoke Async, one reponse expected
// Invoke or Send
async Task ExecuteInvocation()
{
object result;
......@@ -350,7 +344,12 @@ namespace Microsoft.AspNetCore.SignalR.Internal
}
}
await connection.WriteAsync(CompletionMessage.WithResult(hubMethodInvocationMessage.InvocationId, result));
// No InvocationId - Send Async, no response expected
if (!string.IsNullOrEmpty(hubMethodInvocationMessage.InvocationId))
{
// Invoke Async, one reponse expected
await connection.WriteAsync(CompletionMessage.WithResult(hubMethodInvocationMessage.InvocationId, result));
}
}
invocation = ExecuteInvocation();
}
......
......@@ -3402,6 +3402,46 @@ namespace Microsoft.AspNetCore.SignalR.Tests
}
}
[Fact]
public async Task UploadStreamFromSendReleasesHubActivatorOnceComplete()
{
using (StartVerifiableLog())
{
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
{
builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>));
}, LoggerFactory);
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<MethodHub>>();
using (var client = new TestClient())
{
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout();
var hubActivator = serviceProvider.GetService<IHubActivator<MethodHub>>() as CustomHubActivator<MethodHub>;
var createTask = hubActivator.CreateTask.Task;
// null ID means we're doing a Send and not an Invoke
await client.BeginUploadStreamAsync(invocationId: null, nameof(MethodHub.StreamingConcat), streamIds: new[] { "id" }, args: Array.Empty<object>()).OrTimeout();
await client.SendHubMessageAsync(new StreamItemMessage("id", "hello")).OrTimeout();
await client.SendHubMessageAsync(new StreamItemMessage("id", " world")).OrTimeout();
await createTask.OrTimeout();
var tcs = hubActivator.ReleaseTask;
await client.SendHubMessageAsync(CompletionMessage.Empty("id")).OrTimeout();
await tcs.Task.OrTimeout();
// OnConnectedAsync and StreamingConcat hubs have been disposed
Assert.Equal(2, hubActivator.ReleaseCount);
// Shut down
client.Dispose();
await connectionHandlerTask.OrTimeout();
}
}
}
[Fact]
public async Task UploadStreamClosesStreamsOnServerWhenMethodCompletes()
{
......@@ -3740,6 +3780,8 @@ namespace Microsoft.AspNetCore.SignalR.Tests
{
public int ReleaseCount;
private IServiceProvider _serviceProvider;
public TaskCompletionSource<object> ReleaseTask = new TaskCompletionSource<object>();
public TaskCompletionSource<object> CreateTask = new TaskCompletionSource<object>();
public CustomHubActivator(IServiceProvider serviceProvider)
{
......@@ -3748,13 +3790,18 @@ namespace Microsoft.AspNetCore.SignalR.Tests
public THub Create()
{
return new DefaultHubActivator<THub>(_serviceProvider).Create();
ReleaseTask = new TaskCompletionSource<object>();
var hub = new DefaultHubActivator<THub>(_serviceProvider).Create();
CreateTask.TrySetResult(null);
return hub;
}
public void Release(THub hub)
{
ReleaseCount++;
hub.Dispose();
ReleaseTask.TrySetResult(null);
CreateTask = new TaskCompletionSource<object>();
}
}
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册