Skip to content

Commit

Permalink
Add a thread pool config var on Windows for choosing the number of IOCPs
Browse files Browse the repository at this point in the history
There were cases where using more than one IOCP was beneficial along with some other changes. Being able to configure the number would be useful for folks to do further testing without having to use private binaries.
  • Loading branch information
kouvel committed Jul 20, 2024
1 parent ef56648 commit 2b99593
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,85 @@ namespace System.Threading
{
internal sealed partial class PortableThreadPool
{
// Continuations of IO completions are dispatched to the ThreadPool from IO completion poller threads. This avoids
// continuations blocking/stalling the IO completion poller threads. Setting UnsafeInlineIOCompletionCallbacks allows
// continuations to run directly on the IO completion poller thread, but is inherently unsafe due to the potential for
// those threads to become stalled due to blocking. Sometimes, setting this config value may yield better latency. The
// config value is named for consistency with SocketAsyncEngine.Unix.cs.
private static readonly bool UnsafeInlineIOCompletionCallbacks =
Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1";
private readonly nint[] _ioPorts = new nint[IOCompletionPortCount];
private uint _ioPortSelectorForRegister = unchecked((uint)-1);
private uint _ioPortSelectorForQueue = unchecked((uint)-1);
private IOCompletionPoller[]? _ioCompletionPollers;

private static readonly int IOCompletionPollerCount = GetIOCompletionPollerCount();
private static short DetermineIOCompletionPortCount()
{
const short DefaultIOPortCount = 1;
const short MaxIOPortCount = 1 << 10;

short ioPortCount =
AppContextConfigHelper.GetInt16Config(
"System.Threading.ThreadPool.IOCompletionPortCount",
"DOTNET_ThreadPool_IOCompletionPortCount",
DefaultIOPortCount,
allowNegative: false);
return ioPortCount == 0 ? DefaultIOPortCount : Math.Min(ioPortCount, MaxIOPortCount);
}

private static int GetIOCompletionPollerCount()
private static int DetermineIOCompletionPollerCount()
{
// Named for consistency with SocketAsyncEngine.Unix.cs, this environment variable is checked to override the exact
// number of IO completion poller threads to use. See the comment in SocketAsyncEngine.Unix.cs about its potential
// uses. For this implementation, the ProcessorsPerIOPollerThread config option below may be preferable as it may be
// less machine-specific.
if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count))
int ioPollerCount;
if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count) &&
count != 0)
{
return Math.Min((int)count, MaxPossibleThreadCount);
ioPollerCount = (int)Math.Min(count, (uint)MaxPossibleThreadCount);
}

if (UnsafeInlineIOCompletionCallbacks)
else if (UnsafeInlineIOCompletionCallbacks)
{
// In this mode, default to ProcessorCount pollers to ensure that all processors can be utilized if more work
// happens on the poller threads
return Environment.ProcessorCount;
ioPollerCount = Environment.ProcessorCount;
}
else
{
int processorsPerPoller =
AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false);
ioPollerCount = (Environment.ProcessorCount - 1) / processorsPerPoller + 1;
}

int processorsPerPoller =
AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false);
return (Environment.ProcessorCount - 1) / processorsPerPoller + 1;
if (IOCompletionPortCount == 1)
{
return ioPollerCount;
}

// Use at least one IO poller per port
if (ioPollerCount <= IOCompletionPortCount)
{
return IOCompletionPortCount;
}

// Use the same number of IO pollers per port, align up if necessary to make it even
int rem = ioPollerCount % IOCompletionPortCount;
if (rem != 0)
{
ioPollerCount += IOCompletionPortCount - rem;
}

return ioPollerCount;
}

private void InitializeIOOnWindows()
{
Debug.Assert(IOCompletionPollerCount % IOCompletionPortCount == 0);
int numConcurrentThreads = IOCompletionPollerCount / IOCompletionPortCount;
for (int i = 0; i < IOCompletionPortCount; i++)
{
_ioPorts[i] = CreateIOCompletionPort(numConcurrentThreads);
}
}

