Limiting the number of threads

In our project we needed to run some calculations concurrently. Simply starting each calculation in a separate thread is easy; the problem is how to limit the number of threads.

The first idea was to use ThreadPool from the .NET library. We could easily limit the number of threads using ThreadPool.SetMaxThreads. But this raises another problem: imagine that our application uses threads somewhere else or, even worse, that some of the referenced libraries use the thread pool internally. ThreadPool is a static class shared by the whole process, so changing its maximum number of concurrent threads may break other code.
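To make the "shared by the whole process" point concrete, here is a minimal sketch. The class and method names (GlobalPoolLimitDemo, LimitIsShared) are made up for illustration; only ThreadPool.GetMaxThreads and ThreadPool.SetMaxThreads are real API. It assumes the current maximum can be lowered by one, which may not hold in every environment, so SetMaxThreads is allowed to refuse.

```csharp
using System;
using System.Threading;

public static class GlobalPoolLimitDemo
{
    // Returns true when a limit set by "our" code is the one that any
    // other code in the same process observes afterwards.
    public static bool LimitIsShared()
    {
        int originalWorkers, originalIo;
        ThreadPool.GetMaxThreads(out originalWorkers, out originalIo);

        // "Our" code lowers the worker limit by one.
        // SetMaxThreads returns false if the runtime rejects the value.
        bool changed = ThreadPool.SetMaxThreads(originalWorkers - 1, originalIo);

        // "Someone else's" code asks for the limit and gets our value,
        // because there is only one pool per process.
        int seenWorkers, seenIo;
        ThreadPool.GetMaxThreads(out seenWorkers, out seenIo);

        // Put the original limit back so the demo has no lasting effect.
        ThreadPool.SetMaxThreads(originalWorkers, originalIo);

        return changed && seenWorkers == originalWorkers - 1;
    }
}
```

There is no way to scope such a change to one component, which is why a local limit is needed.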

All we want is to limit the number of threads that we use locally, which brings us to implementing our own local thread pool. Of course, you can use an existing implementation such as Smart Thread Pool. Unfortunately, in our case we are limited to .NET 3.5, and adding another third-party library is a problem.

After N hours of figuring out how to get a solution with minimal effort, I wrote the following simple class:

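using System;
using System.Threading;

public sealed class ThreadManager
{
    // One event per slot; a signaled event means the slot is free.
    // Note: WaitHandle.WaitAny accepts at most 64 handles.
    private readonly WaitHandle[] _syncObjects;

    public ThreadManager() : this(Environment.ProcessorCount)
    {
    }

    public ThreadManager(int maxThreads)
    {
        MaxThreads = maxThreads;

        _syncObjects = new WaitHandle[maxThreads];
        for (var i = 0; i < maxThreads; i++)
            _syncObjects[i] = new AutoResetEvent(true);
    }

    public int MaxThreads { get; private set; }

    public void StartTask(Action action)
    {
        // Blocks until a slot is free; the event resets itself when the wait is satisfied.
        var freeIndex = WaitHandle.WaitAny(_syncObjects);

        ThreadPool.QueueUserWorkItem(
            state =>
                {
                    try
                    {
                        action();
                    }
                    finally
                    {
                        // Release the slot even if the action throws.
                        ((AutoResetEvent)state).Set();
                    }
                },
            _syncObjects[freeIndex]);
    }
}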

First of all, we don’t want to manage each thread ourselves: deciding when to put it to sleep, when to wake it up, how to reuse it, and so on, which is all the work that ThreadPool already does. So the approach is very simple: use threads from ThreadPool (let it do all the work for us :) ) and limit their number using sync objects.

In the class constructor we create an array of sync objects, one per allowed thread. By default, to make full use of the machine, the class creates as many sync objects as the machine has processors. Each object starts in the signaled state.

To run a calculation in a separate thread, we simply call the StartTask method and pass the action. The first thing StartTask does is wait for any sync object using the WaitHandle.WaitAny method. For sync objects I use AutoResetEvent because it automatically switches to the non-signaled state when a wait on it is satisfied, and this happens atomically, which prevents two threads from taking the same sync object.
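The way WaitAny hands out AutoResetEvent slots can be seen in isolation with a small single-threaded sketch. The names WaitAnyDemo and Run are hypothetical; the sketch uses the WaitAny overload with a timeout so that it never blocks.

```csharp
using System;
using System.Threading;

public static class WaitAnyDemo
{
    public static int[] Run()
    {
        // Two free slots, both signaled from the start.
        WaitHandle[] slots = { new AutoResetEvent(true), new AutoResetEvent(true) };

        // Takes slot 0; the event resets itself as part of the wait.
        int first = WaitHandle.WaitAny(slots, 0);

        // Slot 0 is now non-signaled, so slot 1 is taken.
        int second = WaitHandle.WaitAny(slots, 0);

        // Nothing is signaled: the call times out immediately
        // and returns WaitHandle.WaitTimeout.
        int third = WaitHandle.WaitAny(slots, 0);

        // Free slot 0 again, as a finished task would do.
        ((AutoResetEvent)slots[0]).Set();
        int fourth = WaitHandle.WaitAny(slots, 0);

        return new[] { first, second, third, fourth };
    }
}
```

Each successful wait consumes exactly one signaled event, so a slot can never be handed out twice before it is set again.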

StartTask blocks the caller until some sync object is freed (which means that some thread has finished its work). Then it takes a thread from ThreadPool, executes the passed action on it and switches the sync object back to the signaled state when the action finishes.

Let’s take a look at an example scenario to understand how it works:

  1. A ThreadManager is created with a limit of 2 concurrent threads.
  2. StartTask is called for calculation 1. The first internal sync object becomes non-signaled. Calculation 1 executes in a separate thread.
  3. StartTask is called for calculation 2. The second internal sync object becomes non-signaled. Calculation 2 executes in a separate thread.
  4. StartTask is called for calculation 3. StartTask blocks, waiting for any sync object in the signaled state.
  5. Calculation 1 finishes. Set is called on the corresponding sync object and switches it to the signaled state.
  6. StartTask continues, takes the freed sync object and starts calculation 3 in a separate thread.
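The scenario can be reproduced with a short sketch that starts three tasks on a manager limited to two threads and records how many ran at the same time. ScenarioDemo and RunAndGetPeak are hypothetical names, the 100 ms sleep stands in for a real calculation, and the class is a compact copy of ThreadManager so that the sketch compiles on its own.

```csharp
using System;
using System.Threading;

// Compact copy of the class above, repeated so the sketch is self-contained.
public sealed class ThreadManager
{
    private readonly WaitHandle[] _syncObjects;

    public ThreadManager(int maxThreads)
    {
        _syncObjects = new WaitHandle[maxThreads];
        for (var i = 0; i < maxThreads; i++)
            _syncObjects[i] = new AutoResetEvent(true);
    }

    public void StartTask(Action action)
    {
        var freeIndex = WaitHandle.WaitAny(_syncObjects);
        ThreadPool.QueueUserWorkItem(
            state =>
            {
                try { action(); }
                finally { ((AutoResetEvent)state).Set(); }
            },
            _syncObjects[freeIndex]);
    }
}

public static class ScenarioDemo
{
    // Runs 3 tasks with a limit of 2 and returns the highest number
    // of tasks that were observed running simultaneously.
    public static int RunAndGetPeak()
    {
        const int tasks = 3;
        var manager = new ThreadManager(2);
        var gate = new object();
        var allDone = new ManualResetEvent(false);
        int running = 0, peak = 0, remaining = tasks;

        for (var i = 0; i < tasks; i++)
        {
            // The third call blocks here until one of the first two finishes.
            manager.StartTask(() =>
            {
                lock (gate) { running++; if (running > peak) peak = running; }
                Thread.Sleep(100); // placeholder for the real calculation
                lock (gate) { running--; }
                if (Interlocked.Decrement(ref remaining) == 0) allDone.Set();
            });
        }

        allDone.WaitOne();
        return peak;
    }
}
```

The returned peak never exceeds the limit passed to the constructor, because a slot is released only after the task has decremented the counter.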

This approach to limiting threads has some inconveniences:

  • we can’t check whether we have free threads and how many;
  • it’s quite difficult to test, because ideally we would need to inject ThreadPool to be able to mock it.

To solve the listed problems (at least partially), let’s make some improvements:

internal sealed class ThreadManager
{
    // One event per slot; a signaled event means the slot is free.
    private readonly WaitHandle[] _syncObjects;

    public ThreadManager() : this(Environment.ProcessorCount)
    {
    }

    public ThreadManager(int maxThreads)
    {
        MaxThreads = maxThreads;

        _syncObjects = new WaitHandle[maxThreads];
        for (var i = 0; i < maxThreads; i++)
            _syncObjects[i] = new AutoResetEvent(true);
    }

    public int MaxThreads { get; private set; }

    // Returns false (without running the action) when wait is false and no slot is free.
    public bool StartTask(Action action, bool wait)
    {
        var freeIndex = WaitHandle.WaitAny(_syncObjects, wait ? Timeout.Infinite : 0);
        if (freeIndex == WaitHandle.WaitTimeout)
            return false;

        ThreadPool.QueueUserWorkItem(
            state =>
                {
                    try
                    {
                        action();
                    }
                    finally
                    {
                        // Release the slot even if the action throws.
                        ((AutoResetEvent)state).Set();
                    }
                },
            _syncObjects[freeIndex]);

        return true;
    }
}

Now we can call the StartTask method with or without waiting for a free thread. We simply use the timeout parameter of WaitHandle.WaitAny. If we pass 0 and no sync object is in the signaled state, it returns WaitHandle.WaitTimeout instead of an index. This modification allows checking for free threads without blocking the current thread.
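One possible use of the non-blocking mode, sketched under assumptions: if no thread is free, do the work on the calling thread instead of waiting. NonBlockingStart and RunOrInline are hypothetical names, and the helper takes a delegate with the same shape as StartTask(Action, bool) so that it can be tried out without a real ThreadManager.

```csharp
using System;

public static class NonBlockingStart
{
    // Tries to schedule the work without waiting. Returns true if it was
    // handed to the manager, false if it was executed on the calling thread.
    public static bool RunOrInline(Func<Action, bool, bool> startTask, Action work)
    {
        // wait = false: ask for a free slot but do not block.
        if (startTask(work, false))
            return true;

        // No free slot: fall back to running the work right here.
        work();
        return false;
    }
}
```

With a real manager the call would look like NonBlockingStart.RunOrInline(manager.StartTask, () => Calculate()), where Calculate is a placeholder for your own method. Keep in mind that the inline fallback means one more calculation may run than MaxThreads allows, since the calling thread is not counted.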

Now it’s much easier to write unit tests that check how ThreadManager limits the number of threads and executes code in separate threads:

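using System.Collections.Generic;
using System.Linq;
using System.Threading;
using NUnit.Framework;

[TestFixture]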
public class ThreadManagerTests
{
    [Test]
    [Timeout(2000)]
    public void CodeExecutionTest()
    {
        // Setup
        const int maxThreads = 3;
        const int tasksNumber = maxThreads + 2;
        var manager = new ThreadManager(maxThreads);

        var syncObjects = new WaitHandle[tasksNumber];
        for (var i = 0; i < tasksNumber; i++)
            syncObjects[i] = new ManualResetEvent(false);

        var n = new[] { 0 };

        // Execute
        for (var i = 0; i < tasksNumber; i++)
        {
            var syncObjectId = i;
            manager.StartTask(
                () =>
                {
                    lock (n) n[0]++;
                    ((ManualResetEvent)syncObjects[syncObjectId]).Set();
                },
                true);
        }

        WaitHandle.WaitAll(syncObjects);

        // Verify
        Assert.AreEqual(tasksNumber, n[0]);
    }

    [Test]
    [Timeout(2000)]
    public void ThreadsUsageTest()
    {
        // Setup
        const int maxThreads = 3;
        var manager = new ThreadManager(maxThreads);

        var syncObjects1 = new WaitHandle[maxThreads];
        var syncObjects2 = new WaitHandle[maxThreads];
        for (var i = 0; i < maxThreads; i++)
        {
            syncObjects1[i] = new ManualResetEvent(false);
            syncObjects2[i] = new ManualResetEvent(false);
        }

        var threadIds = new List<int>();

        // Execute
        for (var i = 0; i < maxThreads; i++)
        {
            var syncObjectId = i;
            manager.StartTask(
                () =>
                {
                    lock (threadIds) threadIds.Add(Thread.CurrentThread.ManagedThreadId);
                    ((ManualResetEvent)syncObjects1[syncObjectId]).Set();
                    syncObjects2[syncObjectId].WaitOne();
                },
                true);
        }

        WaitHandle.WaitAll(syncObjects1);

        foreach (var waitHandle in syncObjects2)
            ((ManualResetEvent)waitHandle).Set();

        // Verify
        Assert.AreEqual(maxThreads, threadIds.Distinct().Count());
    }

    [Test]
    [Timeout(2000)]
    public void ThreadsLimitTest()
    {
        // Setup
        const int maxThreads = 3;
        var manager = new ThreadManager(maxThreads);

        var syncObjects = new WaitHandle[maxThreads];
        for (var i = 0; i < maxThreads; i++)
            syncObjects[i] = new ManualResetEvent(false);

        // Execute
        for (var i = 0; i < maxThreads; i++)
        {
            var syncObjectId = i;
            manager.StartTask(() => syncObjects[syncObjectId].WaitOne(), true);
        }

        // Verify
        var result = manager.StartTask(() => { }, false);
        Assert.IsFalse(result);

        ((ManualResetEvent)syncObjects[0]).Set();

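        result = manager.StartTask(() => { }, true);
        Assert.IsTrue(result);

        // Cleanup: release the tasks that are still blocked so pool threads are not left waiting.
        for (var i = 1; i < maxThreads; i++)
            ((ManualResetEvent)syncObjects[i]).Set();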
    }
}

The first test checks that the code of every task really executes. The second test checks that every task runs in a separate thread (we collect thread IDs and check that they are all different). In the third test we verify the ability of ThreadManager to limit the number of threads.

That’s all. We have a pretty simple class for running work on a limited number of threads, useful in situations when we have to use old .NET Framework versions.
