diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs b/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs index b0eb73fa960aa0f5da119d27028fc5aa60d01017..33d7beeb40969c1986ce0f9bb4cac5694fd96e86 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs @@ -81,6 +81,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets _outputOptions, _options.WaitForDataBeforeAllocatingBuffer); + socketConnection.Start(); return socketConnection; } diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.DuplexPipe.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.DuplexPipe.cs deleted file mode 100644 index 2a115e21c8dc5905d7ee11baf46621afc67b9478..0000000000000000000000000000000000000000 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.DuplexPipe.cs +++ /dev/null @@ -1,25 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal -{ - internal sealed partial class SocketConnection - { - // We could implement this on SocketConnection to remove an extra allocation but this is a - // bit cleaner - private class SocketDuplexPipe : IDuplexPipe - { - public SocketDuplexPipe(SocketConnection connection) - { - Input = new SocketPipeReader(connection); - Output = new SocketPipeWriter(connection); - } - - public PipeReader Input { get; } - - public PipeWriter Output { get; } - } - } -} diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.PipeReader.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.PipeReader.cs deleted file mode 100644 index 8f6db8061bff2d513d664240e8782336878edda3..0000000000000000000000000000000000000000 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.PipeReader.cs +++ /dev/null @@ -1,77 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal -{ - internal sealed partial class SocketConnection - { - private class SocketPipeReader : PipeReader - { - private readonly SocketConnection _socketConnection; - private readonly PipeReader _reader; - - public SocketPipeReader(SocketConnection socketConnection) - { - _socketConnection = socketConnection; - _reader = socketConnection.InnerTransport.Input; - } - - public override void AdvanceTo(SequencePosition consumed) - { - _reader.AdvanceTo(consumed); - } - - public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) - { - _reader.AdvanceTo(consumed, examined); - } - - public override void CancelPendingRead() - { - _reader.CancelPendingRead(); - } - - public override void Complete(Exception? exception = null) - { - _reader.Complete(exception); - } - - public override ValueTask CompleteAsync(Exception? exception = null) - { - return _reader.CompleteAsync(exception); - } - - public override Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default) - { - _socketConnection.EnsureStarted(); - return _reader.CopyToAsync(destination, cancellationToken); - } - - public override Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default) - { - _socketConnection.EnsureStarted(); - return _reader.CopyToAsync(destination, cancellationToken); - } - - protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken) - { - _socketConnection.EnsureStarted(); - return _reader.ReadAtLeastAsync(minimumSize, cancellationToken); - } - - public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default) - { - _socketConnection.EnsureStarted(); - return _reader.ReadAsync(cancellationToken); - } - - public override bool TryRead(out ReadResult result) - { - _socketConnection.EnsureStarted(); - return _reader.TryRead(out result); - } - } - } -} diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.PipeWriter.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.PipeWriter.cs deleted file mode 100644 index 913b835d0f40d9dd274ed2c247ff1320b4735468..0000000000000000000000000000000000000000 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.PipeWriter.cs +++ /dev/null @@ -1,68 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System.IO.Pipelines; - -namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal -{ - internal sealed partial class SocketConnection - { - private class SocketPipeWriter : PipeWriter - { - private readonly SocketConnection _socketConnection; - private readonly PipeWriter _writer; - - public SocketPipeWriter(SocketConnection socketConnection) - { - _socketConnection = socketConnection; - _writer = socketConnection.InnerTransport.Output; - } - - public override bool CanGetUnflushedBytes => _writer.CanGetUnflushedBytes; - - public override long UnflushedBytes => _writer.UnflushedBytes; - - public override void Advance(int bytes) - { - _writer.Advance(bytes); - } - - public override void CancelPendingFlush() - { - _writer.CancelPendingFlush(); - } - - public override void Complete(Exception? exception = null) - { - _writer.Complete(exception); - } - - public override ValueTask CompleteAsync(Exception? exception = null) - { - return _writer.CompleteAsync(exception); - } - - public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default) - { - _socketConnection.EnsureStarted(); - return _writer.WriteAsync(source, cancellationToken); - } - - public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) - { - _socketConnection.EnsureStarted(); - return _writer.FlushAsync(cancellationToken); - } - - public override Memory<byte> GetMemory(int sizeHint = 0) - { - return _writer.GetMemory(sizeHint); - } - - public override Span<byte> GetSpan(int sizeHint = 0) - { - return _writer.GetSpan(sizeHint); - } - } - } -} diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs index ccc888cc1325b60f000cfb2e54b3d3b8871f5083..5c076a38b2ce8348e52ba4f8584f78ba4f02e12a 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs @@ -33,7 +33,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal private readonly TaskCompletionSource _waitForConnectionClosedTcs = new TaskCompletionSource(); private bool _connectionClosed; private readonly bool _waitForData; - private int _connectionStarted; internal SocketConnection(Socket socket, MemoryPool<byte> memoryPool, @@ -68,32 +67,31 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions); - _originalTransport = pair.Transport; + // Set the transport and connection id + Transport = _originalTransport = pair.Transport; Application = pair.Application; - Transport = new SocketDuplexPipe(this); - InitializeFeatures(); } - public IDuplexPipe InnerTransport => _originalTransport; - public PipeWriter Input => Application.Output; public PipeReader Output => Application.Input; public override MemoryPool<byte> MemoryPool { get; } - private void EnsureStarted() + public void Start() { - if (_connectionStarted == 1 || Interlocked.CompareExchange(ref _connectionStarted, 1, 0) == 1) + try { - return; + // Spawn send and receive logic + _receivingTask = DoReceive(); + _sendingTask = DoSend(); + } + catch (Exception ex) + { + _trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}."); } - - // Offload these to avoid potentially blocking the first read/write/flush - _receivingTask = Task.Run(DoReceive); - _sendingTask = Task.Run(DoSend); } public override void Abort(ConnectionAbortedException abortReason) @@ -108,9 +106,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal // Only called after connection middleware is complete which means the ConnectionClosed token has fired. public override async ValueTask DisposeAsync() { - // Just in case we haven't started the connection, start it here so we can clean up properly. - EnsureStarted(); - _originalTransport.Input.Complete(); _originalTransport.Output.Complete(); @@ -130,7 +125,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal } catch (Exception ex) { - _trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}."); + _trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}."); } finally { diff --git a/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs b/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs index 5cd57fcf1ff9baa0b21326a7338fa187bb554b07..45daa310ca1d3e7b3ebf77263651325c1eb79cfc 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs @@ -136,6 +136,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets setting.OutputOptions, waitForData: _options.WaitForDataBeforeAllocatingBuffer); + connection.Start(); + _settingsIndex = (_settingsIndex + 1) % _settingsCount; return connection; diff --git a/src/Servers/Kestrel/test/Sockets.FunctionalTests/SocketTranspotTests.cs b/src/Servers/Kestrel/test/Sockets.FunctionalTests/SocketTranspotTests.cs index 256b8bac1dd197e92cabbebd0eabd7b1548643dc..acadeaff81ba4c51931177f73bb2c6c12cb968dc 100644 --- a/src/Servers/Kestrel/test/Sockets.FunctionalTests/SocketTranspotTests.cs +++ b/src/Servers/Kestrel/test/Sockets.FunctionalTests/SocketTranspotTests.cs @@ -6,12 +6,10 @@ using System.Diagnostics; using System.Net; using System.Net.Http; using System.Net.Sockets; -using System.Text; using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.FunctionalTests; using Microsoft.AspNetCore.Testing; using Microsoft.Extensions.Hosting; @@ -60,158 +58,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Sockets.FunctionalTests await host.StopAsync(); } - - [Fact] - public async Task CanReadAndWriteFromSocketFeatureInConnectionMiddleware() - { - var builder = TransportSelector.GetHostBuilder() - .ConfigureWebHost(webHostBuilder => - { - webHostBuilder - .UseKestrel(options => - { - options.ListenAnyIP(0, lo => - { - lo.Use(next => - { - return async connection => - { - var socket = connection.Features.Get<IConnectionSocketFeature>().Socket; - Assert.NotNull(socket); - - var buffer = new byte[4096]; - - var read = await socket.ReceiveAsync(buffer, SocketFlags.None); - - static void ParseHttp(ReadOnlySequence<byte> data) - { - var parser = new HttpParser<ParserHandler>(); - var handler = new ParserHandler(); - - var reader = new SequenceReader<byte>(data); - - // Assume we can parse the HTTP request in a single buffer - Assert.True(parser.ParseRequestLine(handler, ref reader)); - Assert.True(parser.ParseHeaders(handler, ref reader)); - - Assert.Equal(KestrelHttpMethod.Get, handler.HttpMethod); - Assert.Equal(KestrelHttpVersion.Http11, handler.HttpVersion); - } - - ParseHttp(new ReadOnlySequence<byte>(buffer[0..read])); - - await socket.SendAsync(Encoding.UTF8.GetBytes("HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n"), SocketFlags.None); - }; - }); - }); - }) - .Configure(app => { }); - }) - .ConfigureServices(AddTestLogging); - - using var host = builder.Build(); - using var client = new HttpClient(); - - await host.StartAsync(); - - var response = await client.GetAsync($"http://127.0.0.1:{host.GetPort()}/"); - response.EnsureSuccessStatusCode(); - - await host.StopAsync(); - } - - [ConditionalFact] - [OSSkipCondition(OperatingSystems.Linux)] - [OSSkipCondition(OperatingSystems.MacOSX)] - public async Task CanDuplicateAndCloseSocketFeatureInConnectionMiddleware() - { - var builder = TransportSelector.GetHostBuilder() - .ConfigureWebHost(webHostBuilder => - { - webHostBuilder - .UseKestrel(options => - { - options.ListenAnyIP(0, lo => - { - lo.Use(next => - { - return async connection => - { - var originalSocket = connection.Features.Get<IConnectionSocketFeature>().Socket; - Assert.NotNull(originalSocket); - - var si = originalSocket.DuplicateAndClose(Process.GetCurrentProcess().Id); - - using var socket = new Socket(si); - var buffer = new byte[4096]; - - var read = await socket.ReceiveAsync(buffer, SocketFlags.None); - - static void ParseHttp(ReadOnlySequence<byte> data) - { - var parser = new HttpParser<ParserHandler>(); - var handler = new ParserHandler(); - - var reader = new SequenceReader<byte>(data); - - // Assume we can parse the HTTP request in a single buffer - Assert.True(parser.ParseRequestLine(handler, ref reader)); - Assert.True(parser.ParseHeaders(handler, ref reader)); - - Assert.Equal(KestrelHttpMethod.Get, handler.HttpMethod); - Assert.Equal(KestrelHttpVersion.Http11, handler.HttpVersion); - } - - ParseHttp(new ReadOnlySequence<byte>(buffer[0..read])); - - await socket.SendAsync(Encoding.UTF8.GetBytes("HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n"), SocketFlags.None); - }; - }); - }); - }) - .Configure(app => { }); - }) - .ConfigureServices(AddTestLogging); - - using var host = builder.Build(); - using var client = new HttpClient(); - - await host.StartAsync(); - - var response = await client.GetAsync($"http://127.0.0.1:{host.GetPort()}/"); - response.EnsureSuccessStatusCode(); - - await host.StopAsync(); - } - - private class ParserHandler : IHttpRequestLineHandler, IHttpHeadersHandler - { - public KestrelHttpVersion HttpVersion { get; set; } - public KestrelHttpMethod HttpMethod { get; set; } - public Dictionary<string, string> Headers = new(); - - public void OnHeader(ReadOnlySpan<byte> name, ReadOnlySpan<byte> value) - { - Headers[Encoding.ASCII.GetString(name)] = Encoding.ASCII.GetString(value); - } - - public void OnHeadersComplete(bool endStream) - { - } - - public void OnStartLine(HttpVersionAndMethod versionAndMethod, TargetOffsetPathLength targetPath, Span<byte> startLine) - { - HttpMethod = versionAndMethod.Method; - HttpVersion = versionAndMethod.Version; - } - - public void OnStaticIndexedHeader(int index) - { - } - - public void OnStaticIndexedHeader(int index, ReadOnlySpan<byte> value) - { - } - } } }