SimpleSynchronizationContext

Let’s try to create a simple SynchronizationContext. I won’t implement the Send method because it is not that interesting here.

As you can see from the MaxConcurrencySyncContext example, we need some kind of task queue for the Post method. I will also create a separate thread whose only purpose is to execute the tasks from this queue. As I said in the first note, this means that our synchronization context encapsulates this thread. Consumers don’t know how exactly their tasks will be handled; they just post the tasks and forget about them.

So, this is a very basic implementation of SynchronizationContext.

using System;
using System.Collections.Concurrent;
using System.Threading;

public class SimpleSynchronizationContext : SynchronizationContext
{
    private readonly ConcurrentQueue<MyTask> _queue;
    private readonly Thread _processingThread;

    public SimpleSynchronizationContext()
    {
        _queue = new ConcurrentQueue<MyTask>();
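        // A background thread does not keep the process alive once the main thread exits.
        _processingThread = new Thread(Handle)
        {
            IsBackground = true
        };
        _processingThread.Start();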
    }

    private void Handle()
    {
        Console.WriteLine($"Start processing tasks on thread: {Environment.CurrentManagedThreadId}");

        var spinWait = new SpinWait();
        while (true)
        {
            if (_queue.TryDequeue(out var myTask))
            {
                myTask.Callback(myTask.State);
            }
            else
            {
                spinWait.SpinOnce();
            }
        }
    }

    public override void Send(SendOrPostCallback d, object? state)
    {
        throw new NotImplementedException();
    }

    public override void Post(SendOrPostCallback d, object? state)
    {
        var myTask = new MyTask(d, state);
        _queue.Enqueue(myTask);
    }
}

public readonly record struct MyTask(SendOrPostCallback Callback, object? State);

Let’s try it in action.

static void RunWithSimpleContext()
{
    Console.WriteLine($"Current thread: {Environment.CurrentManagedThreadId}");

    var syncContext = new SimpleSynchronizationContext();
    for (var i = 0; i < 5; i++)
    {
        syncContext.Post(_ => Console.WriteLine($"Thread id: {Environment.CurrentManagedThreadId}"), null);
    }

    Console.ReadKey();
}

The output:

Current thread: 1
Start processing tasks on thread: 7
Thread id: 7
Thread id: 7
Thread id: 7
Thread id: 7
Thread id: 7
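
For completeness, here is a minimal sketch of how Send could be added on top of the same queue. This is not part of the original implementation: the class name SendCapableSynchronizationContext is hypothetical, and the approach (post a wrapper callback, then block the caller on a ManualResetEventSlim until the wrapper has run) is just one reasonable way to do it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using System.Threading;

// Hypothetical variant of SimpleSynchronizationContext that also supports Send.
public sealed class SendCapableSynchronizationContext : SynchronizationContext
{
    private readonly ConcurrentQueue<(SendOrPostCallback Callback, object? State)> _queue = new();
    private readonly Thread _processingThread;

    public SendCapableSynchronizationContext()
    {
        _processingThread = new Thread(Handle) { IsBackground = true };
        _processingThread.Start();
    }

    private void Handle()
    {
        var spinWait = new SpinWait();
        while (true)
        {
            if (_queue.TryDequeue(out var item))
            {
                item.Callback(item.State);
            }
            else
            {
                spinWait.SpinOnce();
            }
        }
    }

    public override void Post(SendOrPostCallback d, object? state) => _queue.Enqueue((d, state));

    public override void Send(SendOrPostCallback d, object? state)
    {
        // Called from the processing thread itself: run inline,
        // otherwise the thread would wait for its own queue forever.
        if (Thread.CurrentThread == _processingThread)
        {
            d(state);
            return;
        }

        using var done = new ManualResetEventSlim(false);
        ExceptionDispatchInfo? error = null;

        Post(_ =>
        {
            try { d(state); }
            catch (Exception ex) { error = ExceptionDispatchInfo.Capture(ex); }
            finally { done.Set(); }
        }, null);

        // Block the caller until the processing thread has executed the callback.
        done.Wait();

        // Rethrow a callback failure on the calling thread.
        error?.Throw();
    }
}

public static class Program
{
    public static void Main()
    {
        var context = new SendCapableSynchronizationContext();
        var callbackThread = -1;

        // Send blocks, so callbackThread is already assigned when it returns.
        context.Send(_ => callbackThread = Environment.CurrentManagedThreadId, null);

        Console.WriteLine($"Caller thread: {Environment.CurrentManagedThreadId}");
        Console.WriteLine($"Callback thread: {callbackThread}");
    }
}
```

The inline branch matters: without it, a callback that calls Send on its own context would deadlock, because the only thread able to run the nested callback is the one waiting for it.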

AffinitySynchronizationContext

Another fun example is to execute all tasks on a given core of your processor. It’s similar to the previous one, but I use the IdealProcessor and ProcessorAffinity properties to pin the processing thread to a specific core.

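using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public class AffinitySynchronizationContext : SynchronizationContext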
{
    private readonly int _processorId;
    private readonly ConcurrentQueue<MyTask> _queue;
    private readonly Thread _processingThread;
    
    public AffinitySynchronizationContext(int processorId)
    {
        _processorId = processorId;
        _queue = new ConcurrentQueue<MyTask>();
        _processingThread = new Thread(Handle)
        {
            IsBackground = true
        };
        _processingThread.Start();
    }

    private void Handle()
    {
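        // Util.GetCurrentThreadId is not shown here (presumably it is defined in the accompanying
        // source code). It has to return the OS thread id, because that is what ProcessThread.Id
        // holds; the managed thread id is a different number.
        var threadId = Util.GetCurrentThreadId();
        var process = Process.GetCurrentProcess();
        var thread = process.Threads.OfType<ProcessThread>().Single(it => it.Id == threadId);
        thread.IdealProcessor = _processorId;
        // The affinity mask is a bit field: bit N set means the thread may run on core N.
        // ProcessorAffinity is an IntPtr, so the mask is built as a long and cast explicitly.
        thread.ProcessorAffinity = (IntPtr)(1L << _processorId);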
        
        var spinWait = new SpinWait();
        while (true)
        {
            if (_queue.TryDequeue(out var myTask))
            {
                myTask.Callback(myTask.State);
            }
            else
            {
                spinWait.SpinOnce();
            }
        }
    }
    
    public override void Send(SendOrPostCallback d, object? state)
    {
        throw new NotImplementedException();
    }

    public override void Post(SendOrPostCallback d, object? state)
    {
        var myTask = new MyTask(d, state);
        _queue.Enqueue(myTask);
    }
}
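
The affinity mask deserves a short illustration. The sketch below is an addition, not part of the original source: the AffinityMask class and its ForProcessor method are hypothetical names, and the processor count is passed in explicitly so the bounds check is easy to see. It only computes the mask and does not touch any thread.

```csharp
using System;

// Hypothetical helper that builds a single-core affinity mask.
public static class AffinityMask
{
    // Returns a mask in which only the bit for processorId is set.
    public static IntPtr ForProcessor(int processorId, int processorCount)
    {
        if (processorId < 0 || processorId >= processorCount)
        {
            throw new ArgumentOutOfRangeException(
                nameof(processorId),
                $"Expected a core index between 0 and {processorCount - 1}.");
        }

        // The mask cannot have more bits than the pointer size of the process.
        if (processorId >= IntPtr.Size * 8)
        {
            throw new ArgumentOutOfRangeException(
                nameof(processorId),
                "The core index does not fit into the affinity mask.");
        }

        return (IntPtr)(1L << processorId);
    }

    public static void Main()
    {
        var count = Environment.ProcessorCount;

        // Print the masks for the first few cores of the current machine.
        for (var id = 0; id < Math.Min(count, 4); id++)
        {
            var mask = ForProcessor(id, count);
            Console.WriteLine($"core {id} -> mask 0b{Convert.ToString(mask.ToInt64(), 2)}");
        }
    }
}
```

With such a check in place, passing a core index that the machine does not have fails early with a clear message instead of failing inside the processing thread.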

Next, we need to schedule enough CPU-bound work on that context to make the load visible.

static void RunWithAffinityContext()
{
    Console.WriteLine("Start");

    // Core index 8 assumes a machine with at least nine logical processors.
    var syncContext = new AffinitySynchronizationContext(8);
    for (var i = 0; i < 500_000; i++)
    {
        syncContext.Post(
            _ =>
            {
                // Busy work only; the result is intentionally unused.
                long t = 0;
                for (var j = 0; j < 20_000; j++)
                {
                    t += j / 42 - j * 3;
                }
            },
            null
        );
    }
    
    // Printed once everything is queued; the callbacks may still be running at this point.
    Console.WriteLine("Finished");
    Console.ReadKey();
}

If you open a system monitor, you’ll see that only one of the cores is busy.

[Image: System monitor]
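
In the example above, “Finished” only means that all callbacks have been queued. If you need to know when they have actually run, one option is to count them down. The following is a sketch under assumptions: the PostAndWait class is a hypothetical helper, and the demo uses the base SynchronizationContext (whose Post queues work to the thread pool) only to keep the snippet self-contained; any context with a working Post should do.

```csharp
using System;
using System.Threading;

// Hypothetical helper: posts callbacks and blocks until all of them have run.
public static class PostAndWait
{
    public static long Run(SynchronizationContext context, int count)
    {
        long completed = 0;
        using var allDone = new CountdownEvent(count);

        for (var i = 0; i < count; i++)
        {
            context.Post(_ =>
            {
                try
                {
                    // Stand-in for real work.
                    Interlocked.Increment(ref completed);
                }
                finally
                {
                    // Signal even if the work throws, so the waiter is not stuck.
                    allDone.Signal();
                }
            }, null);
        }

        // Returns only after every posted callback has signalled.
        allDone.Wait();
        return Interlocked.Read(ref completed);
    }

    public static void Main()
    {
        var done = Run(new SynchronizationContext(), 1000);
        Console.WriteLine($"Completed callbacks: {done}");
    }
}
```

Do not call such a helper from a callback running on a single-threaded context like the ones above: the waiting callback would block the only thread that can execute the rest.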

References

The source code is available here
