System.Threading.Channels - high performance producer-consumer and asynchronous without allocation and stack dive

Hello again. Some time ago I wrote about another little-known tool for high performance hobbyists - System.IO.Pipelines . At its core, the considered System.Threading.Channels (hereinafter β€œchannels”) is built on similar principles as Pipelines, solving the same problem - Producer-Consumer. However, it has a much simpler api that will gracefully merge into any kind of enterprise code. At the same time, it uses asynchrony without allocations and without stack-dive, even in the asynchronous case! (Not always, but often).







Table of contents







Introduction



The Producer / Consumer problem is encountered on the way of programmers quite often and for more than a dozen years. Edsger Dijkstra himself had a hand in solving this problem - he belongs to the idea of ​​using semaphores to synchronize flows when organizing work on the producer / consumer principle. And although its solution in its simplest form is known and rather trivial, in the real world this pattern (Producer / Consumer) can occur in a much more complicated form. Also, modern programming standards impose their fingerprints, the code is written more simplified and broken for further reuse. Everything is done to lower the threshold for writing high-quality code and simplify this process. And the namespace in question - System.Threading.Channels - is another step towards that goal.



I was looking at System.IO.Pipelines a while ago. It required more attentive work and a deeper understanding of the matter, Span and Memory were used, and for efficient work it was required not to call obvious methods (to avoid unnecessary memory allocations) and constantly think in bytes. Because of this, the Pipeline programming interface was nontrivial and not intuitive.



System.Threading.Channels presents the user with a much simpler api to work with. It is worth mentioning that despite the simplicity of the api, this tool is highly optimized and most likely will not allocate memory during its work. Perhaps this is due to the fact that ValueTask is widely used under the hood , and even in the case of real asynchrony, IValueTaskSource is used, which is reused for further operations. This is precisely the whole interest of the implementation of the Channels.



Channels are generic, the generic type is, as you might guess, the type that will be produced and consumed. Interestingly, the implementation of the Channel class, which fits in 1 line ( github source ):



namespace System.Threading.Channels
{
    public abstract class Channel<T> : Channel<T, T> { }
}


Thus, the main class of channels is parameterized by 2 types - separately for the producer and consumer channels. But for realized channels this is not used.

For those familiar with Pipelines, the general approach to getting started will seem familiar. Namely. We create 1 central class from which we pull separately manufacturers ( ChannelWriter ) and consumers ( ChannelReader ). Despite the names, it is worth remembering that this is exactly the producer / consumer, and not the reader / writer from another classic multithreading task of the same name. ChannelReader changes the state of the general channel (pulls out a value), which is no longer available. So he rather does not read, but consumes. But we'll get to know the implementation later.



Beginning of work. Channel



Getting started with channels begins with the abstract Channel <T> class and the static Channel class , which creates the most appropriate implementation. Further, from this common Channel, you can get a ChannelWriter for writing to the channel and a ChannelReader for consumption from the channel. A channel is a repository of general information for ChannelWriter and ChannelReader, so it is all the data that is stored in it. And already the logic of their recording or consumption is dispersed in the ChannelWriter and ChannelReader. Conventionally, channels can be divided into 2 groups - unlimited and limited. The first ones are simpler to implement, you can write in them infinitely (as long as memory allows). The latter are limited to a certain maximum value of the number of records.



This is where the nature of asynchrony is slightly different. In unbounded channels, the write operation will always complete synchronously, there is nothing to stop writing to the channel. The situation is different for limited channels. With standard behavior (which can be replaced), the write operation will end synchronously as long as there is room for new instances in the channel. Once the pipe is full, the write operation will not complete until space is freed (after the consumer has consumed the consumed). Therefore, here the operation will be really asynchronous with changing threads and associated changes (or without changing, which will be described a little later).



For the most part, the behavior of the readers is the same - if there is something in the channel, then the reader simply reads it and ends in sync. If there is nothing, then it waits for someone to write something down.



