
Consider removing Stream's semaphore #45089

Open
Tracked by #64596
stephentoub opened this issue Nov 23, 2020 · 12 comments

@stephentoub
Member

The sole field remaining in the base Stream class is a semaphore used to serialize any ReadAsync/WriteAsync calls issued concurrently on the same instance. This behavior is a holdover from the same serialization employed by BeginRead/BeginWrite, dating back to their original introduction.

On the one hand, it's dubious to use such APIs on a Stream concurrently when the Stream doesn't guarantee that this is supported, and it'd be nice to be able to remove the semaphore, not only to reduce the size of Stream but also to reduce the complexity of the code that uses it (BufferedStream also relies on it, and some other streams, e.g. PipeStream, have their own variant). It's also problematic for Streams that do support concurrent reads and writes but don't override the async methods, as in #36481, though the right solution there is to override them rather than rely on the default async-over-sync implementations.
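Here is a sketch of the override-based fix referred to above. The stream overrides both the array-based and the Memory-based ReadAsync/WriteAsync, so neither call ever reaches the base class's semaphore-serialized async-over-sync implementation. This is an illustration, not code from the thread:

- `LoopbackStream` and `CompleteWrites` are hypothetical names.
- Written bytes are simply made readable again through `System.Threading.Channels`.
- The type assumes one concurrent reader plus one concurrent writer.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical in-memory loopback stream: bytes written become readable.
// Every async overload is overridden, so none falls back to the base Stream's
// semaphore-serialized async-over-sync path. Supports one reader + one writer.
public sealed class LoopbackStream : Stream
{
    private readonly Channel<byte[]> _chunks = Channel.CreateUnbounded<byte[]>();
    private byte[] _current = Array.Empty<byte>();
    private int _offset;

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    // Hypothetical helper: reader sees EOF once buffered chunks are drained.
    public void CompleteWrites() => _chunks.Writer.TryComplete();

    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (buffer.IsEmpty) return 0;
        while (_offset == _current.Length)
        {
            // Waits for data without holding anything the writer needs.
            if (!await _chunks.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
                return 0; // writer completed and nothing is left: EOF
            if (_chunks.Reader.TryRead(out var next))
            {
                _current = next;
                _offset = 0;
            }
        }
        int n = Math.Min(buffer.Length, _current.Length - _offset);
        _current.AsMemory(_offset, n).CopyTo(buffer);
        _offset += n;
        return n;
    }

    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) =>
        buffer.IsEmpty ? default : _chunks.Writer.WriteAsync(buffer.ToArray(), cancellationToken);

    // Array-based overloads forward to the Memory-based ones, not to the base class.
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
        ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
        WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();

    // Synchronous members block on the async paths (acceptable for a sketch).
    public override int Read(byte[] buffer, int offset, int count) =>
        ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();

    public override void Write(byte[] buffer, int offset, int count) =>
        WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}
```

With this stream, a `ReadAsync` issued before any data exists stays pending. A subsequent `WriteAsync` still completes and satisfies that read. A type that overrides only the synchronous `Read`/`Write` would instead have the write queue behind the pending read, which is the situation described in #36481.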

On the other hand, some streams do explicitly support reading/writing concurrently, e.g. NetworkStream, SslStream, etc., and so it's reasonable someone might assume all streams could be used in this fashion, and this semaphore protects against that. In fact, some streams like FileStream have multiple modes, one of which does support concurrent operations, and the base implementation which is then inherited into the other mode provides some consistency/protection. It's also possible/likely someone somewhere is relying on the serialization this provides, e.g. calling WriteAsync multiple times without waiting on the previous one to complete, knowing the base implementation will handle queueing the operations for them.

We should consider removing the semaphore behavior, at least by default, potentially with a quirk to put it back, at least for a release or two. We could keep the base field to support such a quirk, or alternatively to avoid the extra state in every Stream instance, employ a ConditionalWeakTable to only utilize the semaphore functionality if it's really needed.
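Here is a sketch of the ConditionalWeakTable idea, assuming the serialization were moved out of Stream into a side table. `AsyncSerialization`, `GetSemaphore`, `HasSemaphore`, and `SerializedWriteAsync` are hypothetical names, not runtime APIs. The table holds its keys weakly. An entry therefore exists only for instances that actually needed serialization, and it does not keep the stream alive.

```csharp
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical side-table design, not the runtime's actual implementation.
internal static class AsyncSerialization
{
    // Keys are held weakly: an entry never keeps its Stream alive, and the
    // semaphore becomes collectible once the Stream is.
    private static readonly ConditionalWeakTable<Stream, SemaphoreSlim> s_semaphores = new();

    // Creates the semaphore lazily, on the first operation that needs it.
    public static SemaphoreSlim GetSemaphore(Stream stream) =>
        s_semaphores.GetValue(stream, static _ => new SemaphoreSlim(1, 1));

    // True only for instances that have already needed serialization.
    public static bool HasSemaphore(Stream stream) =>
        s_semaphores.TryGetValue(stream, out _);

    // Example consumer: runs writes on the same stream one at a time, the way
    // the base class's async-over-sync WriteAsync serializes them today.
    public static async Task SerializedWriteAsync(Stream stream, byte[] data)
    {
        SemaphoreSlim gate = GetSemaphore(stream);
        await gate.WaitAsync().ConfigureAwait(false);
        try
        {
            await stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
        }
        finally
        {
            gate.Release();
        }
    }
}
```

`ConditionalWeakTable.GetValue` is thread-safe. If two threads race on the first call, the factory may run more than once, but only one semaphore is stored and both callers receive it.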

@stephentoub stephentoub added this to the 6.0.0 milestone Nov 23, 2020
@Dotnet-GitSync-Bot Dotnet-GitSync-Bot added the untriaged New issue has not been triaged by the area owner label Nov 23, 2020
@geoffkizer
Contributor

It never really makes sense to try to do multiple Reads or Writes on a Stream concurrently.

Even the places you cite where we do support this are questionable, in my opinion. We are just blocking at some lower layer in order to guarantee some sort of arbitrary ordering between the concurrent operations. And even then, it's not as reliable as advertised -- see e.g. #44422.

I vote to get rid of this.

@stephentoub
Member Author

> Even the places you cite where we do support this are questionable, in my opinion.

Sure, we would never add this if we were starting over today. It's a question of compat.

@stephentoub
Member Author

@jkotas, do you have any opinions on this? There's an obvious compat risk associated with it, but it's also generally unnecessary and surprising baggage we're carrying around.

@jkotas
Member

jkotas commented May 24, 2021

> I vote to get rid of this.

+1

@stephentoub
Member Author

I've been waffling back and forth on whether to have a quirk for this. On the one hand, I'd really like to be able to delete all this cruft and shrink the size of Stream by removing the field. On the other hand, I do have nagging concerns about the compatibility impact, and wonder whether for .NET 6 we should have an opt-in for the serialization and then rip it all out in .NET 7. Opinions? It's also possible I'm making a mountain out of a molehill and no one will miss the (poor) behavior.

@jkotas
Member

jkotas commented May 27, 2021

My default recommendation is to be reactive about adding quirks, not proactive. We can even add the quirk in servicing if we need to.

@stephentoub
Member Author

Ok. I'll be bold ;-)

@davidfowl
Member

Delete the locks!

@ghost ghost added the in-pr There is an active PR which will close this issue when it is merged label May 27, 2021
@ghost ghost removed the in-pr There is an active PR which will close this issue when it is merged label Jun 30, 2021
@adamsitnik adamsitnik modified the milestones: 6.0.0, 7.0.0 Jul 22, 2021
@LBensman

@stephentoub, thank you for your attempt to address this issue. As I understand it, the change didn't make it into the 6.0 release but is a candidate for 7.0? If so, is there a candidate PR for 7.0 that I can monitor? (Or will updates just be reflected in this issue's thread?)

@stephentoub
Member Author

@LBensman, correct, there's no change around this in 6.0. We might still address it for 7.0 but there's no PR for it (it would look similar to the one I put up for 6.0 but ultimately closed). Any updates would be noted in this issue.

@geoffkizer
Contributor

@LBensman Do you have a scenario that would benefit from this? If so, understanding that would help prioritize this change.

@LBensman

LBensman commented Nov 2, 2021

@geoffkizer, yes, I do have a scenario. I'm the one who originally discovered and reported the issue in #36481, which I ran into while developing and debugging our project's code.

The basic principle behind the scenario is acting as a blind proxy between two communication endpoints. Our product, where I discovered the issue, bridges various devtool clients and remote targets that in typical cases would otherwise be local (somewhat analogous to remote debugging). To do that, we have code that accepts connections locally and proxies the data over the cloud to the remote target.

We first handle such proxied connections for authentication, decryption, target selection and other such activity. Then we initiate a connection to the target and perform whatever handshake, if any, is necessary there. Once that handling is done, the connections are treated as blind connections that let traffic flow from client to target and back, using whatever protocol the client and target need to speak. It could be a client opening a connection to a device to perform a process debugging session, to push/pull a file to/from the device, or to connect to some custom service on the target device. We're oblivious to the higher-level protocol flowing over the connection.

Once our handshake is done, the connection, from our perspective, enters blind proxy mode. We then need to push bytes between the two streams in both directions without knowing which end is likely to produce bytes first or, really, which end will originate the next bytes that need to be proxied to the other connection. E.g. with HTTP it's reasonable to expect that the client will write first (though there are rare tools that do the reverse), but with telnet or FTP it's the server that initiates the conversation with a greeting, iirc.

The protocols themselves are not important. What matters is that I need to issue a ReadAsync() on both streams simultaneously and, whichever stream first produces bytes from its read, issue a WriteAsync() to the other stream. That's where the problem comes up, as described: a pending ReadAsync() prevents a WriteAsync() on the same stream from actually sending bytes. A deadlock results, because that endpoint is waiting to receive bytes before issuing the write on its end that would satisfy the pending read on my end.

So the code below doesn't work and is subject to deadlock:

await Task.WhenAll(streamA.CopyToAsync(streamB), streamB.CopyToAsync(streamA));

It looks like perfectly innocent code that ought to work, but it doesn't, as described in the original post and above.

As a workaround, I ended up creating a library with the following two functions. They have two problems: a) they are workarounds after all, and b) they are imperfect, with drawbacks. The two functions are below (code lightly adjusted; it calls internal helper functions whose purposes should be evident):

        /// <summary>
        /// Copies data synchronously between two streams in both directions.
        /// </summary>
        /// <param name="streamA">First stream to be copied between.</param>
        /// <param name="streamB">Second stream to be copied between.</param>
        /// <remarks>
        /// <para>
        /// This method is the safest to use compared to
        /// <see cref="CopyBetweenAsync"/>, but is least efficient and is
        /// not suitable for use from async methods since it's a blocking call.
        /// See https://github.com/dotnet/runtime/issues/36481 for details.
        /// </para>
        /// <para>
        /// This method creates a new thread that is used to copy data from
        /// <paramref name="streamB"/> to <paramref name="streamA"/>, while
        /// the calling thread is used to copy data from
        /// <paramref name="streamA"/> to <paramref name="streamB"/>.
        /// The new thread is terminated once all data is copied or if an
        /// exception occurs.
        /// </para>
        /// <para>
        /// If an exception occurs while copying data in either direction, both
        /// <paramref name="streamA"/> and <paramref name="streamB"/> are
        /// disposed prior to this method completing.
        /// </para>
        /// </remarks>
        /// <exception cref="AggregateException">
        /// Contains original exception that was thrown either by
        /// <see cref="Stream.CopyTo(Stream)"/> or by
        /// <see cref="Thread"/> or by <see cref="Thread.Start()"/>.
        /// </exception>
        public static void CopyBetween(this Stream streamA, Stream streamB)
        {
            Thread t = null;
            Exception copyAbException = null, copyBaException = null;

            try
            {
                t = new Thread(() =>
                {
                    try
                    {
                        streamB.CopyTo(streamA);
                    }
                    catch (Exception e)
                    {
                        copyBaException = e;
                        streamA.DisposeSilently();
                        streamB.DisposeSilently();
                    }
                });

                t.Start();

                streamA.CopyTo(streamB);
            }
            catch (Exception e)
            {
                copyAbException = e;
                streamA.DisposeSilently();
                streamB.DisposeSilently();
            }
            finally
            {
                t?.Join();
                if (copyAbException != null || copyBaException != null)
                {
                    // Don't rethrow original exception as it'll clobber its
                    // stack record.  Instead throw a wrapper and preserve the
                    // original exception intact.
                    throw new AggregateException(
                        $"One or more exceptions occurred during stream {nameof(CopyBetween)} operation.",
                        new[] {copyAbException, copyBaException}.Where(e => e != null));
                }
            }
        }

        /// <summary>
        /// Potentially unsafe method.
        /// Copies data asynchronously between two streams in both directions.
        /// </summary>
        /// <param name="streamA">First stream to be copied between.</param>
        /// <param name="streamB">Second stream to be copied between.</param>
        /// <param name="cancel">Optional cancellation token.</param>
        /// <remarks>
        /// <para>
        /// Be cautious of using this method. Default implementations of
        /// <see cref="Stream.ReadAsync(byte[],int,int,CancellationToken)"/> and
        /// <see cref="Stream.WriteAsync(byte[],int,int,CancellationToken)"/> are half-duplex
        /// only, and calling both simultaneously, which is exactly what this
        /// function does, will only schedule one direction's copy task for execution,
        /// with the other pending the first one's completion. If data at that
        /// point is only expected to flow the other way, a deadlock will occur.
        /// See https://github.com/dotnet/runtime/issues/36481 for details.
        /// </para>
        /// <para>
        /// It is safe to use this function if implementations of <see cref="Stream"/> being passed to
        /// <paramref name="streamA"/> and to <paramref name="streamB"/>
        /// override the default implementations of both methods and the
        /// override implementations do not call into the base implementations to
        /// schedule the asynchronous operation.
        /// </para>
        /// <para>
        /// The function does attempt to detect if stream implementations
        /// passed to either stream parameter do not override the necessary
        /// methods, and if it detects noncompliance, it will throw
        /// <see cref="ArgumentException"/>. However, it is unable to detect
        /// the case when overrides are present, but override implementations
        /// still call into base <see cref="Stream"/> methods.
        /// </para>
        /// </remarks>
        /// <returns>
        /// <see cref="Task"/> that completes when tasks for both copy
        /// directions complete.
        /// </returns>
        public static async Task CopyBetweenAsync(this Stream streamA, Stream streamB,
            CancellationToken cancel = default)
        {
            EnsureArg.HasValue(nameof(streamA), streamA);
            EnsureArg.HasValue(nameof(streamB), streamB);

            // Handle SpecialStream, FileLoggedStream, LoggedStream,
            // and DisposingStream specifically since
            // those are generalized wrappers servicing underlying streams.

            Stream GetRealStream(Stream stream)
            {
                while (true)
                {
                    switch (stream)
                    {
                        case SpecialStream sStream:
                            stream = sStream.UnderlyingStream;
                            continue;
                        case LoggedStream loggedStream:
                            stream = loggedStream.UnderlyingStream;
                            continue;
                        case FileLoggedStream fileLoggedStream:
                            stream = fileLoggedStream.UnderlyingStream;
                            continue;
                        case DisposingStream disposingStream:
                            stream = disposingStream.UnderlyingStream;
                            continue;
                        default:
                            return stream;
                    }
                }
            }

            var realA = GetRealStream(streamA);
            var realB = GetRealStream(streamB);

            //TODO: Optimization possibility instead of examining type every time, cache in dictionary for quick lookup.
            var streamAReadBase = realA.GetType()
                .GetMethod(nameof(Stream.ReadAsync), new[] {typeof(byte[]), typeof(int), typeof(int), typeof(CancellationToken)})
                .IsOverride();
            var streamAWriteBase = realA.GetType()
                .GetMethod(nameof(Stream.WriteAsync), new[] {typeof(byte[]), typeof(int), typeof(int), typeof(CancellationToken)})
                .IsOverride();
            var streamBReadBase = realB.GetType()
                .GetMethod(nameof(Stream.ReadAsync), new[] {typeof(byte[]), typeof(int), typeof(int), typeof(CancellationToken)})
                .IsOverride();
            var streamBWriteBase = realB.GetType()
                .GetMethod(nameof(Stream.WriteAsync), new[] {typeof(byte[]), typeof(int), typeof(int), typeof(CancellationToken)})
                .IsOverride();

            if (!streamAReadBase)
            {
                throw new ArgumentException(
                    $"Type {realA.GetType().Name} passed in as parameter does not override {nameof(Stream.ReadAsync)} method. See help for {nameof(CopyBetweenAsync)} method for details.",
                    nameof(streamA));
            }

            if (!streamAWriteBase)
            {
                throw new ArgumentException(
                    $"Type {realA.GetType().Name} passed in parameter does not override {nameof(Stream.WriteAsync)} method. See help for {nameof(CopyBetweenAsync)} method for details.",
                    nameof(streamA));
            }
            if (!streamBReadBase)
            {
                throw new ArgumentException(
                    $"Type {realB.GetType().Name} passed in parameter does not override {nameof(Stream.ReadAsync)} method. See help for {nameof(CopyBetweenAsync)} method for details.",
                    nameof(streamB));
            }

            if (!streamBWriteBase)
            {
                throw new ArgumentException(
                    $"Type {realB.GetType().Name} passed in parameter does not override {nameof(Stream.WriteAsync)} method. See help for {nameof(CopyBetweenAsync)} method for details.",
                    nameof(streamB));
            }

            // TODO: It really should be WhenAll, because technically
            // the proxying should stop once both ends of connection have sent
            // EOF and closed their write endpoints, not when just one signaled
            // the end. So WhenAny _seems_ quite wrong and is probably masking
            // some other issue somewhere. But apparently
            // <SomeType.SomeProperty> was using WhenAny to make it work
            // and not WhenAll, so the issue has probably been around for a while. Could
            // possibly be due to some of our code closing streams in both
            // direction when they detect one direction closure, though not sure
            // if that really explains it.
            await Task.WhenAny(
               streamA.CopyToAsync(streamB, cancel),
               streamB.CopyToAsync(streamA, cancel));
        }

The drawback of the first one is that it launches a dedicated thread every time, which is expensive.
The drawback of the second one is that it attempts to recognize stream types in our code base. As a result it:
a) is not future-proof;
b) doesn't handle wrapping streams (stream implementations that wrap an underlying stream) all that well, despite attempting to;
c) can still fail on streams that override the methods but whose overrides still call the base methods for the actual operations, so it remains subject to deadlock.
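One possible middle ground between the two workarounds is sketched below. It assumes each stream tolerates one concurrent reader and one concurrent writer on its synchronous Read/Write, as NetworkStream does. Each direction's synchronous CopyTo runs as a thread-pool task. Synchronous Read/Write never enter the base class's async-over-sync path, so the semaphore is not involved, and unlike CopyBetween the caller can await the result. `DuplexCopy` and `CopyBothWaysAsync` are hypothetical names.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class DuplexCopy
{
    // Hypothetical helper: copies a -> b and b -> a concurrently and completes
    // when both directions have finished, successfully or not.
    public static Task CopyBothWaysAsync(Stream a, Stream b)
    {
        return Task.WhenAll(Pump(a, b), Pump(b, a));

        // Each direction uses the synchronous Stream.CopyTo on a thread-pool thread,
        // so it calls Read/Write directly and never takes the base Stream's
        // async-over-sync semaphore.
        Task Pump(Stream source, Stream destination) => Task.Run(() =>
        {
            try
            {
                source.CopyTo(destination);
            }
            catch
            {
                // Like CopyBetween: dispose both streams so that, for streams whose
                // Dispose unblocks a pending Read (e.g. NetworkStream), the opposite
                // direction fails instead of hanging; then propagate the error.
                a.Dispose();
                b.Dispose();
                throw;
            }
        });
    }
}
```

The trade-off is that each direction blocks a pool thread for the lifetime of the connection. For many long-lived connections, `Task.Factory.StartNew` with `TaskCreationOptions.LongRunning` (a dedicated thread, as in CopyBetween) may be preferable. The helper also waits for both directions, i.e. it has WhenAll rather than WhenAny semantics.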

So the scenario is pretty evident: these functions are big, somewhat codebase-specific (they know about our stream implementations), imperfect solutions. In fact, a team member who was unaware of these functions recently needed to proxy and wrote the await Task.When... variant. Luckily I caught it in code review, so these functions are used instead. But, obviously, this is not the expected solution for the scenario 😹.

The goal should be the ability to write await Task.When___(a.CopyToAsync(b), b.CopyToAsync(a)); and have it work as one would expect.

Hope this helps.

@stephentoub stephentoub modified the milestones: 7.0.0, Future Jun 28, 2022
@stephentoub stephentoub self-assigned this Dec 26, 2023
@stephentoub stephentoub modified the milestones: Future, 9.0.0 Dec 26, 2023
@stephentoub stephentoub removed this from the 9.0.0 milestone Apr 13, 2024
@stephentoub stephentoub added this to the Future milestone Apr 13, 2024
@stephentoub stephentoub removed their assignment Jun 4, 2024