diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadBufferState.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadBufferState.cs index 56f8d9e720721..c218fd5b7bca2 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadBufferState.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadBufferState.cs @@ -20,6 +20,27 @@ internal struct ReadBufferState : IDisposable private bool _isFirstBlock; private bool _isFinalBlock; + // An "unsuccessful read" in this context refers to a buffer read operation that + // wasn't sufficient to advance the reader to the next token. This occurs primarily + // when consuming large JSON strings (which don't support streaming today) but is + // also possible with other token types such as numbers, booleans, or nulls. + // + // The JsonSerializer.DeserializeAsyncEnumerable methods employ a special buffering + // strategy where rather than attempting to fill the entire buffer, the deserializer + // will be invoked as soon as the first chunk of data is read from the stream. + // This is to ensure liveness: data should be surfaced on the IAE as soon as they + // are streamed from the server. On the other hand, this can create performance + // problems in cases where the underlying stream uses extremely fine-grained buffering. + // For this reason, we employ a threshold that will revert to buffer filling once crossed. + // The counter is reset to zero whenever the JSON reader has been advanced successfully. + // + // The threshold is set to 5 unsuccessful reads. This is a relatively conservative threshold + // but should still make fallback unlikely in most scenaria. It should ensure that fallback + // isn't triggered in null or boolean tokens even in the worst-case scenario where they are + // streamed one byte at a time. + private const int UnsuccessfulReadCountThreshold = 5; + private int _unsuccessfulReadCount; + public ReadBufferState(int initialBufferSize) { _buffer = ArrayPool.Shared.Rent(Math.Max(initialBufferSize, JsonConstants.Utf8Bom.Length)); @@ -46,6 +67,7 @@ public readonly async ValueTask ReadFromStreamAsync( // make all updates on a copy which is returned once complete. ReadBufferState bufferState = this; + int minBufferCount = fillBuffer || _unsuccessfulReadCount > UnsuccessfulReadCountThreshold ? bufferState._buffer.Length : 0; do { int bytesRead = await utf8Json.ReadAsync( @@ -64,7 +86,7 @@ public readonly async ValueTask ReadFromStreamAsync( bufferState._count += bytesRead; } - while (fillBuffer && bufferState._count < bufferState._buffer.Length); + while (bufferState._count < minBufferCount); bufferState.ProcessReadBytes(); return bufferState; @@ -106,6 +128,7 @@ public void AdvanceBuffer(int bytesConsumed) { Debug.Assert(bytesConsumed <= _count); + _unsuccessfulReadCount = bytesConsumed == 0 ? _unsuccessfulReadCount + 1 : 0; _count -= bytesConsumed; if (!_isFinalBlock) diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/Stream.DeserializeAsyncEnumerable.cs b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/Stream.DeserializeAsyncEnumerable.cs index d9da4a9e57222..23c7c5659f848 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/Stream.DeserializeAsyncEnumerable.cs +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/Stream.DeserializeAsyncEnumerable.cs @@ -2,6 +2,8 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; using System.IO; using System.Linq; using System.Text.Json.Serialization.Metadata; @@ -333,6 +335,47 @@ await Assert.ThrowsAsync(async () => }); } + [Theory] + [InlineData(5, 1024)] + [InlineData(5, 1024 * 1024)] + public static async Task DeserializeAsyncEnumerable_SlowStreamWithLargeStrings(int totalStrings, int stringLength) + { + var options = new JsonSerializerOptions + { + Converters = { new StringLengthConverter() } + }; + + using var stream = new SlowStream(GenerateJsonCharacters()); + string expectedElement = stringLength.ToString(CultureInfo.InvariantCulture); + IAsyncEnumerable asyncEnumerable = JsonSerializer.DeserializeAsyncEnumerable(stream, options); + + await foreach (string? value in asyncEnumerable) + { + Assert.Equal(expectedElement, value); + } + + IEnumerable GenerateJsonCharacters() + { + // ["xxx...x","xxx...x",...,"xxx...x"] + yield return (byte)'['; + for (int i = 0; i < totalStrings; i++) + { + yield return (byte)'"'; + for (int j = 0; j < stringLength; j++) + { + yield return (byte)'x'; + } + yield return (byte)'"'; + + if (i < totalStrings - 1) + { + yield return (byte)','; + } + } + yield return (byte)']'; + } + } + public static IEnumerable GetAsyncEnumerableSources() { yield return WrapArgs(Enumerable.Empty(), 1, DeserializeAsyncEnumerableOverload.JsonSerializerOptions); @@ -381,5 +424,48 @@ private static async Task> ToListAsync(this IAsyncEnumerable sourc } return list; } + + private sealed class SlowStream(IEnumerable byteSource) : Stream, IDisposable + { + private readonly IEnumerator _enumerator = byteSource.GetEnumerator(); + private long _position; + + public override bool CanRead => true; + public override int Read(byte[] buffer, int offset, int count) + { + Debug.Assert(buffer != null); + Debug.Assert(offset >= 0 && count <= buffer.Length - offset); + + if (count == 0 || !_enumerator.MoveNext()) + { + return 0; + } + + _position++; + buffer[offset] = _enumerator.Current; + return 1; + } + + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Position { get => _position; set => throw new NotSupportedException(); } + public override long Length => throw new NotSupportedException(); + public override void Flush() => throw new NotSupportedException(); + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + void IDisposable.Dispose() => _enumerator.Dispose(); + } + + private sealed class StringLengthConverter : JsonConverter + { + public override string Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + Debug.Assert(!reader.ValueIsEscaped && !reader.HasValueSequence); + return reader.ValueSpan.Length.ToString(CultureInfo.InvariantCulture); + } + + public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options) => throw new NotImplementedException(); + } } }