The Channel static class contains 4 methods for creating the above channels:



Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);


If you wish, you can specify more precise options for creating a channel, which will help optimize it for the specified needs.



UnboundedChannelOptions contains 3 properties whose default value is false:



  1. AllowSynchronousContinuations β€” , , . -. , . , , , . , , , . , - - , ;
  2. SingleReader β€” , . , ;
  3. SingleWriter β€” , ;


BoundedChannelOptions contains the same 3 properties and 2 more on top



  1. AllowSynchronousContinuations - the same;
  2. SingleReader - the same;
  3. SingleWriter is the same;
  4. Capacity - the number of recordings to fit into the channel. This parameter is also a constructor parameter;
  5. FullMode - the BoundedChannelFullMode enumeration, which has 4 options, determines the behavior when trying to write to a full channel:

    • Wait - waits for free space to complete the asynchronous operation
    • DropNewest - the item being written overwrites the newest existing one, ends synchronously
    • DropOldest - a recordable item overwrites the oldest of the existing ends synchronously
    • DropWrite - the item being written is not written, it ends synchronously




Depending on the passed parameters and the called method, one of 3 implementations will be created: SingleConsumerUnboundedChannel , UnboundedChannel , BoundedChannel . But this is not so important, because we will use the channel through the base class Channel <TWrite, TRead>.



It has 2 properties:



  • ChannelReader <TRead> Reader {get; protected set; }
  • ChannelWriter <TWrite> Writer {get; protected set; }


And also, 2 operators of implicit type casting to ChannelReader <TRead> and ChannelWriter <TWrite>.



An example of getting started with channels:



Channel<int> channel = Channel.CreateUnbounded<int>();
//  
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader; 
// 
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;


The data is stored in a queue. For 3 types, 3 different queues are used - ConcurrentQueue <T>, Deque <T> and SingleProducerSingleConsumerQueue <T>. At this point, it seemed to me that I was out of date and missed a bunch of new simple collections. But I hasten to upset - they are not for everyone. Marked internal, so use them will not work. But if you suddenly need them at the prod - you can find them here (SingleProducerConsumerQueue) and here (Deque) . The implementation of the latter is quite simple. I advise you to read it, you can study it very quickly.



So, let's get down to studying ChannelReader and ChannelWriter directly, as well as interesting implementation details. They all boil down to asynchronous, no memory allocation using IValueTaskSource.



ChannelReader - Consumer



When a consumer object is requested, one of the implementations of the abstract ChannelReader <T> class is returned. Again, unlike API Pipelines, it’s simple and there are few methods. Just know the list of methods to understand how to use this in practice.



Methods:



  1. Virtual get-only property Task Completion {get; } An

    object of type Task that completes when the channel is closed;
  2. Virtual get-only property int Count {get; }

    Here it should be emphasized that the current number of readable objects is returned;
  3. Virtual get-only property bool CanCount {get; }

    Indicates whether the Count property is available;
  4. bool TryRead(out T item)

    . bool, , . out ( null, );
  5. ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)

    ValueTask true, , . ValueTask false, ( );
  6. ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)

    . , . .



    , TryRead WaitToReadAsync. ( cancelation tokens), β€” TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . β€” , WaitToReadAsync, , , .

    , , - .




ChannelWriter - producer



Everything is similar to the consumer, so let's look at the methods right away:



  1. Virtual method bool TryComplete (Exception? Error = null)

    Attempts to mark the channel as complete, i.e. show that no more data will be written to it. As an optional parameter, you can throw an exception that caused the termination of the channel. Returns true if it was successfully completed, otherwise false (if the channel has already been completed or does not support termination);
  2. Abstract method bool TryWrite (T item)

    Tries to write a value to the channel. Returns true if successful and false if not
  3. Abstract method ValueTask <bool> WaitToWriteAsync (CancellationToken cancellationToken = default)

    Returns a ValueTask with the value true, which will end when there is a place for recording in the channel. The value will be false if writes to the channel are no longer allowed;
  4. Virtual Method ValueTask WriteAsync (T item, CancellationToken cancellationToken = default)

    Writes asynchronously to the channel. For example, if the channel is full, the operation will be really asynchronous and will complete only after freeing up space for this record;
  5. Method void Complete (Exception? Error = null)

    Just tries to mark the channel as complete with TryComplete, and if it fails, throws an exception.


