diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 616d83788a67c..a6c554b5231d0 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -595,9 +595,7 @@ internal override int Read(Span buffer) byte[] rentedBuffer = ArrayPool.Shared.Rent(buffer.Length); try { - Task t = ReadAsync(new Memory(rentedBuffer, 0, buffer.Length)).AsTask(); - ((IAsyncResult)t).AsyncWaitHandle.WaitOne(); - int readLength = t.GetAwaiter().GetResult(); + int readLength = ReadAsync(new Memory(rentedBuffer, 0, buffer.Length)).AsTask().GetAwaiter().GetResult(); rentedBuffer.AsSpan(0, readLength).CopyTo(buffer); return readLength; } @@ -612,9 +610,7 @@ internal override void Write(ReadOnlySpan buffer) ThrowIfDisposed(); // TODO: optimize this. - Task t = WriteAsync(buffer.ToArray()).AsTask(); - ((IAsyncResult)t).AsyncWaitHandle.WaitOne(); - t.GetAwaiter().GetResult(); + WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult(); } // MsQuic doesn't support explicit flushing diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs index 1ac8ecb60c91d..bf2b0d1d1cb3c 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -12,7 +12,7 @@ public short MinThreadsGoal get { _threadAdjustmentLock.VerifyIsLocked(); - return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment); + return Math.Min(_separated.counts.NumThreadsGoal, TargetThreadsGoalForBlockingAdjustment); } } @@ -44,7 +44,7 @@ public bool NotifyThreadBlocked() Debug.Assert(_numBlockedThreads > 0); if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary && - _separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment) + _separated.counts.NumThreadsGoal < TargetThreadsGoalForBlockingAdjustment) { if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None) { @@ -79,7 +79,7 @@ public void NotifyThreadUnblocked() if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately && _numThreadsAddedDueToBlocking > 0 && - _separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment) + _separated.counts.NumThreadsGoal > TargetThreadsGoalForBlockingAdjustment) { wakeGateThread = true; _pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately; @@ -126,7 +126,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo addWorker = false; short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment; - short numThreadsGoal = _separated.numThreadsGoal; + ThreadCounts counts = _separated.counts; + short numThreadsGoal = counts.NumThreadsGoal; if (numThreadsGoal == targetThreadsGoal) { return 0; @@ -144,7 +145,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking); _numThreadsAddedDueToBlocking -= toSubtract; - _separated.numThreadsGoal = numThreadsGoal -= toSubtract; + numThreadsGoal -= toSubtract; + _separated.counts.InterlockedSetNumThreadsGoal(numThreadsGoal); HillClimbing.ThreadPoolHillClimber.ForceChange( numThreadsGoal, HillClimbing.StateOrTransition.CooperativeBlocking); @@ -158,7 +160,6 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo { // Calculate how many threads can be added without a delay. Threads that were already created but may be just // waiting for work can be released for work without a delay, but creating a new thread may need a delay. - ThreadCounts counts = _separated.counts; short maxThreadsGoalWithoutDelay = Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads)); short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay); @@ -225,7 +226,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo } while (false); _numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal); - _separated.numThreadsGoal = newNumThreadsGoal; + counts = _separated.counts.InterlockedSetNumThreadsGoal(newNumThreadsGoal); HillClimbing.ThreadPoolHillClimber.ForceChange( newNumThreadsGoal, HillClimbing.StateOrTransition.CooperativeBlocking); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs index fefe16da418eb..b28286d8ebe0e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs @@ -126,20 +126,31 @@ private static void GateThreadStart() // of the number of existing threads, is compared with the goal. There may be alternative // solutions, for now this is only to maintain consistency in behavior. ThreadCounts counts = threadPoolInstance._separated.counts; - if (counts.NumProcessingWork < threadPoolInstance._maxThreads && - counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal) + while ( + counts.NumProcessingWork < threadPoolInstance._maxThreads && + counts.NumProcessingWork >= counts.NumThreadsGoal) { if (debuggerBreakOnWorkStarvation) { Debugger.Break(); } + ThreadCounts newCounts = counts; short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1); - threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal; - HillClimbing.ThreadPoolHillClimber.ForceChange( - newNumThreadsGoal, - HillClimbing.StateOrTransition.Starvation); - addWorker = true; + newCounts.NumThreadsGoal = newNumThreadsGoal; + + ThreadCounts countsBeforeUpdate = + threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.Starvation); + addWorker = true; + break; + } + + counts = countsBeforeUpdate; } } finally @@ -183,7 +194,7 @@ private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoo } else { - minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs; + minimumDelay = (uint)threadPoolInstance._separated.counts.NumThreadsGoal * DequeueDelayThresholdMs; } return delay > minimumDelay; diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs index d4673f5cd7329..43aa0fcf7102c 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs @@ -16,14 +16,15 @@ private struct ThreadCounts // SOS's ThreadPool command depends on this layout private const byte NumProcessingWorkShift = 0; private const byte NumExistingThreadsShift = 16; + private const byte NumThreadsGoalShift = 32; - private uint _data; // SOS's ThreadPool command depends on this name + private ulong _data; // SOS's ThreadPool command depends on this name - private ThreadCounts(uint data) => _data = data; + private ThreadCounts(ulong data) => _data = data; private short GetInt16Value(byte shift) => (short)(_data >> shift); private void SetInt16Value(short value, byte shift) => - _data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift); + _data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift); /// /// Number of threads processing work items. @@ -43,7 +44,7 @@ public void SubtractNumProcessingWork(short value) Debug.Assert(value >= 0); Debug.Assert(value <= NumProcessingWork); - _data -= (uint)(ushort)value << NumProcessingWorkShift; + _data -= (ulong)(ushort)value << NumProcessingWorkShift; } public void InterlockedDecrementNumProcessingWork() @@ -72,19 +73,61 @@ public void SubtractNumExistingThreads(short value) Debug.Assert(value >= 0); Debug.Assert(value <= NumExistingThreads); - _data -= (uint)(ushort)value << NumExistingThreadsShift; + _data -= (ulong)(ushort)value << NumExistingThreadsShift; + } + + /// + /// Max possible thread pool threads we want to have. + /// + public short NumThreadsGoal + { + get => GetInt16Value(NumThreadsGoalShift); + set + { + Debug.Assert(value > 0); + SetInt16Value(value, NumThreadsGoalShift); + } + } + + public ThreadCounts InterlockedSetNumThreadsGoal(short value) + { + ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked(); + + ThreadCounts counts = this; + while (true) + { + ThreadCounts newCounts = counts; + newCounts.NumThreadsGoal = value; + + ThreadCounts countsBeforeUpdate = InterlockedCompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + return newCounts; + } + + counts = countsBeforeUpdate; + } } public ThreadCounts VolatileRead() => new ThreadCounts(Volatile.Read(ref _data)); - public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts) => - new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data)); + public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts) + { +#if DEBUG + if (newCounts.NumThreadsGoal != oldCounts.NumThreadsGoal) + { + ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked(); + } +#endif + + return new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data)); + } public static bool operator ==(ThreadCounts lhs, ThreadCounts rhs) => lhs._data == rhs._data; public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data; public override bool Equals([NotNullWhen(true)] object? obj) => obj is ThreadCounts other && _data == other._data; - public override int GetHashCode() => (int)_data; + public override int GetHashCode() => (int)_data + (int)(_data >> 32); } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs index 8b1a44946d10f..1298d1be4121d 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs @@ -124,22 +124,19 @@ private static void WorkerThreadStart() ThreadCounts newCounts = counts; newCounts.SubtractNumExistingThreads(1); short newNumExistingThreads = (short)(numExistingThreads - 1); - - ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); + short newNumThreadsGoal = + Math.Max( + threadPoolInstance.MinThreadsGoal, + Math.Min(newNumExistingThreads, counts.NumThreadsGoal)); + newCounts.NumThreadsGoal = newNumThreadsGoal; + + ThreadCounts oldCounts = + threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); if (oldCounts == counts) { - short newNumThreadsGoal = - Math.Max( - threadPoolInstance.MinThreadsGoal, - Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal)); - if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal) - { - threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal; - HillClimbing.ThreadPoolHillClimber.ForceChange( - newNumThreadsGoal, - HillClimbing.StateOrTransition.ThreadTimedOut); - } - + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.ThreadTimedOut); if (NativeRuntimeEventSource.Log.IsEnabled()) { NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads); @@ -181,7 +178,7 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance while (true) { numProcessingWork = counts.NumProcessingWork; - if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal) + if (numProcessingWork >= counts.NumThreadsGoal) { return; } @@ -256,7 +253,7 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn // code from which this implementation was ported, which turns a processing thread into a retired thread // and checks for pending requests like RemoveWorkingWorker. In this implementation there are // no retired threads, so only the count of threads processing work is considered. - if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal) + if (counts.NumProcessingWork <= counts.NumThreadsGoal) { return false; } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index 8c6163fa2f7d2..a425298cacd8e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -51,8 +51,6 @@ private struct CacheLineSeparated { [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)] public ThreadCounts counts; // SOS's ThreadPool command depends on this name - [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1 + sizeof(uint))] - public short numThreadsGoal; [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)] public int lastDequeueTime; @@ -103,7 +101,7 @@ private PortableThreadPool() _maxThreads = _minThreads; } - _separated.numThreadsGoal = _minThreads; + _separated.counts.NumThreadsGoal = _minThreads; } public bool SetMinThreads(int workerThreads, int ioCompletionThreads) @@ -142,9 +140,9 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads) wakeGateThread = true; } } - else if (_separated.numThreadsGoal < newMinThreads) + else if (_separated.counts.NumThreadsGoal < newMinThreads) { - _separated.numThreadsGoal = newMinThreads; + _separated.counts.InterlockedSetNumThreadsGoal(newMinThreads); if (_separated.numRequestedWorkers > 0) { addWorker = true; @@ -193,9 +191,9 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads) short newMaxThreads = (short)Math.Min(workerThreads, MaxPossibleThreadCount); _maxThreads = newMaxThreads; - if (_separated.numThreadsGoal > newMaxThreads) + if (_separated.counts.NumThreadsGoal > newMaxThreads) { - _separated.numThreadsGoal = newMaxThreads; + _separated.counts.InterlockedSetNumThreadsGoal(newMaxThreads); } return true; } @@ -272,13 +270,15 @@ private void AdjustMaxWorkersActive() bool addWorker = false; try { - // Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the - // blocking adjustment heuristics and increase the thread count too quickly. - if (_pendingBlockingAdjustment != PendingBlockingAdjustment.None) + // Repeated checks from ShouldAdjustMaxWorkersActive() inside the lock + ThreadCounts counts = _separated.counts; + if (counts.NumProcessingWork > counts.NumThreadsGoal || + _pendingBlockingAdjustment != PendingBlockingAdjustment.None) { return; } + long startTime = _currentSampleStartTime; long endTime = Stopwatch.GetTimestamp(); long freq = Stopwatch.Frequency; @@ -291,13 +291,13 @@ private void AdjustMaxWorkersActive() int totalNumCompletions = (int)_completionCounter.Count; int numCompletions = totalNumCompletions - _separated.priorCompletionCount; + short oldNumThreadsGoal = counts.NumThreadsGoal; int newNumThreadsGoal; (newNumThreadsGoal, _threadAdjustmentIntervalMs) = - HillClimbing.ThreadPoolHillClimber.Update(_separated.numThreadsGoal, elapsedSeconds, numCompletions); - short oldNumThreadsGoal = _separated.numThreadsGoal; + HillClimbing.ThreadPoolHillClimber.Update(oldNumThreadsGoal, elapsedSeconds, numCompletions); if (oldNumThreadsGoal != (short)newNumThreadsGoal) { - _separated.numThreadsGoal = (short)newNumThreadsGoal; + _separated.counts.InterlockedSetNumThreadsGoal((short)newNumThreadsGoal); // // If we're increasing the goal, inject a thread. If that thread finds work, it will inject @@ -354,7 +354,8 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs) // threads processing work to stop in response to a decreased thread count goal. The logic here is a bit // different from the original CoreCLR code from which this implementation was ported because in this // implementation there are no retired threads, so only the count of threads processing work is considered. - if (_separated.counts.NumProcessingWork > _separated.numThreadsGoal) + ThreadCounts counts = _separated.counts; + if (counts.NumProcessingWork > counts.NumThreadsGoal) { return false; } diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index 767e5bef92c16..89c8b54f230d7 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Reflection; using System.Threading.Tasks; @@ -949,6 +950,57 @@ void AppContextSetData(string name, object value) }).Dispose(); } + [ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))] + public static void CooperativeBlockingWithProcessingThreadsAndGoalThreadsAndAddWorkerRaceTest() + { + // Avoid contaminating the main process' environment + RemoteExecutor.Invoke(() => + { + try + { + // The test is run affinitized to at most 2 processors for more frequent repros. The actual test process below + // will inherit the affinity. + Process testParentProcess = Process.GetCurrentProcess(); + testParentProcess.ProcessorAffinity = (nint)testParentProcess.ProcessorAffinity & 0x3; + } + catch (PlatformNotSupportedException) + { + // Processor affinity is not supported on some platforms, try to run the test anyway + } + + RemoteExecutor.Invoke(() => + { + const uint TestDurationMs = 4000; + + var done = new ManualResetEvent(false); + int startTimeMs = Environment.TickCount; + Action completingTask = data => ((TaskCompletionSource)data).SetResult(0); + Action repeatingTask = null; + repeatingTask = () => + { + if ((uint)(Environment.TickCount - startTimeMs) >= TestDurationMs) + { + done.Set(); + return; + } + + Task.Run(repeatingTask); + + var tcs = new TaskCompletionSource(); + Task.Factory.StartNew(completingTask, tcs); + tcs.Task.Wait(); + }; + + for (int i = 0; i < Environment.ProcessorCount; ++i) + { + Task.Run(repeatingTask); + } + + done.CheckedWait(); + }).Dispose(); + }).Dispose(); + } + public static bool IsThreadingAndRemoteExecutorSupported => PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported; }