private static nint CreateIOCompletionPort()
private static nint CreateIOCompletionPort(int numConcurrentThreads)
{
nint port =
Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, IOCompletionPollerCount);
Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, numConcurrentThreads);
if (port == 0)
{
int hr = Marshal.GetHRForLastWin32Error();
Expand All @@ -58,26 +100,32 @@ private static nint CreateIOCompletionPort()

public void RegisterForIOCompletionNotifications(nint handle)
{
Debug.Assert(_ioPort != 0);
Debug.Assert(_ioPorts != null);

if (_ioCompletionPollers == null)
{
EnsureIOCompletionPollers();
}

nint port = Interop.Kernel32.CreateIoCompletionPort(handle, _ioPort, UIntPtr.Zero, 0);
uint selectedPortIndex =
IOCompletionPortCount == 1
? 0
: Interlocked.Increment(ref _ioPortSelectorForRegister) % (uint)IOCompletionPortCount;
nint selectedPort = _ioPorts[selectedPortIndex];
Debug.Assert(selectedPort != 0);
nint port = Interop.Kernel32.CreateIoCompletionPort(handle, selectedPort, UIntPtr.Zero, 0);
if (port == 0)
{
ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error());
}

Debug.Assert(port == _ioPort);
Debug.Assert(port == selectedPort);
}

public unsafe void QueueNativeOverlapped(NativeOverlapped* nativeOverlapped)
{
Debug.Assert(nativeOverlapped != null);
Debug.Assert(_ioPort != 0);
Debug.Assert(_ioPorts != null);

if (_ioCompletionPollers == null)
{
Expand All @@ -89,7 +137,13 @@ public unsafe void QueueNativeOverlapped(NativeOverlapped* nativeOverlapped)
NativeRuntimeEventSource.Log.ThreadPoolIOEnqueue(nativeOverlapped);
}

if (!Interop.Kernel32.PostQueuedCompletionStatus(_ioPort, 0, UIntPtr.Zero, (IntPtr)nativeOverlapped))
uint selectedPortIndex =
IOCompletionPortCount == 1
? 0
: Interlocked.Increment(ref _ioPortSelectorForQueue) % (uint)IOCompletionPortCount;
nint selectedPort = _ioPorts[selectedPortIndex];
Debug.Assert(selectedPort != 0);
if (!Interop.Kernel32.PostQueuedCompletionStatus(selectedPort, 0, UIntPtr.Zero, (IntPtr)nativeOverlapped))
{
ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error());
}
Expand All @@ -109,7 +163,7 @@ private void EnsureIOCompletionPollers()
IOCompletionPoller[] pollers = new IOCompletionPoller[IOCompletionPollerCount];
for (int i = 0; i < IOCompletionPollerCount; ++i)
{
pollers[i] = new IOCompletionPoller(_ioPort);
pollers[i] = new IOCompletionPoller(_ioPorts[i % IOCompletionPortCount]);
}

_ioCompletionPollers = pollers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ internal sealed partial class PortableThreadPool
private static readonly short ForcedMaxWorkerThreads =
AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false);

#if TARGET_WINDOWS
// Continuations of IO completions are dispatched to the ThreadPool from IO completion poller threads. This avoids
// continuations blocking/stalling the IO completion poller threads. Setting UnsafeInlineIOCompletionCallbacks allows
// continuations to run directly on the IO completion poller thread, but is inherently unsafe due to the potential for
// those threads to become stalled due to blocking. Sometimes, setting this config value may yield better latency. The
// config value is named for consistency with SocketAsyncEngine.Unix.cs.
private static readonly bool UnsafeInlineIOCompletionCallbacks =
Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1";

private static readonly short IOCompletionPortCount = DetermineIOCompletionPortCount();
private static readonly int IOCompletionPollerCount = DetermineIOCompletionPollerCount();
#endif

private static readonly int ThreadPoolThreadTimeoutMs = DetermineThreadPoolThreadTimeoutMs();

private static int DetermineThreadPoolThreadTimeoutMs()
Expand Down Expand Up @@ -107,11 +120,6 @@ private struct CacheLineSeparated
private long _memoryUsageBytes;
private long _memoryLimitBytes;

#if TARGET_WINDOWS
private readonly nint _ioPort;
private IOCompletionPoller[]? _ioCompletionPollers;
#endif

private readonly LowLevelLock _threadAdjustmentLock = new LowLevelLock();

private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name
Expand Down Expand Up @@ -149,7 +157,7 @@ private PortableThreadPool()
_separated.counts.NumThreadsGoal = _minThreads;