A small example of the above (to easily start your own experiments):



Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();

//      ,        
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;

//     
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
//  ,     ,   ,  
writer.Complete();

//         
int valueFromChannel = await reader.ReadAsync();


Now let's move on to the most interesting part.



Asynchrony without allocations



In the process of writing and studying the code, I realized that there is almost nothing interesting in the implementation of all these operations. In general, it can be described as follows - avoiding unnecessary locks with the help of concurrent collections and abundant use of ValueTask, which is a structure that saves memory. However, I hasten to remind you that it is not worthwhile to quickly go through all the files on your PC and replace all Task with ValueTask. It only makes sense in cases where the operation completes synchronously in most cases. After all, as we remember, with asynchrony, it is quite possible to change the thread, which means that the stack will no longer be the same as before. Anyway, a true professional in the field of productivity knows - do not optimize before problems arise.



One good thing is that I will not register myself as a professional, and therefore it's time to figure out what the secret of writing asynchronous code without memory allocations is, which at first glance sounds too good to be true. But it also happens.



IValueTaskSource interface



Let's start our journey from the beginning - the ValueTask structure , which was added in .net core 2.0 and amended in 2.1. Inside this structure, there is a tricky object _obj field. It is easy to guess, based on the talking name, that one of 3 things can be hidden in this field - null, Task / Task <T> or IValueTaskSource. In fact, it follows from the way the ValueTask is created.



As the manufacturer assures, this structure should be used only obviously - with the await keyword. That is, you should not apply await many times to the same ValueTask, use combinators, add multiple continuations, etc. Also, you shouldn't get the result from ValueTask more than once. And this is connected precisely with what we are trying to understand - by reusing all this stuff without allocating memory.



I have already mentioned the IValueTaskSource interface . It is he who helps to save memory. This is done by reusing the IValueTaskSource itself several times for many tasks. But precisely because of this reuse, there is no way to indulge in ValueTask.



So IValueTaskSource. This interface has 3 methods, by implementing which you will successfully save memory and time on the allocation of those treasured bytes.



  1. GetResult - Called once, when the state machine, formed at runtime for asynchronous methods, needs a result. ValueTask has a GetResult method, which calls the interface method of the same name, which, as we recall, can be stored in the _obj field.
  2. GetStatus - Called by the state machine to determine the status of an operation. Also via ValueTask.
  3. OnCompleted - Again, called by the state machine to add a continuation to an outstanding task at that time.


