Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
StreamPipeWriter: don't flush when the writer is completed with an ex…
Browse files Browse the repository at this point in the history
…ception. (#40874)

This provides a way to discard buffered data without pushing it to the inner Stream.
  • Loading branch information
tmds authored and davidfowl committed Oct 21, 2019
1 parent db17568 commit 03453d9
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public override void Complete(Exception? exception = null)

_isCompleted = true;

FlushInternal();
FlushInternal(writeToStream: exception == null);

_internalTokenSource?.Dispose();

Expand All @@ -231,7 +231,7 @@ public override async ValueTask CompleteAsync(Exception? exception = null)

_isCompleted = true;

await FlushAsyncInternal().ConfigureAwait(false);
await FlushAsyncInternal(writeToStream: exception == null).ConfigureAwait(false);

_internalTokenSource?.Dispose();

Expand All @@ -253,15 +253,15 @@ public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellation
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: false));
}

return FlushAsyncInternal(cancellationToken);
return FlushAsyncInternal(writeToStream: true, cancellationToken);
}

private void Cancel()
{
InternalTokenSource.Cancel();
}

private async ValueTask<FlushResult> FlushAsyncInternal(CancellationToken cancellationToken = default)
private async ValueTask<FlushResult> FlushAsyncInternal(bool writeToStream, CancellationToken cancellationToken = default)
{
// Write all completed segments and whatever remains in the current segment
// and flush the result.
Expand Down Expand Up @@ -291,7 +291,7 @@ private async ValueTask<FlushResult> FlushAsyncInternal(CancellationToken cancel
BufferSegment returnSegment = segment;
segment = segment.NextSegment;

if (returnSegment.Length > 0)
if (returnSegment.Length > 0 && writeToStream)
{
await InnerStream.WriteAsync(returnSegment.Memory, localToken).ConfigureAwait(false);
}
Expand All @@ -303,7 +303,7 @@ private async ValueTask<FlushResult> FlushAsyncInternal(CancellationToken cancel
_head = segment;
}

if (_bytesBuffered > 0)
if (_bytesBuffered > 0 && writeToStream)
{
await InnerStream.FlushAsync(localToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -335,7 +335,7 @@ private async ValueTask<FlushResult> FlushAsyncInternal(CancellationToken cancel
}
}

private void FlushInternal()
private void FlushInternal(bool writeToStream)
{
// Write all completed segments and whatever remains in the current segment
// and flush the result.
Expand All @@ -354,7 +354,7 @@ private void FlushInternal()
BufferSegment returnSegment = segment;
segment = segment.NextSegment;

if (returnSegment.Length > 0)
if (returnSegment.Length > 0 && writeToStream)
{
#if !NETSTANDARD2_0
InnerStream.Write(returnSegment.Memory.Span);
Expand All @@ -370,7 +370,7 @@ private void FlushInternal()
_head = segment;
}

if (_bytesBuffered > 0)
if (_bytesBuffered > 0 && writeToStream)
{
InnerStream.Flush();
}
Expand Down
34 changes: 34 additions & 0 deletions src/System.IO.Pipelines/tests/StreamPipeWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,40 @@ public async Task DataFlushedOnCompleteAsync()
Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
}

[Fact]
public void DataNotFlushedOnCompleteWithException()
{
byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
var stream = new MemoryStream();
PipeWriter writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));

bytes.AsSpan().CopyTo(writer.GetSpan(bytes.Length));
writer.Advance(bytes.Length);

Assert.Equal(0, stream.Length);

writer.Complete(new Exception());

Assert.Equal(0, stream.Length);
}

[Fact]
public async Task DataNotFlushedOnCompleteAsyncWithException()
{
byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
var stream = new MemoryStream();
PipeWriter writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));

bytes.AsSpan().CopyTo(writer.GetSpan(bytes.Length));
writer.Advance(bytes.Length);

Assert.Equal(0, stream.Length);

await writer.CompleteAsync(new Exception());

Assert.Equal(0, stream.Length);
}

[Fact]
public async Task CompleteAsyncDoesNotThrowObjectDisposedException()
{
Expand Down

0 comments on commit 03453d9

Please sign in to comment.