From 96f55fcf259e587cbdcda250e7547dd79aa7c8b0 Mon Sep 17 00:00:00 2001 From: Stephen Halter <halter73@gmail.com> Date: Tue, 16 Jul 2019 23:06:24 -0700 Subject: [PATCH] Wrap the PipeWriter used by Kestrel's output writing logic (#12081) --- .../src/Internal/Http/Http1OutputProducer.cs | 13 +- .../src/Internal/Http2/Http2Connection.cs | 1 + .../src/Internal/Http2/Http2FrameWriter.cs | 8 +- .../src/Internal/Http2/Http2OutputProducer.cs | 37 +- .../Core/src/Internal/Http2/Http2Stream.cs | 1 - .../PipeWriterHelpers/BufferSegment.cs | 130 ++++++ .../PipeWriterHelpers/BufferSegmentStack.cs | 85 ++++ .../PipeWriterHelpers/ConcurrentPipeWriter.cs | 409 ++++++++++++++++++ .../TimingPipeFlusher.cs | 38 +- .../Middleware/HttpsConnectionMiddleware.cs | 10 +- .../Internal/DuplexPipeStreamAdapter.cs | 107 +---- .../Middleware/LoggingConnectionMiddleware.cs | 1 - .../Core/test/ConcurrentPipeWriterTests.cs | 387 +++++++++++++++++ .../Core/test/Http2FrameWriterTests.cs | 4 +- .../Core/test/TimingPipeFlusherTests.cs | 67 --- .../test/LibuvOutputConsumerTests.cs | 13 +- .../shared/test/TaskTimeoutExtensions.cs | 2 +- .../Http2/Http2TimeoutTests.cs | 12 +- 18 files changed, 1077 insertions(+), 248 deletions(-) create mode 100644 src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegment.cs create mode 100644 src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegmentStack.cs create mode 100644 src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/ConcurrentPipeWriter.cs rename src/Servers/Kestrel/Core/src/Internal/Infrastructure/{ => PipeWriterHelpers}/TimingPipeFlusher.cs (67%) create mode 100644 src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs delete mode 100644 src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs diff --git a/src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs b/src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs index a45a7409506..85a4da5fbe3 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs @@ -12,6 +12,7 @@ using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Features; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { @@ -45,7 +46,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http private long _unflushedBytes; private int _currentMemoryPrefixBytes; - private readonly PipeWriter _pipeWriter; + private readonly ConcurrentPipeWriter _pipeWriter; private IMemoryOwner<byte> _fakeMemoryOwner; // Chunked responses need to be treated uniquely when using GetMemory + Advance. @@ -81,18 +82,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http IHttpMinResponseDataRateFeature minResponseDataRateFeature, MemoryPool<byte> memoryPool) { - _pipeWriter = pipeWriter; + // Allow appending more data to the PipeWriter when a flush is pending. + _pipeWriter = new ConcurrentPipeWriter(pipeWriter, memoryPool); _connectionId = connectionId; _connectionContext = connectionContext; _log = log; _minResponseDataRateFeature = minResponseDataRateFeature; - _flusher = new TimingPipeFlusher(pipeWriter, timeoutControl, log); + _flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl, log); _memoryPool = memoryPool; } - // For tests - internal PipeWriter PipeWriter => _pipeWriter; - public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default) { if (cancellationToken.IsCancellationRequested) @@ -408,6 +407,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http { lock (_contextLock) { + _pipeWriter.Abort(); + if (_fakeMemoryOwner != null) { _fakeMemoryOwner.Dispose(); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs index be4fece62b3..22ef5cd4346 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Connection.cs @@ -87,6 +87,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 context.TimeoutControl, httpLimits.MinResponseDataRate, context.ConnectionId, + context.MemoryPool, context.ServiceContext.Log); _hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs index acdecca2ada..1f2b0573927 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs @@ -15,6 +15,7 @@ using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.HPack; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 { @@ -26,7 +27,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private readonly object _writeLock = new object(); private readonly Http2Frame _outgoingFrame; private readonly HPackEncoder _hpackEncoder = new HPackEncoder(); - private readonly PipeWriter _outputWriter; + private readonly ConcurrentPipeWriter _outputWriter; private readonly ConnectionContext _connectionContext; private readonly Http2Connection _http2Connection; private readonly OutputFlowControl _connectionOutputFlowControl; @@ -51,9 +52,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 ITimeoutControl timeoutControl, MinDataRate minResponseDataRate, string connectionId, + MemoryPool<byte> memoryPool, IKestrelTrace log) { - _outputWriter = outputPipeWriter; + // Allow appending more data to the PipeWriter when a flush is pending. + _outputWriter = new ConcurrentPipeWriter(outputPipeWriter, memoryPool); _connectionContext = connectionContext; _http2Connection = http2Connection; _connectionOutputFlowControl = connectionOutputFlowControl; @@ -89,6 +92,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _completed = true; _connectionOutputFlowControl.Abort(); + _outputWriter.Abort(); } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs index d692059dffa..46b1143929d 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs @@ -12,6 +12,7 @@ using Microsoft.AspNetCore.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers; using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 @@ -29,7 +30,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 private readonly MemoryPool<byte> _memoryPool; private readonly Http2Stream _stream; private readonly object _dataWriterLock = new object(); - private readonly Pipe _dataPipe; + private readonly PipeWriter _pipeWriter; + private readonly PipeReader _pipeReader; private readonly ValueTask<FlushResult> _dataWriteProcessingTask; private bool _startedWritingDataFrames; private bool _completed; @@ -43,7 +45,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 int streamId, Http2FrameWriter frameWriter, StreamOutputFlowControl flowControl, - ITimeoutControl timeoutControl, MemoryPool<byte> pool, Http2Stream stream, IKestrelTrace log) @@ -55,8 +56,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _stream = stream; _log = log; - _dataPipe = CreateDataPipe(pool); - _flusher = new TimingPipeFlusher(_dataPipe.Writer, timeoutControl, log); + var pipe = CreateDataPipe(pool); + + _pipeWriter = new ConcurrentPipeWriter(pipe.Writer, pool); + _pipeReader = pipe.Reader; + + // No need to pass in timeoutControl here, since no minDataRates are passed to the TimingPipeFlusher. + // The minimum output data rate is enforced at the connection level by Http2FrameWriter. + _flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl: null, log); _dataWriteProcessingTask = ProcessDataWrites(); } @@ -193,7 +200,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _startedWritingDataFrames = true; - _dataPipe.Writer.Write(data); + _pipeWriter.Write(data); return _flusher.FlushAsync(this, cancellationToken).GetAsTask(); } } @@ -210,7 +217,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _completed = true; _suffixSent = true; - _dataPipe.Writer.Complete(); + _pipeWriter.Complete(); return _dataWriteProcessingTask; } } @@ -239,7 +246,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _startedWritingDataFrames = true; - _dataPipe.Writer.Advance(bytes); + _pipeWriter.Advance(bytes); } } @@ -254,7 +261,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 return GetFakeMemory(sizeHint).Span; } - return _dataPipe.Writer.GetSpan(sizeHint); + return _pipeWriter.GetSpan(sizeHint); } } @@ -269,7 +276,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 return GetFakeMemory(sizeHint); } - return _dataPipe.Writer.GetMemory(sizeHint); + return _pipeWriter.GetMemory(sizeHint); } } @@ -282,7 +289,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 return; } - _dataPipe.Writer.CancelPendingFlush(); + _pipeWriter.CancelPendingFlush(); } } @@ -306,7 +313,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _startedWritingDataFrames = true; - _dataPipe.Writer.Write(data); + _pipeWriter.Write(data); return _flusher.FlushAsync(this, cancellationToken); } } @@ -345,7 +352,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 // Complete with an exception to prevent an end of stream data frame from being sent without an // explicit call to WriteStreamSuffixAsync. ConnectionAbortedExceptions are swallowed, so the // message doesn't matter - _dataPipe.Writer.Complete(new OperationCanceledException()); + _pipeWriter.Complete(new OperationCanceledException()); _frameWriter.AbortPendingStreamDataWrites(_flowControl); } @@ -364,7 +371,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 do { - readResult = await _dataPipe.Reader.ReadAsync(); + readResult = await _pipeReader.ReadAsync(); if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0) { @@ -393,7 +400,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 flushResult = await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: readResult.IsCompleted); } - _dataPipe.Reader.AdvanceTo(readResult.Buffer.End); + _pipeReader.AdvanceTo(readResult.Buffer.End); } while (!readResult.IsCompleted); } catch (OperationCanceledException) @@ -405,7 +412,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 _log.LogCritical(ex, nameof(Http2OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception."); } - _dataPipe.Reader.Complete(); + _pipeReader.Complete(); return flushResult; diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs index 174cccb47fc..caab64c0292 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2Stream.cs @@ -51,7 +51,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2 context.StreamId, context.FrameWriter, _outputFlowControl, - context.TimeoutControl, context.MemoryPool, this, context.ServiceContext.Log); diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegment.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegment.cs new file mode 100644 index 00000000000..fdd8ac7367d --- /dev/null +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegment.cs @@ -0,0 +1,130 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Buffers; +using System.Diagnostics; +using System.Runtime.CompilerServices; + +namespace System.IO.Pipelines +{ + // Copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs + internal sealed class BufferSegment : ReadOnlySequenceSegment<byte> + { + private object _memoryOwner; + private BufferSegment _next; + private int _end; + + /// <summary> + /// The End represents the offset into AvailableMemory where the range of "active" bytes ends. At the point when the block is leased + /// the End is guaranteed to be equal to Start. The value of Start may be assigned anywhere between 0 and + /// Buffer.Length, and must be equal to or less than End. + /// </summary> + public int End + { + get => _end; + set + { + Debug.Assert(value <= AvailableMemory.Length); + + _end = value; + Memory = AvailableMemory.Slice(0, value); + } + } + + /// <summary> + /// Reference to the next block of data when the overall "active" bytes spans multiple blocks. At the point when the block is + /// leased Next is guaranteed to be null. Start, End, and Next are used together in order to create a linked-list of discontiguous + /// working memory. The "active" memory is grown when bytes are copied in, End is increased, and Next is assigned. The "active" + /// memory is shrunk when bytes are consumed, Start is increased, and blocks are returned to the pool. + /// </summary> + public BufferSegment NextSegment + { + get => _next; + set + { + Next = value; + _next = value; + } + } + + public void SetOwnedMemory(IMemoryOwner<byte> memoryOwner) + { + _memoryOwner = memoryOwner; + AvailableMemory = memoryOwner.Memory; + } + + public void SetOwnedMemory(byte[] arrayPoolBuffer) + { + _memoryOwner = arrayPoolBuffer; + AvailableMemory = arrayPoolBuffer; + } + + public void SetUnownedMemory(Memory<byte> memory) + { + AvailableMemory = memory; + } + + public void ResetMemory() + { + if (_memoryOwner is IMemoryOwner<byte> owner) + { + owner.Dispose(); + } + else if (_memoryOwner is byte[] array) + { + ArrayPool<byte>.Shared.Return(array); + } + + // Order of below field clears is significant as it clears in a sequential order + // https://github.com/dotnet/corefx/pull/35256#issuecomment-462800477 + Next = null; + RunningIndex = 0; + Memory = default; + _memoryOwner = null; + _next = null; + _end = 0; + AvailableMemory = default; + } + + // Exposed for testing + internal object MemoryOwner => _memoryOwner; + + public Memory<byte> AvailableMemory { get; private set; } + + public int Length => End; + + public int WritableBytes + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => AvailableMemory.Length - End; + } + + public void SetNext(BufferSegment segment) + { + Debug.Assert(segment != null); + Debug.Assert(Next == null); + + NextSegment = segment; + + segment = this; + + while (segment.Next != null) + { + segment.NextSegment.RunningIndex = segment.RunningIndex + segment.Length; + segment = segment.NextSegment; + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static long GetLength(BufferSegment startSegment, int startIndex, BufferSegment endSegment, int endIndex) + { + return (endSegment.RunningIndex + (uint)endIndex) - (startSegment.RunningIndex + (uint)startIndex); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static long GetLength(long startPosition, BufferSegment endSegment, int endIndex) + { + return (endSegment.RunningIndex + (uint)endIndex) - startPosition; + } + } +} diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegmentStack.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegmentStack.cs new file mode 100644 index 00000000000..1cd7b26c6dd --- /dev/null +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegmentStack.cs @@ -0,0 +1,85 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System.Runtime.CompilerServices; + +namespace System.IO.Pipelines +{ + // Copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs + internal struct BufferSegmentStack + { + private SegmentAsValueType[] _array; + private int _size; + + public BufferSegmentStack(int size) + { + _array = new SegmentAsValueType[size]; + _size = 0; + } + + public int Count => _size; + + public bool TryPop(out BufferSegment result) + { + int size = _size - 1; + SegmentAsValueType[] array = _array; + + if ((uint)size >= (uint)array.Length) + { + result = default; + return false; + } + + _size = size; + result = array[size]; + array[size] = default; + return true; + } + + // Pushes an item to the top of the stack. + public void Push(BufferSegment item) + { + int size = _size; + SegmentAsValueType[] array = _array; + + if ((uint)size < (uint)array.Length) + { + array[size] = item; + _size = size + 1; + } + else + { + PushWithResize(item); + } + } + + // Non-inline from Stack.Push to improve its code quality as uncommon path + [MethodImpl(MethodImplOptions.NoInlining)] + private void PushWithResize(BufferSegment item) + { + Array.Resize(ref _array, 2 * _array.Length); + _array[_size] = item; + _size++; + } + + /// <summary> + /// A simple struct we wrap reference types inside when storing in arrays to + /// bypass the CLR's covariant checks when writing to arrays. + /// </summary> + /// <remarks> + /// We use <see cref="SegmentAsValueType"/> as a wrapper to avoid paying the cost of covariant checks whenever + /// the underlying array that the <see cref="BufferSegmentStack"/> class uses is written to. + /// We've recognized this as a perf win in ETL traces for these stack frames: + /// clr!JIT_Stelem_Ref + /// clr!ArrayStoreCheck + /// clr!ObjIsInstanceOf + /// </remarks> + private readonly struct SegmentAsValueType + { + private readonly BufferSegment _value; + private SegmentAsValueType(BufferSegment value) => _value = value; + public static implicit operator SegmentAsValueType(BufferSegment s) => new SegmentAsValueType(s); + public static implicit operator BufferSegment(SegmentAsValueType s) => s._value; + } + } +} diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/ConcurrentPipeWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/ConcurrentPipeWriter.cs new file mode 100644 index 00000000000..5a35bafb850 --- /dev/null +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/ConcurrentPipeWriter.cs @@ -0,0 +1,409 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers +{ + /// <summary> + /// Wraps a PipeWriter so you can start appending more data to the pipe prior to the previous flush completing. + /// </summary> + internal sealed class ConcurrentPipeWriter : PipeWriter + { + // The following constants were copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs + // and the associated StreamPipeWriterOptions defaults. + private const int InitialSegmentPoolSize = 4; // 16K + private const int MaxSegmentPoolSize = 256; // 1MB + private const int MinimumBufferSize = 4096; // 4K + + private static readonly Exception _successfullyCompletedSentinel = new Exception(); + + private readonly object _sync = new object(); + private readonly PipeWriter _innerPipeWriter; + private readonly MemoryPool<byte> _pool; + private readonly BufferSegmentStack _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize); + + private BufferSegment _head; + private BufferSegment _tail; + private Memory<byte> _tailMemory; + private int _tailBytesBuffered; + private long _bytesBuffered; + + // When _currentFlushTcs is null and _head/_tail is also null, the ConcurrentPipeWriter is in passthrough mode. + // When the ConcurrentPipeWriter is not in passthrough mode, that could be for one of two reasons: + // + // 1. A flush of the _innerPipeWriter is in progress. + // 2. Or the last flush of the _innerPipeWriter completed between external calls to GetMemory/Span() and Advance(). + // + // In either case, we need to manually append buffer segments until the loop in the current or next call to FlushAsync() + // flushes all the buffers putting the ConcurrentPipeWriter back into passthrough mode. + // The manual buffer appending logic is borrowed from corefx's StreamPipeWriter. + private TaskCompletionSource<FlushResult> _currentFlushTcs; + private bool _bufferedWritePending; + + // We're trusting the Http2FrameWriter and Http1OutputProducer to not call into the PipeWriter after calling Abort() or Complete() + // If an abort occurs while a flush is in progress, we clean up after the flush completes, and don't flush again. + private bool _aborted; + private Exception _completeException; + + public ConcurrentPipeWriter(PipeWriter innerPipeWriter, MemoryPool<byte> pool) + { + _innerPipeWriter = innerPipeWriter; + _pool = pool; + } + + public override Memory<byte> GetMemory(int sizeHint = 0) + { + lock (_sync) + { + if (_currentFlushTcs == null && _head == null) + { + return _innerPipeWriter.GetMemory(sizeHint); + } + + AllocateMemoryUnsynchronized(sizeHint); + return _tailMemory; + } + } + + public override Span<byte> GetSpan(int sizeHint = 0) + { + lock (_sync) + { + if (_currentFlushTcs == null && _head == null) + { + return _innerPipeWriter.GetSpan(sizeHint); + } + + AllocateMemoryUnsynchronized(sizeHint); + return _tailMemory.Span; + } + } + + public override void Advance(int bytes) + { + lock (_sync) + { + if (_currentFlushTcs == null && _head == null) + { + _innerPipeWriter.Advance(bytes); + return; + } + + if ((uint)bytes > (uint)_tailMemory.Length) + { + ThrowArgumentOutOfRangeException(nameof(bytes)); + } + + _tailBytesBuffered += bytes; + _bytesBuffered += bytes; + _tailMemory = _tailMemory.Slice(bytes); + _bufferedWritePending = false; + } + } + + public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) + { + lock (_sync) + { + if (_currentFlushTcs != null) + { + return new ValueTask<FlushResult>(_currentFlushTcs.Task); + } + + if (_bytesBuffered > 0) + { + CopyAndReturnSegmentsUnsynchronized(); + } + + var flushTask = _innerPipeWriter.FlushAsync(cancellationToken); + + if (flushTask.IsCompletedSuccessfully) + { + if (_currentFlushTcs != null) + { + CompleteFlushUnsynchronized(flushTask.GetAwaiter().GetResult(), null); + } + + return flushTask; + } + + // Use a TCS instead of something resettable so it can be awaited by multiple awaiters. + _currentFlushTcs = new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously); + var result = new ValueTask<FlushResult>(_currentFlushTcs.Task); + + // FlushAsyncAwaited clears the TCS prior to completing. Make sure to construct the ValueTask + // from the TCS before calling FlushAsyncAwaited in case FlushAsyncAwaited completes inline. + _ = FlushAsyncAwaited(flushTask, cancellationToken); + return result; + } + } + + private async Task FlushAsyncAwaited(ValueTask<FlushResult> flushTask, CancellationToken cancellationToken) + { + try + { + // This while (true) does look scary, but the real continuation condition is at the start of the loop + // after the await, so the _sync lock can be acquired. + while (true) + { + var flushResult = await flushTask; + + lock (_sync) + { + if (_bytesBuffered == 0 || _aborted) + { + CompleteFlushUnsynchronized(flushResult, null); + return; + } + + if (flushResult.IsCanceled) + { + // Complete anyone currently awaiting a flush since CancelPendingFlush() was called + _currentFlushTcs.SetResult(flushResult); + // Reset _currentFlushTcs, so we don't enter passthrough mode while we're still flushing. + _currentFlushTcs = new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously); + } + + CopyAndReturnSegmentsUnsynchronized(); + + flushTask = _innerPipeWriter.FlushAsync(cancellationToken); + } + } + } + catch (Exception ex) + { + lock (_sync) + { + CompleteFlushUnsynchronized(default, ex); + } + } + } + + public override void OnReaderCompleted(Action<Exception, object> callback, object state) + { + _innerPipeWriter.OnReaderCompleted(callback, state); + } + + public override void CancelPendingFlush() + { + // We propagate IsCanceled when we do multiple flushes in a loop. If FlushResult.IsCanceled is true with more data pending to flush, + // _currentFlushTcs with canceled flush task, but rekick the FlushAsync loop. + _innerPipeWriter.CancelPendingFlush(); + } + + public override void Complete(Exception exception = null) + { + lock (_sync) + { + // We store the complete exception or s sentinel exception instance in a field if a flush was ongoing. + // We call the inner Complete() method after the flush loop ended. + + // To simply ensure everything gets returned after the PipeWriter is left in some unknown state (say GetMemory() was + // called but not Advance(), or there's a flush pending), but you don't want to complete the inner pipe, just call Abort(). + _completeException = exception ?? _successfullyCompletedSentinel; + + if (_currentFlushTcs == null) + { + if (_bytesBuffered > 0) + { + CopyAndReturnSegmentsUnsynchronized(); + } + + CleanupSegmentsUnsynchronized(); + + _innerPipeWriter.Complete(exception); + } + } + } + + public void Abort() + { + lock (_sync) + { + _aborted = true; + + // If we're flushing, the cleanup will happen after the flush. + if (_currentFlushTcs == null) + { + CleanupSegmentsUnsynchronized(); + } + } + } + + private void CleanupSegmentsUnsynchronized() + { + BufferSegment segment = _head; + while (segment != null) + { + BufferSegment returnSegment = segment; + segment = segment.NextSegment; + returnSegment.ResetMemory(); + } + + _head = null; + _tail = null; + _tailMemory = null; + } + + private void CopyAndReturnSegmentsUnsynchronized() + { + // Update any buffered data + _tail.End += _tailBytesBuffered; + _tailBytesBuffered = 0; + + var segment = _head; + + while (segment != null) + { + _innerPipeWriter.Write(segment.Memory.Span); + + var returnSegment = segment; + segment = segment.NextSegment; + + // We haven't reached the tail of the linked list yet, so we can always return the returnSegment. + if (segment != null) + { + returnSegment.ResetMemory(); + ReturnSegmentUnsynchronized(returnSegment); + } + } + + if (_bufferedWritePending) + { + // If an advance is pending, so is a flush, so the _tail segment should still get returned eventually. + _head = _tail; + } + else + { + _tail.ResetMemory(); + ReturnSegmentUnsynchronized(_tail); + _head = _tail = null; + } + + // Even if a non-passthrough call to Advance is pending, there a 0 bytes currently buffered. + _bytesBuffered = 0; + } + + private void CompleteFlushUnsynchronized(FlushResult flushResult, Exception flushEx) + { + // Ensure all blocks are returned prior to the last call to FlushAsync() completing. + if (_completeException != null || _aborted) + { + CleanupSegmentsUnsynchronized(); + } + + if (ReferenceEquals(_completeException, _successfullyCompletedSentinel)) + { + _innerPipeWriter.Complete(); + } + else if (_completeException != null) + { + _innerPipeWriter.Complete(_completeException); + } + + if (flushEx != null) + { + _currentFlushTcs.SetException(flushEx); + } + else + { + _currentFlushTcs.SetResult(flushResult); + } + + _currentFlushTcs = null; + } + + // The methods below were copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs + private void AllocateMemoryUnsynchronized(int sizeHint) + { + _bufferedWritePending = true; + + if (_head == null) + { + // We need to allocate memory to write since nobody has written before + BufferSegment newSegment = AllocateSegmentUnsynchronized(sizeHint); + + // Set all the pointers + _head = _tail = newSegment; + _tailBytesBuffered = 0; + } + else + { + int bytesLeftInBuffer = _tailMemory.Length; + + if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint) + { + if (_tailBytesBuffered > 0) + { + // Flush buffered data to the segment + _tail.End += _tailBytesBuffered; + _tailBytesBuffered = 0; + } + + BufferSegment newSegment = AllocateSegmentUnsynchronized(sizeHint); + + _tail.SetNext(newSegment); + _tail = newSegment; + } + } + } + + private BufferSegment AllocateSegmentUnsynchronized(int sizeHint) + { + BufferSegment newSegment = CreateSegmentUnsynchronized(); + + if (sizeHint <= _pool.MaxBufferSize) + { + // Use the specified pool if it fits + newSegment.SetOwnedMemory(_pool.Rent(GetSegmentSize(sizeHint, _pool.MaxBufferSize))); + } + else + { + // We can't use the pool so allocate an array + newSegment.SetUnownedMemory(new byte[sizeHint]); + } + + _tailMemory = newSegment.AvailableMemory; + + return newSegment; + } + + private BufferSegment CreateSegmentUnsynchronized() + { + if (_bufferSegmentPool.TryPop(out BufferSegment segment)) + { + return segment; + } + + return new BufferSegment(); + } + + private void ReturnSegmentUnsynchronized(BufferSegment segment) + { + if (_bufferSegmentPool.Count < MaxSegmentPoolSize) + { + _bufferSegmentPool.Push(segment); + } + } + + private static int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) + { + // First we need to handle case where hint is smaller than minimum segment size + sizeHint = Math.Max(MinimumBufferSize, sizeHint); + // After that adjust it to fit into pools max buffer size + var adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint); + return adjustedToMaximumSize; + } + + // Copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.Memory/src/System/ThrowHelper.cs + private static void ThrowArgumentOutOfRangeException(string argumentName) { throw CreateArgumentOutOfRangeException(argumentName); } + [MethodImpl(MethodImplOptions.NoInlining)] + private static Exception CreateArgumentOutOfRangeException(string argumentName) { return new ArgumentOutOfRangeException(argumentName); } + } +} diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/TimingPipeFlusher.cs similarity index 67% rename from src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs rename to src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/TimingPipeFlusher.cs index 22568e0bd02..a1d16d3cc10 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/TimingPipeFlusher.cs @@ -9,7 +9,7 @@ using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.Extensions.Logging; -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers { /// <summary> /// This wraps PipeWriter.FlushAsync() in a way that allows multiple awaiters making it safe to call from publicly @@ -21,15 +21,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure private readonly ITimeoutControl _timeoutControl; private readonly IKestrelTrace _log; - private readonly object _flushLock = new object(); - - // This field should only be get or set under the _flushLock. This is a ValueTask that was either: - // 1. The default value where "IsCompleted" is true - // 2. Created by an async method - // 3. Constructed explicitely from a completed result - // This means it should be safe to await a single _lastFlushTask instance multiple times. - private ValueTask<FlushResult> _lastFlushTask; - public TimingPipeFlusher( PipeWriter writer, ITimeoutControl timeoutControl, @@ -56,25 +47,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure } public ValueTask<FlushResult> FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) - { - // https://github.com/dotnet/corefxlab/issues/1334 - // Pipelines don't support multiple awaiters on flush. - lock (_flushLock) - { - if (_lastFlushTask.IsCompleted) - { - _lastFlushTask = TimeFlushAsync(minRate, count, outputAborter, cancellationToken); - } - else - { - _lastFlushTask = AwaitLastFlushAndTimeFlushAsync(_lastFlushTask, minRate, count, outputAborter, cancellationToken); - } - - return _lastFlushTask; - } - } - - private ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) { var pipeFlushTask = _writer.FlushAsync(cancellationToken); @@ -109,7 +81,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure catch (Exception ex) { // A canceled token is the only reason flush should ever throw. - _log.LogError(0, ex, $"Unexpected exception in {nameof(TimingPipeFlusher)}.{nameof(TimeFlushAsync)}."); + _log.LogError(0, ex, $"Unexpected exception in {nameof(TimingPipeFlusher)}.{nameof(FlushAsync)}."); } finally { @@ -123,11 +95,5 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure return default; } - - private async ValueTask<FlushResult> AwaitLastFlushAndTimeFlushAsync(ValueTask<FlushResult> lastFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken) - { - await lastFlushTask; - return await TimeFlushAsync(minRate, count, outputAborter, cancellationToken); - } } } diff --git a/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs index 1c433f713b8..4dce5d03489 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs @@ -110,10 +110,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate) { - sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions) - { - Log = _logger - }; + sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions); certificateRequired = false; } else @@ -150,10 +147,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Https.Internal } return true; - })) - { - Log = _logger - }; + })); certificateRequired = true; } diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 592eeb67c7d..929446f502c 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -6,7 +6,6 @@ using System.IO; using System.IO.Pipelines; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; -using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { @@ -17,9 +16,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream { private readonly Pipe _input; - private readonly Pipe _output; private Task _inputTask; - private Task _outputTask; private bool _disposed; private readonly object _disposeLock = new object(); private readonly int _minAllocBufferSize; @@ -42,26 +39,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(), useSynchronizationContext: false); - var outputOptions = new PipeOptions(pool: writerOptions.Pool, - readerScheduler: PipeScheduler.Inline, - writerScheduler: PipeScheduler.Inline, - pauseWriterThreshold: 1, - resumeWriterThreshold: 1, - minimumSegmentSize: writerOptions.MinimumBufferSize, - useSynchronizationContext: false); - _minAllocBufferSize = writerOptions.MinimumBufferSize; _input = new Pipe(inputOptions); - - // We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions - // about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once - // those patterns are fixed. - _output = new Pipe(outputOptions); + Output = PipeWriter.Create(Stream, writerOptions); } - public ILogger Log { get; set; } - public TStream Stream { get; } public PipeReader Input @@ -70,31 +53,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { if (_inputTask == null) { - RunAsync(); + _inputTask = ReadInputAsync(); } return _input.Reader; } } - public PipeWriter Output - { - get - { - if (_outputTask == null) - { - RunAsync(); - } - - return _output.Writer; - } - } - - public void RunAsync() - { - _inputTask = ReadInputAsync(); - _outputTask = WriteOutputAsync(); - } + public PipeWriter Output { get; } public override async ValueTask DisposeAsync() { @@ -107,27 +73,22 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal _disposed = true; } - _output.Writer.Complete(); _input.Reader.Complete(); + Output.Complete(); - if (_outputTask == null) - { - return; - } - - if (_outputTask != null) - { - await _outputTask; - } - CancelPendingRead(); - + if (_inputTask != null) { await _inputTask; } } + protected override void Dispose(bool disposing) + { + throw new NotSupportedException(); + } + private async Task ReadInputAsync() { Exception error = null; @@ -171,54 +132,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal _input.Writer.Complete(error); } } - - private async Task WriteOutputAsync() - { - try - { - while (true) - { - var result = await _output.Reader.ReadAsync(); - var buffer = result.Buffer; - - try - { - if (buffer.IsEmpty) - { - if (result.IsCompleted) - { - break; - } - - await Stream.FlushAsync(); - } - else if (buffer.IsSingleSegment) - { - await Stream.WriteAsync(buffer.First); - } - else - { - foreach (var memory in buffer) - { - await Stream.WriteAsync(memory); - } - } - } - finally - { - _output.Reader.AdvanceTo(buffer.End); - } - } - } - catch (Exception ex) - { - Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}"); - } - finally - { - _output.Reader.Complete(); - } - } } } diff --git a/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs index 5ce9f6df510..ed9ffb819c7 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs @@ -44,7 +44,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) : base(transport, stream => new LoggingStream(stream, logger)) { - Log = logger; } } } diff --git a/src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs b/src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs new file mode 100644 index 00000000000..eb3ab8eead9 --- /dev/null +++ b/src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs @@ -0,0 +1,387 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers; +using Xunit; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests +{ + public class ConcurrentPipeWriterTests + { + [Fact] + public async Task PassthroughIfAllFlushesAreAwaited() + { + using (var slabPool = new SlabMemoryPool()) + using (var diagnosticPool = new DiagnosticMemoryPool(slabPool)) + { + var pipeWriterFlushTcsArray = new[] { + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + }; + + var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool); + + var memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + + var flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + + pipeWriterFlushTcsArray[0].SetResult(default); + + await flushTask0.DefaultTimeout(); + + memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(2, mockPipeWriter.GetMemoryCallCount); + + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(2, mockPipeWriter.AdvanceCallCount); + + var flushTask1 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(2, mockPipeWriter.FlushCallCount); + + pipeWriterFlushTcsArray[1].SetResult(default); + + await flushTask1.DefaultTimeout(); + + var completeEx = new Exception(); + await concurrentPipeWriter.CompleteAsync(completeEx).DefaultTimeout(); + Assert.Same(completeEx, mockPipeWriter.CompleteException); + } + } + + [Fact] + public async Task QueuesIfFlushIsNotAwaited() + { + using (var slabPool = new SlabMemoryPool()) + using (var diagnosticPool = new DiagnosticMemoryPool(slabPool)) + { + var pipeWriterFlushTcsArray = new[] { + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + }; + + var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool); + + var memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + + var flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + + Assert.False(flushTask0.IsCompleted); + + // Since the flush was not awaited, the following API calls are queued. + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + var flushTask1 = concurrentPipeWriter.FlushAsync(); + + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + + Assert.False(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + + mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously); + pipeWriterFlushTcsArray[0].SetResult(default); + await mockPipeWriter.FlushTcs.Task.DefaultTimeout(); + + // Since the flush was not awaited, the following API calls are queued. + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + + // We do not need to flush the final bytes, since the incomplete flush will pick it up. + Assert.Equal(2, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(2, mockPipeWriter.AdvanceCallCount); + Assert.Equal(2, mockPipeWriter.FlushCallCount); + + Assert.False(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + + mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously); + pipeWriterFlushTcsArray[1].SetResult(default); + await mockPipeWriter.FlushTcs.Task.DefaultTimeout(); + + // Even though we only called flush on the ConcurrentPipeWriter twice, the inner PipeWriter was flushed three times. + Assert.Equal(3, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(3, mockPipeWriter.AdvanceCallCount); + Assert.Equal(3, mockPipeWriter.FlushCallCount); + + Assert.False(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + + var completeEx = new Exception(); + await concurrentPipeWriter.CompleteAsync(completeEx); + + // Complete isn't called on the inner PipeWriter until the inner flushes have completed. + Assert.Null(mockPipeWriter.CompleteException); + + pipeWriterFlushTcsArray[2].SetResult(default); + + await flushTask0.DefaultTimeout(); + await flushTask1.DefaultTimeout(); + + Assert.Same(completeEx, mockPipeWriter.CompleteException); + } + } + + [Fact] + public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance() + { + using (var slabPool = new SlabMemoryPool()) + using (var diagnosticPool = new DiagnosticMemoryPool(slabPool)) + { + var pipeWriterFlushTcsArray = new[] { + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + }; + + var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool); + + var memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + + var flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + Assert.False(flushTask0.IsCompleted); + + // Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets. + memory = concurrentPipeWriter.GetMemory(); + + // If the inner flush completes between a call to GetMemory() and Advance(), the outer + // flush completes, and the next flush will pick up the buffered data. + pipeWriterFlushTcsArray[0].SetResult(default); + + await flushTask0.DefaultTimeout(); + + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + + concurrentPipeWriter.Advance(memory.Length); + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + + var flushTask1 = concurrentPipeWriter.FlushAsync(); + + // Now that we flushed the ConcurrentPipeWriter again, the GetMemory() and Advance() calls are replayed. + // Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else + // it might take more or less calls to the inner PipeWriter's GetMemory method to copy all the data. + Assert.Equal(3, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(3, mockPipeWriter.AdvanceCallCount); + Assert.Equal(2, mockPipeWriter.FlushCallCount); + Assert.False(flushTask1.IsCompleted); + + pipeWriterFlushTcsArray[1].SetResult(default); + + await flushTask1.DefaultTimeout(); + + // Even though we only called flush on the ConcurrentPipeWriter twice, the inner PipeWriter was flushed three times. + Assert.Equal(3, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(3, mockPipeWriter.AdvanceCallCount); + Assert.Equal(2, mockPipeWriter.FlushCallCount); + + var completeEx = new Exception(); + await concurrentPipeWriter.CompleteAsync(completeEx); + Assert.Same(completeEx, mockPipeWriter.CompleteException); + } + } + + [Fact] + public async Task CompleteFlushesQueuedBytes() + { + using (var slabPool = new SlabMemoryPool()) + using (var diagnosticPool = new DiagnosticMemoryPool(slabPool)) + { + var pipeWriterFlushTcsArray = new[] { + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + }; + + var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool); + + var memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + + var flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + Assert.False(flushTask0.IsCompleted); + + // Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes. + memory = concurrentPipeWriter.GetMemory(); + + // If the inner flush completes between a call to GetMemory() and Advance(), the outer + // flush completes, and the next flush will pick up the buffered data. + pipeWriterFlushTcsArray[0].SetResult(default); + + await flushTask0.DefaultTimeout(); + + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + + concurrentPipeWriter.Advance(memory.Length); + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + + // Complete the ConcurrentPipeWriter without flushing any of the queued data. + var completeEx = new Exception(); + await concurrentPipeWriter.CompleteAsync(completeEx); + + // Now that we completed the ConcurrentPipeWriter, the GetMemory() and Advance() calls are replayed. + // Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else + // it might take more or less calls to the inner PipeWriter's GetMemory method to copy all the data. + Assert.Equal(3, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(3, mockPipeWriter.AdvanceCallCount); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + Assert.Same(completeEx, mockPipeWriter.CompleteException); + } + } + + [Fact] + public async Task CancelPendingFlushInterruptsFlushLoop() + { + using (var slabPool = new SlabMemoryPool()) + using (var diagnosticPool = new DiagnosticMemoryPool(slabPool)) + { + var pipeWriterFlushTcsArray = new[] { + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), + }; + + var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool); + + var memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + + var flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + + Assert.False(flushTask0.IsCompleted); + + // Since the flush was not awaited, the following API calls are queued. + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + var flushTask1 = concurrentPipeWriter.FlushAsync(); + + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + + Assert.False(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + + // CancelPendingFlush() does not get queued. + concurrentPipeWriter.CancelPendingFlush(); + Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount); + + pipeWriterFlushTcsArray[0].SetResult(new FlushResult(isCanceled: true, isCompleted: false)); + + Assert.True((await flushTask0.DefaultTimeout()).IsCanceled); + Assert.True((await flushTask1.DefaultTimeout()).IsCanceled); + + var flushTask2 = concurrentPipeWriter.FlushAsync(); + Assert.False(flushTask2.IsCompleted); + + pipeWriterFlushTcsArray[1].SetResult(default); + await flushTask2.DefaultTimeout(); + + // We do not need to flush the final bytes, since the incomplete flush will pick it up. + Assert.Equal(2, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(2, mockPipeWriter.AdvanceCallCount); + Assert.Equal(2, mockPipeWriter.FlushCallCount); + + var completeEx = new Exception(); + await concurrentPipeWriter.CompleteAsync(completeEx); + Assert.Same(completeEx, mockPipeWriter.CompleteException); + } + } + + private class MockPipeWriter : PipeWriter + { + // It's important that this matches SlabMemoryPool._blockSize for all the tests to pass. + private const int SlabMemoryPoolBlockSize = 4096; + + private readonly TaskCompletionSource<FlushResult>[] _flushResults; + + public MockPipeWriter(TaskCompletionSource<FlushResult>[] flushResults) + { + _flushResults = flushResults; + } + + public int GetMemoryCallCount { get; set; } + public int AdvanceCallCount { get; set; } + public int FlushCallCount { get; set; } + public int CancelPendingFlushCallCount { get; set; } + + public TaskCompletionSource<object> FlushTcs { get; set; } + + public Exception CompleteException { get; set; } + + public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) + { + FlushCallCount++; + FlushTcs?.TrySetResult(null); + return new ValueTask<FlushResult>(_flushResults[FlushCallCount - 1].Task); + } + + public override Memory<byte> GetMemory(int sizeHint = 0) + { + GetMemoryCallCount++; + return new Memory<byte>(new byte[sizeHint == 0 ? SlabMemoryPoolBlockSize : sizeHint]); + } + + public override Span<byte> GetSpan(int sizeHint = 0) + { + return GetMemory(sizeHint).Span; + } + + public override void Advance(int bytes) + { + AdvanceCallCount++; + } + + public override void Complete(Exception exception = null) + { + CompleteException = exception; + } + + public override void CancelPendingFlush() + { + CancelPendingFlushCallCount++; + } + + public override void OnReaderCompleted(Action<Exception, object> callback, object state) + { + throw new NotImplementedException(); + } + } + } +} diff --git a/src/Servers/Kestrel/Core/test/Http2FrameWriterTests.cs b/src/Servers/Kestrel/Core/test/Http2FrameWriterTests.cs index dd5f993ea25..6300fcf85bd 100644 --- a/src/Servers/Kestrel/Core/test/Http2FrameWriterTests.cs +++ b/src/Servers/Kestrel/Core/test/Http2FrameWriterTests.cs @@ -41,7 +41,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests { // Arrange var pipe = new Pipe(new PipeOptions(_dirtyMemoryPool, PipeScheduler.Inline, PipeScheduler.Inline)); - var frameWriter = new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, new Mock<IKestrelTrace>().Object); + var frameWriter = new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, _dirtyMemoryPool, new Mock<IKestrelTrace>().Object); // Act await frameWriter.WriteWindowUpdateAsync(1, 1); @@ -57,7 +57,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests { // Arrange var pipe = new Pipe(new PipeOptions(_dirtyMemoryPool, PipeScheduler.Inline, PipeScheduler.Inline)); - var frameWriter = new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, new Mock<IKestrelTrace>().Object); + var frameWriter = new Http2FrameWriter(pipe.Writer, null, null, null, null, null, null, _dirtyMemoryPool, new Mock<IKestrelTrace>().Object); // Act await frameWriter.WriteGoAwayAsync(1, Http2ErrorCode.NO_ERROR); diff --git a/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs b/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs deleted file mode 100644 index 400d390039d..00000000000 --- a/src/Servers/Kestrel/Core/test/TimingPipeFlusherTests.cs +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System.IO.Pipelines; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; -using Moq; -using Xunit; - -namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests -{ - public class TimingPipeFlusherTests - { - [Fact] - public async Task IfFlushIsCalledAgainBeforeTheLastFlushCompletedItWaitsForTheLastCall() - { - var mockPipeWriter = new Mock<PipeWriter>(); - var pipeWriterFlushTcsArray = new[] { - new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), - new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), - new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously), - }; - var pipeWriterFlushCallCount = 0; - - mockPipeWriter.Setup(p => p.FlushAsync(CancellationToken.None)).Returns(() => - { - return new ValueTask<FlushResult>(pipeWriterFlushTcsArray[pipeWriterFlushCallCount++].Task); - }); - - var timingPipeFlusher = new TimingPipeFlusher(mockPipeWriter.Object, null, null); - - var flushTask0 = timingPipeFlusher.FlushAsync(); - var flushTask1 = timingPipeFlusher.FlushAsync(); - var flushTask2 = timingPipeFlusher.FlushAsync(); - - Assert.False(flushTask0.IsCompleted); - Assert.False(flushTask1.IsCompleted); - Assert.False(flushTask2.IsCompleted); - Assert.Equal(1, pipeWriterFlushCallCount); - - pipeWriterFlushTcsArray[0].SetResult(default); - await flushTask0.AsTask().DefaultTimeout(); - - Assert.True(flushTask0.IsCompleted); - Assert.False(flushTask1.IsCompleted); - Assert.False(flushTask2.IsCompleted); - Assert.True(pipeWriterFlushCallCount <= 2); - - pipeWriterFlushTcsArray[1].SetResult(default); - await flushTask1.AsTask().DefaultTimeout(); - - Assert.True(flushTask0.IsCompleted); - Assert.True(flushTask1.IsCompleted); - Assert.False(flushTask2.IsCompleted); - Assert.True(pipeWriterFlushCallCount <= 3); - - pipeWriterFlushTcsArray[2].SetResult(default); - await flushTask2.AsTask().DefaultTimeout(); - - Assert.True(flushTask0.IsCompleted); - Assert.True(flushTask1.IsCompleted); - Assert.True(flushTask2.IsCompleted); - Assert.Equal(3, pipeWriterFlushCallCount); - } - } -} diff --git a/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs b/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs index c295258b2a2..03c24d661aa 100644 --- a/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs +++ b/src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs @@ -400,6 +400,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests await _libuvThread.PostAsync(cb => cb(LibuvConstants.ECONNRESET.Value), triggerNextCompleted); } + await task2Success.DefaultTimeout(); + // Second task is now completed Assert.True(task2Success.IsCompleted); Assert.False(task2Success.IsCanceled); @@ -578,6 +580,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); } + await task1Waits.DefaultTimeout(); + // First task is completed Assert.True(task1Waits.IsCompleted); Assert.False(task1Waits.IsCanceled); @@ -598,6 +602,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted); } + await task3Success.DefaultTimeout(); + Assert.True(task3Success.IsCompleted); Assert.False(task3Success.IsCanceled); Assert.False(task3Success.IsFaulted);; @@ -760,7 +766,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests var processor = new LibuvOuputProcessor { ProcessingTask = outputTask, - OutputProducer = (Http1OutputProducer)http1Connection.Output + OutputProducer = (Http1OutputProducer)http1Connection.Output, + PipeWriter = pair.Transport.Output, }; return processor; @@ -769,11 +776,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests private class LibuvOuputProcessor { public Http1OutputProducer OutputProducer { get; set; } + public PipeWriter PipeWriter { get; set; } public Task ProcessingTask { get; set; } public async ValueTask DisposeAsync() { - OutputProducer.PipeWriter.Complete(); + OutputProducer.Dispose(); + PipeWriter.Complete(); await ProcessingTask; } diff --git a/src/Servers/Kestrel/shared/test/TaskTimeoutExtensions.cs b/src/Servers/Kestrel/shared/test/TaskTimeoutExtensions.cs index 68010b090a1..418769c4ea0 100644 --- a/src/Servers/Kestrel/shared/test/TaskTimeoutExtensions.cs +++ b/src/Servers/Kestrel/shared/test/TaskTimeoutExtensions.cs @@ -12,7 +12,7 @@ namespace System.Threading.Tasks return task.AsTask().TimeoutAfter(TestConstants.DefaultTimeout); } - public static Task DefaultTimeout<T>(this ValueTask task) + public static Task DefaultTimeout(this ValueTask task) { return task.AsTask().TimeoutAfter(TestConstants.DefaultTimeout); } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs index a52db473bc7..90a84bea2e7 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs @@ -308,11 +308,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests withFlags: (byte)Http2DataFrameFlags.NONE, withStreamId: 1); - await WaitForConnectionErrorAsync<ConnectionAbortedException>( - ignoreNonGoAwayFrames: false, - expectedLastStreamId: int.MaxValue, - Http2ErrorCode.INTERNAL_ERROR, - null); + Assert.True((await _pair.Application.Input.ReadAsync().AsTask().DefaultTimeout()).IsCompleted); _mockConnectionContext.Verify(c => c.Abort(It.Is<ConnectionAbortedException>(e => e.Message == CoreStrings.ConnectionTimedBecauseResponseMininumDataRateNotSatisfied)), Times.Once); @@ -366,11 +362,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests withFlags: (byte)Http2DataFrameFlags.NONE, withStreamId: 1); - await WaitForConnectionErrorAsync<ConnectionAbortedException>( - ignoreNonGoAwayFrames: false, - expectedLastStreamId: int.MaxValue, - Http2ErrorCode.INTERNAL_ERROR, - null); + Assert.True((await _pair.Application.Input.ReadAsync().AsTask().DefaultTimeout()).IsCompleted); _mockConnectionContext.Verify(c => c.Abort(It.Is<ConnectionAbortedException>(e => e.Message == CoreStrings.ConnectionTimedBecauseResponseMininumDataRateNotSatisfied)), Times.Once); -- GitLab