But despite the simple interface, the implementation will require some skill. And here we can remember what we started with - Channels . This implementation uses the AsyncOperation classwhich is an implementation of IValueTaskSource. This class is hidden behind the internal access modifier. But this does not stop to understand the basic mechanisms. This begs the question, why not give the implementation of IValueTaskSource to the masses? The first reason (for fun's sake) is when there is a hammer in the hands, nails are everywhere, when the implementation of IValueTaskSource is in the hands, there is an illiterate work with memory everywhere. The second reason (more plausible) is that while the interface is simple and versatile, the real implementation is optimal when using certain nuances of the application. And it is probably for this reason that you can find implementations in various parts of the great and powerful .net, such as AsyncOperation under the hood of channels, AsyncIOOperation inside the new socket API, and so on.

However, in fairness, there is still one common implementation -ManualResetValueTaskSourceCore . But this is already too far from the topic of the article.



CompareExchange



A fairly popular method of the popular class, avoiding the overhead of classic synchronization primitives. I think most of them are familiar with it, but it’s worth describing in 3 words, because this construction is used quite often in AsyncOperation.

In the literature, this function is called compare and swap (CAS). In .net, it is available in the Interlocked class .



The signature is as follows:



public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;


There are also overloads with int, long, float, double, IntPtr, object.



The method itself is atomic, that is, it is executed without interruptions. Compares 2 values ​​and, if they are equal, performs the assignment of a new value to a variable. They solve the problem when you need to check the value of a variable and change the variable depending on it.



Let's say you want to increment a variable if its value is less than 10.



Then there are 2 threads.



Stream 1 Stream 2
Checks the value of a variable for some condition (that is, is it less than 10) that is triggered -
Between checking and changing a value Assigns a variable a value that does not satisfy a condition (for example, 15)
Changes the value, although it should not, because the condition is no longer met -




When using this method, you either change exactly the value that you wanted, or do not change, while receiving the actual value of the variable.



location1 is a variable whose value we want to change. It is compared with comparand, in case of equality, value is written in location1. If the operation succeeds, the method will return the past value of the location1 variable. If not, the actual value of location1 will be returned.

Speaking a little deeper, there is a cmpxchg assembly language instruction that performs these actions. It is she who is used under the hood.



Stack dive



While looking at all this code, I came across references to "Stack Dive" more than once. This is a very cool and interesting thing that is actually very undesirable. The bottom line is that with synchronous execution of continuations we can run out of stack resources.



Let's say we have 10,000 tasks in style



//code1
await ...
//code2


Suppose the first task completes the execution and thereby releases the continuation of the second, which we immediately begin to execute synchronously in this thread, that is, taking a piece of the stack with the frame of the given continuation. In turn, this continuation unlocks the continuation of the third task, which we also begin to immediately perform. Etc. If there are no more await's in the sequel or something that somehow drops the stack, then we will simply consume the stack space to the stop. What can cause StackOverflow and application crash. In the code review, I will mention how AsyncOperation fights this.



AsyncOperation as an IValueTaskSource implementation



Source code .



Inside AsyncOperation, there is a _continuation field of type Action <object>. The field is used for, believe it or not, continuations. But, as is often the case in too modern code, fields have additional responsibilities (like the garbage collector and the last bit in the method table reference). Field _continuation from the same series. There are 2 special values ​​that can be stored in this field, besides the continuation itself and null. s_availableSentinel and s_completedSentinel . These fields indicate that the operation is available and completed, respectively. It can be accessed just for reuse for a completely asynchronous operation.



Also AsyncOperation implements IThreadPoolWorkItemwith only one method - void Execute () => SetCompletionAndInvokeContinuation (). The SetCompletionAndInvokeContinuation method does the continuation. And this method is called either directly in the AsyncOperation code, or through the mentioned Execute. After all, types that implement IThreadPoolWorkItem can be thrown into the thread pool somehow like this ThreadPool.UnsafeQueueUserWorkItem (this, preferLocal: false).



The Execute method will be executed by the thread pool.



The execution itself of the continuation is pretty trivial.



The continuation of _continuation is copied to a local variable, s_completedSentinel is written in its place- an artificial puppet object (or a sentry, I don’t know how to speak in our speech), which indicates that the task is completed. And then a local copy of the real continuation is simply executed. If there is an ExecutionContext, these actions are posted to the context. There is no secret here. This code can be called directly by the class - simply by calling a method that encapsulates these actions, or through the IThreadPoolWorkItem interface in the thread pool. Now you can guess how the function with continuation execution works synchronously.



The first method of the IValueTaskSource interface is GetResult ( github ).



It's simple, he:



  1. _currentId.

    _currentId β€” , . . ;
  2. _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
  3. _result.

    _result TrySetResult .


Method TrySetResult ( github ).



The method is trivial. - it stores the received parameter in _result and signals completion, namely, it calls the SignalCompleteion method , which is quite interesting.



SignalCompletion method ( github ).



This method uses everything that we talked about at the beginning.



At the very beginning, if _continuation == null, we write the s_completedSentinel puppet.



Further, the method can be divided into 4 blocks. I'll tell you right away for ease of understanding the circuit, block 4 is just a synchronous execution of the continuation. That is, the trivial execution of the continuation through the method, as I described in the paragraph on IThreadPoolWorkItem.



  1. _schedulingContext == null, .. ( if).

    _runContinuationsAsynchronously == true, , β€” ( if).

    IThreadPoolWorkItem . AsyncOperation . .

    , if ( , ), , 2 3 , β€” .. 4 ;
  2. _schedulingContext is SynchronizationContext, ( if).

    _runContinuationsAsynchronously = true. . , , . , . 2 , :

    sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
    


    . , , ( , ), 4 β€” ;
  3. , 2 . .

    , _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . .
  4. β€” . , .


The second method of the IValueTaskSource interface is GetStatus ( github )

Just like a St. Petersburg donkey.



If _continuation! = _CompletedSentinel, then return ValueTaskSourceStatus.Pending

If error == null, then return ValueTaskSourceStatus.Succeeded

If _error.SourceException is OperationCanceledException, then return ValueTaskSourceStatus.Canceled

Well, since much came up here, return ValueTaskSourceStatus.Faulted



third and final , but the most complex method of the IValueTaskSource interface is OnCompleted ( github )



The method adds a continuation that is executed upon completion.



Captures ExecutionContext and SynchronizationContext as needed.



Next, the Interlocked.CompareExchange described above is used to store the continuation in the field, comparing it to null. As a reminder, CompareExchange returns the actual value of a variable.



If the save of the continuation has passed, then the value that was in the variable before the update is returned, that is, null. This means that the operation was not completed at the time of recording the continuation. And the one who completes it himself will figure it out (as we saw above). And we do not make sense to perform any additional actions. And this completes the work of the method.



If the value was not saved, that is, something other than null was returned from CompareExchange. In this case, someone managed to put value in faster than us. That is, one of 2 situations occurred - either the task completed faster than we got here, or there was an attempt to write more than 1 continuation, which cannot be done.



Thus, we check the returned value, whether it is equal to s_completedSentinel - it would be exactly what would be written in case of completion.



  • If this is not s_completedSentinel , then we were not used according to plan - they tried to add more than one continuation. That is, the one that has already been written down, and the one that we are writing. And this is an exceptional situation;
  • s_completedSentinel, , , . , _runContinuationsAsynchronously = false.

    , , OnCompleted, awaiter'. . , AsyncOperation β€” System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.

    In order to avoid this situation, you must run the continuation asynchronously no matter what. It is executed according to the same schemes as the first 3 blocks in the SignalCompleteion method - simply in a pool, in context, or through a factory and scheduler


And here is an example of synchronous continuations:



class Program
    {
        static async Task Main(string[] args)
        {
            Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
            {
                AllowSynchronousContinuations = true
            });

            ChannelWriter<int> writer = unboundedChannel;
            ChannelReader<int> reader = unboundedChannel;

            Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");

            var writerTask = Task.Run(async () =>
            {
                Thread.Sleep(500);
                int objectToWriteInChannel = 555;
                Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
                await writer.WriteAsync(objectToWriteInChannel);
                Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
            });

            //Blocked here because there are no items in channel
            int valueFromChannel = await reader.ReadAsync();
            Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");

            await writerTask;

            Console.Read();
        }
    }


Output:



Main, before await. Thread id: 1

Created thread for writing with delay, before await write. Thread id: 4

Main, after await (will be processed by created thread for writing). Thread id: 4

Created thread for writing with delay, after await write. Thread id: 4



All Articles