#if TARGET_WINDOWS
_ioPort = CreateIOCompletionPort();
InitializeIOOnWindows();
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<PropertyGroup>
<IncludeRemoteExecutor>true</IncludeRemoteExecutor>
<TargetFramework>$(NetCoreAppCurrent)</TargetFramework>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TestRuntime>true</TestRuntime>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetOS)' == 'browser'">
Expand Down
100 changes: 100 additions & 0 deletions src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,106 @@ public static void PrioritizationExperimentConfigVarTest()
}).Dispose();
}

public static IEnumerable<object[]> IOCompletionPortCountConfigVarTest_Args =
from x in Enumerable.Range(0, 9)
select new object[] { x };

// Just verifies that some basic IO operations work with different IOCP counts
[ConditionalTheory(nameof(IsThreadingAndRemoteExecutorSupported), nameof(UsePortableThreadPool))]
[MemberData(nameof(IOCompletionPortCountConfigVarTest_Args))]
[PlatformSpecific(TestPlatforms.Windows)]
public static void IOCompletionPortCountConfigVarTest(int ioCompletionPortCount)
{
// Avoid contaminating the main process' environment
RemoteExecutor.Invoke(ioCompletionPortCountStr =>
{
int ioCompletionPortCount = int.Parse(ioCompletionPortCountStr);
const int PretendProcessorCount = 80;
// The actual test process below will inherit the config vars
Environment.SetEnvironmentVariable("DOTNET_PROCESSOR_COUNT", PretendProcessorCount.ToString());
Environment.SetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT", "7");
if (ioCompletionPortCount != 0)
{
Environment.SetEnvironmentVariable(
"DOTNET_ThreadPool_IOCompletionPortCount",
ioCompletionPortCount.ToString());
}
RemoteExecutor.Invoke(() =>
{
RunQueueNativeOverlappedTest();
RunAsyncIOTest().Wait();
static unsafe void RunQueueNativeOverlappedTest()
{
var done = new AutoResetEvent(false);
for (int i = 0; i < PretendProcessorCount; i++)
{
// Queue a NativeOverlapped, wait for the callback to run
var overlapped = new Overlapped();
NativeOverlapped* nativeOverlapped = overlapped.Pack((_, _, _) => done.Set(), null);
try
{
ThreadPool.UnsafeQueueNativeOverlapped(nativeOverlapped);
done.CheckedWait();
}
finally
{
if (nativeOverlapped != null)
{
Overlapped.Free(nativeOverlapped);
}
}
}
}
static async Task RunAsyncIOTest()
{
var done = new AutoResetEvent(false);
// Receiver
var t = ThreadTestHelpers.CreateGuardedThread(
out Action checkForThreadErrors,
out Action waitForThread,
async () =>
{
using var listener = new TcpListener(IPAddress.Loopback, 55555);
var receiveBuffer = new byte[1];
listener.Start();
done.Set(); // indicate listener started
while (true)
{
// Accept a connection, receive a byte
using var socket = await listener.AcceptSocketAsync();
int bytesRead =
await socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
Assert.Equal(1, bytesRead);
done.Set(); // indicate byte received
}
});
t.IsBackground = true;
t.Start();
done.CheckedWait(); // wait for listener to start
// Sender
var sendBuffer = new byte[1];
for (int i = 0; i < PretendProcessorCount / 2; i++)
{
// Connect, send a byte
using var client = new TcpClient();
await client.ConnectAsync(IPAddress.Loopback, 55555);
int bytesSent =
await client.Client.SendAsync(new ArraySegment<byte>(sendBuffer), SocketFlags.None);
Assert.Equal(1, bytesSent);
done.CheckedWait(); // wait for byte to the received
}
}
}).Dispose();
}, ioCompletionPortCount.ToString()).Dispose();
}

public static bool IsThreadingAndRemoteExecutorSupported =>
PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<IncludeRemoteExecutor>true</IncludeRemoteExecutor>
<!-- This test project is Windows only as it uses the Windows Threadpool -->
<TargetFramework>$(NetCoreAppCurrent)-windows</TargetFramework>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TestRuntime>true</TestRuntime>
</PropertyGroup>
<ItemGroup>
Expand Down

0 comments on commit 2b99593

Please sign in to comment.