Skip to content

Commit

Permalink
Add a fallback to filling buffers in DeserializeAsyncEnumerable. (dot…
Browse files Browse the repository at this point in the history
…net#104635)

Co-authored-by: Eirik George Tsarpalis <eitsarpa@microsoft.com>
  • Loading branch information
eiriktsarpalis and Eirik George Tsarpalis committed Jul 10, 2024
1 parent 7029008 commit 7946e6f
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,27 @@ internal struct ReadBufferState : IDisposable
private bool _isFirstBlock;
private bool _isFinalBlock;

// An "unsuccessful read" in this context refers to a buffer read operation that
// wasn't sufficient to advance the reader to the next token. This occurs primarily
// when consuming large JSON strings (which don't support streaming today) but is
// also possible with other token types such as numbers, booleans, or nulls.
//
// The JsonSerializer.DeserializeAsyncEnumerable methods employ a special buffering
// strategy where rather than attempting to fill the entire buffer, the deserializer
// will be invoked as soon as the first chunk of data is read from the stream.
// This is to ensure liveness: data should be surfaced on the IAE as soon as they
// are streamed from the server. On the other hand, this can create performance
// problems in cases where the underlying stream uses extremely fine-grained buffering.
// For this reason, we employ a threshold that will revert to buffer filling once crossed.
// The counter is reset to zero whenever the JSON reader has been advanced successfully.
//
// The threshold is set to 5 unsuccessful reads. This is a relatively conservative threshold
// but should still make fallback unlikely in most scenaria. It should ensure that fallback
// isn't triggered in null or boolean tokens even in the worst-case scenario where they are
// streamed one byte at a time.
private const int UnsuccessfulReadCountThreshold = 5;
private int _unsuccessfulReadCount;

public ReadBufferState(int initialBufferSize)
{
_buffer = ArrayPool<byte>.Shared.Rent(Math.Max(initialBufferSize, JsonConstants.Utf8Bom.Length));
Expand All @@ -46,6 +67,7 @@ public readonly async ValueTask<ReadBufferState> ReadFromStreamAsync(
// make all updates on a copy which is returned once complete.
ReadBufferState bufferState = this;

int minBufferCount = fillBuffer || _unsuccessfulReadCount > UnsuccessfulReadCountThreshold ? bufferState._buffer.Length : 0;
do
{
int bytesRead = await utf8Json.ReadAsync(
Expand All @@ -64,7 +86,7 @@ public readonly async ValueTask<ReadBufferState> ReadFromStreamAsync(

bufferState._count += bytesRead;
}
while (fillBuffer && bufferState._count < bufferState._buffer.Length);
while (bufferState._count < minBufferCount);

bufferState.ProcessReadBytes();
return bufferState;
Expand Down Expand Up @@ -106,6 +128,7 @@ public void AdvanceBuffer(int bytesConsumed)
{
Debug.Assert(bytesConsumed <= _count);

_unsuccessfulReadCount = bytesConsumed == 0 ? _unsuccessfulReadCount + 1 : 0;
_count -= bytesConsumed;

if (!_isFinalBlock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text.Json.Serialization.Metadata;
Expand Down Expand Up @@ -333,6 +335,47 @@ await Assert.ThrowsAsync<TaskCanceledException>(async () =>
});
}

[Theory]
[InlineData(5, 1024)]
[InlineData(5, 1024 * 1024)]
public static async Task DeserializeAsyncEnumerable_SlowStreamWithLargeStrings(int totalStrings, int stringLength)
{
var options = new JsonSerializerOptions
{
Converters = { new StringLengthConverter() }
};

using var stream = new SlowStream(GenerateJsonCharacters());
string expectedElement = stringLength.ToString(CultureInfo.InvariantCulture);
IAsyncEnumerable<string?> asyncEnumerable = JsonSerializer.DeserializeAsyncEnumerable<string>(stream, options);

await foreach (string? value in asyncEnumerable)
{
Assert.Equal(expectedElement, value);
}

IEnumerable<byte> GenerateJsonCharacters()
{
// ["xxx...x","xxx...x",...,"xxx...x"]
yield return (byte)'[';
for (int i = 0; i < totalStrings; i++)
{
yield return (byte)'"';
for (int j = 0; j < stringLength; j++)
{
yield return (byte)'x';
}
yield return (byte)'"';

if (i < totalStrings - 1)
{
yield return (byte)',';
}
}
yield return (byte)']';
}
}

public static IEnumerable<object[]> GetAsyncEnumerableSources()
{
yield return WrapArgs(Enumerable.Empty<int>(), 1, DeserializeAsyncEnumerableOverload.JsonSerializerOptions);
Expand Down Expand Up @@ -381,5 +424,48 @@ private static async Task<List<T>> ToListAsync<T>(this IAsyncEnumerable<T> sourc
}
return list;
}

private sealed class SlowStream(IEnumerable<byte> byteSource) : Stream, IDisposable
{
private readonly IEnumerator<byte> _enumerator = byteSource.GetEnumerator();
private long _position;

public override bool CanRead => true;
public override int Read(byte[] buffer, int offset, int count)
{
Debug.Assert(buffer != null);
Debug.Assert(offset >= 0 && count <= buffer.Length - offset);

if (count == 0 || !_enumerator.MoveNext())
{
return 0;
}

_position++;
buffer[offset] = _enumerator.Current;
return 1;
}

public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Position { get => _position; set => throw new NotSupportedException(); }
public override long Length => throw new NotSupportedException();
public override void Flush() => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
void IDisposable.Dispose() => _enumerator.Dispose();
}

private sealed class StringLengthConverter : JsonConverter<string>
{
public override string Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
Debug.Assert(!reader.ValueIsEscaped && !reader.HasValueSequence);
return reader.ValueSpan.Length.ToString(CultureInfo.InvariantCulture);
}

public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options) => throw new NotImplementedException();
}
}
}

0 comments on commit 7946e6f

Please sign in to comment.