SimpleSynchronizationContext
Let’s try to create a simple SynchronizationContext. I won’t implement the Send method because it is not so interesting.
As you can see from the MaxConcurrencySyncContext example, we need a kind of task queue for the Post method. I will also create a separate thread whose purpose is to execute the tasks from this queue. As I said in the first note, it means that our synchronization context will encapsulate this thread. Consumers don’t know how exactly their tasks will be handled. They just post the tasks and forget about them.
So, this is a very basic implementation of SynchronizationContext.
using System;
using System.Collections.Concurrent;
using System.Threading;

public class SimpleSynchronizationContext : SynchronizationContext
{
    private readonly ConcurrentQueue<MyTask> _queue;
    private readonly Thread _processingThread;

    public SimpleSynchronizationContext()
    {
        _queue = new ConcurrentQueue<MyTask>();
        // A background thread does not keep the process alive once Main returns.
        _processingThread = new Thread(Handle) { IsBackground = true };
        _processingThread.Start();
    }

    // Runs on the dedicated thread and executes the queued callbacks one by one.
    private void Handle()
    {
        Console.WriteLine($"Start processing tasks on thread: {Environment.CurrentManagedThreadId}");
        var spinWait = new SpinWait();
        while (true)
        {
            if (_queue.TryDequeue(out var myTask))
            {
                myTask.Callback(myTask.State);
            }
            else
            {
                // Nothing to do yet: spin (and eventually yield) before checking again.
                spinWait.SpinOnce();
            }
        }
    }

    public override void Send(SendOrPostCallback d, object? state)
    {
        throw new NotImplementedException();
    }

    // Fire and forget: the callback is queued and executed later on the processing thread.
    public override void Post(SendOrPostCallback d, object? state)
    {
        var myTask = new MyTask(d, state);
        _queue.Enqueue(myTask);
    }
}

public readonly record struct MyTask(SendOrPostCallback Callback, object? State);
Let’s try it in action.
static void RunWithSimpleContext()
{
    Console.WriteLine($"Current thread: {Environment.CurrentManagedThreadId}");
    var syncContext = new SimpleSynchronizationContext();
    for (var i = 0; i < 5; i++)
    {
        syncContext.Post(_ => Console.WriteLine($"Thread id: {Environment.CurrentManagedThreadId}"), null);
    }
    Console.ReadKey();
}
Current thread: 1
Start processing tasks on thread: 7
Thread id: 7
Thread id: 7
Thread id: 7
Thread id: 7
Thread id: 7
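Posting callbacks by hand is only one way to use such a context. Below is a minimal sketch of how an await continuation could end up on the context's thread. DemoSynchronizationContext and AwaitDemo are hypothetical names introduced for this illustration, and the sketch swaps the spinning loop for a BlockingCollection, so it is a variant of the class above rather than the same implementation.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, compact variant of the context above (illustrative names).
public sealed class DemoSynchronizationContext : SynchronizationContext
{
    private readonly BlockingCollection<(SendOrPostCallback Callback, object? State)> _queue = new();

    // Managed id of the dedicated processing thread.
    public int ProcessingThreadId { get; }

    public DemoSynchronizationContext()
    {
        var thread = new Thread(Handle) { IsBackground = true };
        ProcessingThreadId = thread.ManagedThreadId;
        thread.Start();
    }

    private void Handle()
    {
        // Blocks while the queue is empty instead of spinning.
        foreach (var (callback, state) in _queue.GetConsumingEnumerable())
        {
            callback(state);
        }
    }

    public override void Post(SendOrPostCallback d, object? state) => _queue.Add((d, state));
}

public static class AwaitDemo
{
    // Returns (thread id observed after the await, id of the context's thread).
    public static (int ContinuationThreadId, int ProcessingThreadId) Run()
    {
        var context = new DemoSynchronizationContext();
        var previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            // The await inside captures SynchronizationContext.Current, i.e. our context.
            var task = GetThreadIdAfterAwaitAsync();
            // Blocking here is safe: the continuation runs on the context's own thread.
            return (task.GetAwaiter().GetResult(), context.ProcessingThreadId);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    private static async Task<int> GetThreadIdAfterAwaitAsync()
    {
        await Task.Delay(50);
        // Everything after the await was posted to the captured context.
        return Environment.CurrentManagedThreadId;
    }
}
```

Calling AwaitDemo.Run() should return two equal ids, because the code after the await is delivered through Post and therefore runs on the processing thread.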
AffinitySynchronizationContext
Another fun example is to execute all tasks on a given core of your processor. It’s similar to the previous one, but I use the IdealProcessor and ProcessorAffinity properties to pin the processing thread to the specified core.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public class AffinitySynchronizationContext : SynchronizationContext
{
    private readonly int _processorId;
    private readonly ConcurrentQueue<MyTask> _queue;
    private readonly Thread _processingThread;

    public AffinitySynchronizationContext(int processorId)
    {
        _processorId = processorId;
        _queue = new ConcurrentQueue<MyTask>();
        _processingThread = new Thread(Handle)
        {
            IsBackground = true
        };
        _processingThread.Start();
    }

    private void Handle()
    {
        // Util.GetCurrentThreadId is a helper from the accompanying source code.
        // It presumably returns the OS thread id (not the managed one),
        // because ProcessThread.Id is an OS-level id.
        var threadId = Util.GetCurrentThreadId();
        var process = Process.GetCurrentProcess();
        var thread = process.Threads.OfType<ProcessThread>().Single(it => it.Id == threadId);

        // Both setters are supported on Windows only.
        thread.IdealProcessor = _processorId;
        // The affinity is a bitmask: bit N allows the thread to run on processor N.
        thread.ProcessorAffinity = (IntPtr)(1L << _processorId);

        var spinWait = new SpinWait();
        while (true)
        {
            if (_queue.TryDequeue(out var myTask))
            {
                myTask.Callback(myTask.State);
            }
            else
            {
                spinWait.SpinOnce();
            }
        }
    }

    public override void Send(SendOrPostCallback d, object? state)
    {
        throw new NotImplementedException();
    }

    public override void Post(SendOrPostCallback d, object? state)
    {
        var myTask = new MyTask(d, state);
        _queue.Enqueue(myTask);
    }
}
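The value assigned to ProcessorAffinity is a bitmask in which bit N stands for logical processor N. Here is a small sketch of how such a mask could be built with a range check. AffinityMask and ForProcessor are hypothetical names used only for this illustration, and the sketch assumes a 64-bit process and processors 0..63 of the current processor group.

```csharp
using System;

// Hypothetical helper, not part of the original source code.
public static class AffinityMask
{
    // Builds a mask with only the bit of the requested logical processor set.
    // Assumes a 64-bit process, so the mask has 64 usable bits.
    public static IntPtr ForProcessor(int processorId)
    {
        if (processorId < 0 || processorId > 63)
        {
            throw new ArgumentOutOfRangeException(nameof(processorId));
        }

        // 1L keeps the shift in 64-bit arithmetic, so ids above 31 do not wrap around.
        return (IntPtr)(1L << processorId);
    }
}
```

For processor 8 the mask is 256 (0x100). Note that the mask itself says nothing about the machine: on a computer with fewer than nine logical processors, asking for processor 8 is likely to fail when the affinity is applied, so comparing the id with Environment.ProcessorCount first is a reasonable precaution.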
Next, we need to schedule some work for the affinity context.
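static void RunWithAffinityContext()
{
    Console.WriteLine("Start");
    // Processor 8 is hard-coded here; the machine needs at least nine logical processors.
    var syncContext = new AffinitySynchronizationContext(8);
    for (var i = 0; i < 500_000; i++)
    {
        syncContext.Post(
            _ =>
            {
                // CPU-bound busy work, just to keep the core loaded.
                long t = 0;
                // The inner counter is named j so that it does not clash with the outer i.
                for (var j = 0; j < 20_000; j++)
                {
                    t += j / 42 - j * 3;
                }
            },
            null
        );
    }
    // All callbacks are queued at this point; they are still being processed in the background.
    Console.WriteLine("Finished");
    Console.ReadKey();
}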
If you open a system monitor, you’ll see that only one of the cores is busy.
References
The source code is available here