What changes does Rust need to make it easier to write asynchronous code?



Asynchronous programming is a powerful tool, but the Rust ecosystem is still evolving, and the language's support for it is far from ideal. Partly for this reason, many people find asynchronous programming in Rust painful. Some, however, go beyond criticism and propose improvements, and the author of this article is one of them.



Here I will revisit some previously proposed ideas and connect them to newer suggestions. This is a thought experiment that tries to answer the question: "What could we do with asynchronous programming in Rust if we were given complete carte blanche?"



Ill-conceived changes could ruin Rust, so everything must be done carefully, weighing the pros and cons. I accept that some of these proposals may provoke a negative reaction. I understand that, and I ask the reader to approach this material with as open a mind as possible.



Threads vs Asynchrony



Writing asynchronous code is often harder than simply using threads. But threads are resource-hungry, which limits the performance they can deliver: switching between threads and exchanging data between them carries a lot of overhead, and even a thread that sits idle consumes valuable system resources. More often than not, asynchronous code is much faster. But not always:



For example, this echo server is written using threads. It is faster than its asynchronous version as long as the number of simultaneous connections does not exceed 100.
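
As a rough illustration of the thread-per-connection style, here is a minimal sketch that uses only the standard library. It is not the echo server referred to above: the helper names (handle_client, spawn_echo_server, echo_roundtrip) and the ephemeral local port are assumptions made for this example.

```rust
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;

/// Echo everything received on one connection back to the peer.
fn handle_client(mut stream: TcpStream) -> std::io::Result<()> {
    let mut buf = [0u8; 1024];
    loop {
        let n = stream.read(&mut buf)?;
        if n == 0 {
            return Ok(()); // the peer closed the connection
        }
        stream.write_all(&buf[..n])?;
    }
}

/// Start the server on an ephemeral local port, one OS thread per connection.
fn spawn_echo_server() -> std::io::Result<SocketAddr> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    thread::spawn(move || {
        for stream in listener.incoming().flatten() {
            thread::spawn(move || {
                let _ = handle_client(stream);
            });
        }
    });
    Ok(addr)
}

/// Send `msg` to a fresh server and return whatever comes back.
fn echo_roundtrip(msg: &[u8]) -> std::io::Result<Vec<u8>> {
    let addr = spawn_echo_server()?;
    let mut client = TcpStream::connect(addr)?;
    client.write_all(msg)?;
    let mut reply = vec![0u8; msg.len()];
    client.read_exact(&mut reply)?;
    Ok(reply)
}

fn main() -> std::io::Result<()> {
    let reply = echo_roundtrip(b"ping")?;
    println!("{}", String::from_utf8_lossy(&reply));
    Ok(())
}
```

Every connection costs a whole OS thread here, which is cheap for a handful of clients and increasingly expensive as their number grows.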



In my opinion, the best argument for async is this:



  • it lets you efficiently model complex control flow.


For example, suspending or cancelling an operation on the fly is difficult to implement unless the operation is asynchronous. And when multiple threads (one connection per thread) compete for a limited resource, you have to use synchronization primitives. With an asynchronous approach, we can get better performance by handling multiple connections on the same thread.



The main problem with asynchrony in Rust



At first, writing asynchronous code in Rust may seem easy. That opinion changes with the first difficulties, which come from pitfalls that the documentation does not describe. Enthusiasts try to track these pitfalls and propose solutions. There are several areas to dig into, but in my opinion the biggest problem with asynchrony in Rust is that it violates the principle of least surprise.



If the purpose of an element or combination is unclear, its behavior should be the one users most expect. Code should behave the way the programmer expects.


Throughout this article I will keep returning to the example of a certain Alan, who has started learning Rust and is running into his first difficulties.



So Alan has read the Rust book and the Tokio docs, and he wants to write his own chat server. He chooses a simple line-based protocol and encodes each line by prefixing it with its length. His line parsing function looks like this:



async fn parse_line(socket: &TcpStream) -> Result<String, Error> {
    // Read the 4-byte length prefix, then exactly that many bytes
    let len = socket.read_u32().await? as usize;
    let mut line = vec![0; len];
    socket.read_exact(&mut line).await?;
    // Decode the payload as UTF-8
    let line = String::from_utf8(line)?;
    Ok(line)
}

      
      





This code looks very much like regular, non-asynchronous Rust code, apart from the async keyword and the .await calls. Although Alan has never written Rust before, he is confident that he knows how this function works. As we will see, he is wrong. Tested locally, his chat server works without errors, so Alan sends a link to Barbara. Unfortunately, after a bit of chatting, the server crashes with an "invalid UTF-8" error. Alan is baffled: he checks the code and finds no errors.



So what's the problem? It turns out that, further up the call stack, the select! macro is being used. The futures::select! macro runs several futures (asynchronous computations) concurrently and returns control to the caller as soon as any one of them completes.



loop {
    select! {
        line_in = parse_line(&socket) => {
            if let Some(line_in) = line_in {
                broadcast_line(line_in);
            } else {
                // connection closed, exit the loop
                break;
            }
        }
        line_out = channel.recv() => {
            write_line(&socket, line_out).await;
        }
    }
}
      
      





Suppose a message arrives in the chat (via channel) at the exact moment when parse_line is still in the middle of reading a frame. select! then aborts the parse_line operation before it can finish parsing. On the next iteration of the loop, parse_line is called again and starts parsing from the middle of the frame, so it reads gibberish.
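
The failure can be reproduced without any async runtime. The sketch below is a synchronous simulation under stated assumptions: read_len and read_payload are hypothetical halves of parse_line, a Cursor stands in for the socket, and "cancellation" is modelled by abandoning the operation after the length prefix has been consumed.

```rust
use std::io::{self, Cursor, Read};

/// Encode a line as a 4-byte big-endian length followed by its UTF-8 bytes.
fn encode_line(line: &str) -> Vec<u8> {
    let mut frame = (line.len() as u32).to_be_bytes().to_vec();
    frame.extend_from_slice(line.as_bytes());
    frame
}

/// First half of parsing: consume the length prefix.
fn read_len(stream: &mut impl Read) -> io::Result<u64> {
    let mut prefix = [0u8; 4];
    stream.read_exact(&mut prefix)?;
    Ok(u64::from(u32::from_be_bytes(prefix)))
}

/// Second half: consume `len` payload bytes and decode them as UTF-8.
fn read_payload(stream: &mut impl Read, len: u64) -> io::Result<String> {
    let mut payload = Vec::new();
    stream.by_ref().take(len).read_to_end(&mut payload)?;
    if (payload.len() as u64) < len {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "frame truncated"));
    }
    String::from_utf8(payload).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// The whole operation, as Alan imagines it always runs.
fn parse_line(stream: &mut impl Read) -> io::Result<String> {
    let len = read_len(stream)?;
    read_payload(stream, len)
}

/// Simulate a cancellation between the two halves, followed by a retry.
fn misaligned_parse() -> io::Result<String> {
    let mut bytes = encode_line("hello");
    bytes.extend(encode_line("world"));
    let mut stream = Cursor::new(bytes);

    // First attempt: the prefix is consumed, then the operation is abandoned.
    let _abandoned_len = read_len(&mut stream)?;

    // Retry: the bytes "hell" are now misread as a length prefix.
    parse_line(&mut stream)
}

fn main() {
    let mut intact = Cursor::new(encode_line("hello"));
    println!("uninterrupted: {:?}", parse_line(&mut intact));
    println!("after a cancellation: {:?}", misaligned_parse());
}
```

In this simulation the retry fails because the bogus length exceeds the remaining data. On a live socket with more bytes in flight, the same misalignment would instead yield arbitrary bytes, which is how an "invalid UTF-8" error can appear.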



Therein lies the problem: any asynchronous Rust function can stop running at any await point, simply because it was cancelled. And this is not an unusual situation. Where should a beginner Rust developer look to learn about this behavior?



Nowhere. The behavior itself needs to change, which means changing the language.



Change #1: Futures with guaranteed completion



If the language's behavior does not match expectations and intuition, the learner needs to be pointed in the right direction rather than left in silence. Better still, such unpleasant surprises should be minimized during learning, especially at an early stage.



Let's start by fixing the unexpected cancellation (or interruption) of asynchronous operations: let's make them always run to completion (this idea was first proposed here). With guaranteed completion futures, asynchronous Rust behaves more like blocking code, while keeping the async and await keywords. Spawning tasks (using spawn) adds concurrency, and asynchronous channels (the Channel type) let tasks communicate and hand work to each other. Consequently, select! would accept only channels or channel-like types (e.g. JoinHandle).



Here's how the code from the first two examples changes:



async fn handle_connection(socket: TcpStream, channel: Channel) {
    let reader = Arc::new(socket);
    let writer = reader.clone();
    
    let read_task = task::spawn(async move {
        while let Some(line_in) = parse_line(&reader).await {
            broadcast_line(line_in);
        }
    });
    
    loop {
        // both branches wait on channel-like types
        select! {
            res = read_task.join() => {
                // connection closed, exit the loop
                break;
            }
            line_out = channel.recv() => {
                write_line(&writer, line_out).await;
            }
        }
    }
}
      
      





All asynchronous operations now run to completion, select! accepts only channel types, and the call to parse_line() has moved into a spawned task. These small changes would have prevented the problem Alan ran into. If Alan tries to call parse_line() inside select!, he gets a compiler error recommending that he spawn a task and call the function there.



When you use channel types inside select!, you do not have to worry about branches that do not get to run. Channels store their values atomically, so even if a branch is dropped, no data is lost.
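
The same property can be observed with the standard library's channels. The sketch below is an analogy under stated assumptions: an OS thread stands in for the spawned read task, collect_lines is a hypothetical helper, and a receive attempt that times out plays the role of a select! branch that was dropped before it completed.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Receive every line from a slow producer, giving up on each wait after 1 ms.
fn collect_lines(lines: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    // Stand-in for the read task: it always finishes producing a line
    // before handing it over to the channel.
    let producer = thread::spawn(move || {
        for line in lines {
            thread::sleep(Duration::from_millis(5));
            tx.send(line).unwrap();
        }
    });

    let mut received = Vec::new();
    loop {
        match rx.recv_timeout(Duration::from_millis(1)) {
            Ok(line) => received.push(line),
            // An abandoned wait loses nothing: the value stays in the channel.
            Err(mpsc::RecvTimeoutError::Timeout) => continue,
            // The producer is done and the channel is drained.
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
    producer.join().unwrap();
    received
}

fn main() {
    let lines = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    println!("{:?}", collect_lines(lines));
}
```

Most of the waits in this loop time out, yet every line arrives exactly once and in order.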



Canceling an asynchronous operation



What happens if an error occurs while writing? In the code above, read_task simply keeps running. Alan would rather have the error gracefully close the connection and shut down all its tasks. Unfortunately, this is where we start to run into language design problems.



If we could interrupt any asynchronous operation at any time, we could simply drop the future. But we have just introduced guaranteed completion futures! We will not go back on that, so Rust needs further changes.



The ability to cancel an operation on the fly is one of the main reasons to use the asynchronous approach. Let's try a cancel() method:



async fn handle_connection(socket: TcpStream, channel: Channel) {
    let reader = Arc::new(socket);
    let writer = reader.clone();
    
    let read_task = task::spawn(async move {
        while let Some(line_in) = parse_line(&reader).await? {
            broadcast_line(line_in)?;
        }
        
        Ok(())
    });
    
    loop {
        // both branches wait on channel-like types
        select! {
            _ = read_task.join() => {
                // the connection closed or an error occurred,
                // exit the loop
                break;
            }
            line_out = channel.recv() => {
                if write_line(&writer, line_out).await.is_err() {
                    read_task.cancel();
                    read_task.join();
                }
            }
        }
    }
}
      
      





But what can cancel() actually do here? It cannot abort the task immediately, because futures are now guaranteed to complete. What we want is for the cancelled task to stop working and finish as soon as possible. So instead, operations inside the cancelled task return an "interrupted" error, and any further attempt to use resources from that task also fails with an error.



If the task ignores these errors, it can linger for a while, doing nothing but reporting them. Eventually, though, it does finish.
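
This cooperative style of cancellation can be sketched with threads and an atomic flag. Everything here is an assumption made for illustration: Resource, its read_line method and run_and_cancel are hypothetical, and a thread stands in for the task. The point is that the task is never killed; its resources start failing, and the ? operator carries the error out.

```rust
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// A resource that refuses to do any more work once its task is cancelled.
struct Resource {
    cancelled: Arc<AtomicBool>,
}

impl Resource {
    fn read_line(&self) -> io::Result<String> {
        if self.cancelled.load(Ordering::SeqCst) {
            return Err(io::Error::new(io::ErrorKind::Interrupted, "task cancelled"));
        }
        thread::sleep(Duration::from_millis(1)); // pretend to wait for data
        Ok("line".to_string())
    }
}

/// Spawn a worker, cancel it shortly afterwards, and return how it ended.
fn run_and_cancel() -> io::Result<()> {
    let cancelled = Arc::new(AtomicBool::new(false));
    let resource = Resource { cancelled: cancelled.clone() };

    let task = thread::spawn(move || -> io::Result<()> {
        loop {
            // `?` propagates the "interrupted" error, so the loop ends promptly.
            resource.read_line()?;
        }
    });

    thread::sleep(Duration::from_millis(10));
    cancelled.store(true, Ordering::SeqCst); // the equivalent of read_task.cancel()
    task.join().expect("task panicked") // the equivalent of read_task.join()
}

fn main() {
    println!("{:?}", run_and_cancel());
}
```

A worker that swallowed the error instead of propagating it would keep looping, which is the lingering behavior described above.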



On seeing such strange behavior, Alan could try to work out what exactly is happening while the task lingers, for instance by adding println! statements to his code or by using other debugging techniques.



Explicit and Implicit .await Calls



Unbeknownst to Alan, his chat server avoids most system calls by using io_uring (a Linux kernel interface for submitting and completing I/O asynchronously). Asynchronous Rust can use the io_uring API transparently thanks to guaranteed completion futures. When Alan drops the TcpStream value at the end of handle_connection(), the socket should be closed asynchronously. The AsyncDrop implementation for TcpStream looks like this:



impl AsyncDrop for TcpStream {
    async fn drop(&mut self) {
        self.uring.close(self.fd).await; // the close is awaited here, inside drop
    }
}
      
      





So where does the .await happen when a TcpStream goes out of scope? Implicitly, and that question remains open. Today, an explicit .await is required to wait asynchronously for a future to complete. The AsyncDrop trait adds a hidden await point wherever a value goes out of scope in an asynchronous context, which is another pitfall. This behavior violates the principle of least surprise. Why should some .await calls be implicit when all the others are explicit?



An obvious solution suggests itself:



  • make all .await calls explicit.



my_tcp_stream.read(&mut buf).await?;
async_drop(my_tcp_stream).await;
      
      





But what happens if the user forgets to call async_drop(my_tcp_stream).await? Note that the snippet above already contains a bug: if the read fails, the ? operator returns early and skips the call to async_drop. The Rust compiler could issue a warning about the problem, but how would you fix it?
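
The skipped cleanup is easy to demonstrate in ordinary synchronous Rust. In this sketch, Stream, its close method and closed_after_use are hypothetical stand-ins: close plays the role of async_drop(...).await, and a shared flag records whether it ever ran.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// A fake stream that records whether it was closed explicitly.
struct Stream {
    closed: Rc<Cell<bool>>,
    fail_reads: bool,
}

impl Stream {
    fn read(&mut self) -> Result<u8, String> {
        if self.fail_reads {
            Err("read failed".to_string())
        } else {
            Ok(42)
        }
    }

    /// Explicit cleanup, standing in for `async_drop(stream).await`.
    fn close(self) {
        self.closed.set(true);
    }
}

fn use_stream(mut stream: Stream) -> Result<u8, String> {
    let byte = stream.read()?; // an early return here skips the close below
    stream.close();
    Ok(byte)
}

/// Run `use_stream` and report whether the stream ended up closed.
fn closed_after_use(fail_reads: bool) -> bool {
    let closed = Rc::new(Cell::new(false));
    let stream = Stream { closed: closed.clone(), fail_reads };
    let _ = use_stream(stream);
    closed.get()
}

fn main() {
    println!("closed after a successful read: {}", closed_after_use(false));
    println!("closed after a failed read: {}", closed_after_use(true));
}
```

On the error path the stream is never closed, which is exactly the hazard of making cleanup an explicit call.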



Change #2: Removing .await



What if, instead of requiring an explicit async_drop(...).await, we removed the await keyword altogether? Then you would not have to write it after every call to an asynchronous function (for example, socket.read_u32().await). In other words, inside functions marked with the async keyword, every .await would become implicit.



This line of thought may seem contradictory, and it is. But all of our proposals and assumptions need to be tested. The implicit .await is limited in scope and depends on context, since it occurs only inside asynchronous functions. Alan only has to look at the function definition (the async keyword) to know that he is in an asynchronous context. And this becomes easier not only for Alan but also for code analysis tools.



Dropping explicit .await calls has another advantage: the code looks more like non-asynchronous Rust. The only visible difference that remains is the need to annotate certain functions with the async keyword. The problem of "lazy futures" (which only start running when they are first polled) also disappears, so Alan can no longer "accidentally" write code like the following and wonder why "two" is printed first.



async fn my_fn_one() {
    println!("one");
}

async fn my_fn_two() {
    println!("two");
}

async fn mixup() {
    let one = my_fn_one();
    let two = my_fn_two();
    
    join!(two, one);
}
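
The laziness can be reproduced with nothing but the standard library. The sketch below is not part of the original example: block_on is a deliberately naive executor written for this illustration, and record is a hypothetical helper that logs its name only when it actually runs. Awaiting the futures in sequence replaces join!, which is enough to show the ordering.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for futures that never really suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Naive executor: poll the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Append `name` to the log, but only once the future is polled.
async fn record(log: Arc<Mutex<Vec<&'static str>>>, name: &'static str) {
    log.lock().unwrap().push(name);
}

fn run_order() -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));

    let one = record(log.clone(), "one"); // nothing has run yet
    let two = record(log.clone(), "two"); // still nothing
    assert!(log.lock().unwrap().is_empty());

    // The order of polling decides the order of execution.
    block_on(async move {
        two.await;
        one.await;
    });

    let order = log.lock().unwrap().clone();
    order
}

fn main() {
    println!("{:?}", run_order()); // prints ["two", "one"]
}
```

Calling record only builds a future. The body runs when the future is polled, so the call order in the source says nothing about the execution order.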
      
      





One RFC did at one point spark a discussion about implicit .await calls. At the time, the most compelling argument against them was that implicit .await calls increase the number of hidden points at which an asynchronous operation can be interrupted. With guaranteed completion futures, that argument no longer applies.



Be that as it may, removing explicit .await calls is a very serious change and must be approached with caution. Appropriate research should show whether the pros outweigh the cons.



Change #3: Ditch Arc and use scoped tasks



Alan can now develop his chat server in asynchronous Rust without looking under the hood or running into unexpected behavior. The compiler nudges him to use channel types and to add async to his functions, and this guidance really works. He shows his code to Barbara and asks whether the socket really needs to be wrapped in an Arc (let reader = Arc::new(socket);).



Barbara suggests that he look at scoped tasks instead. These are the asynchronous equivalent of scoped threads: tasks of this kind can borrow data that belongs to the task that spawned them.



async fn handle_connection(socket: TcpStream, channel: Channel) {
    task::scope(async |scope| {
        let read_task = scope.spawn(async || {
            while let Some(line_in) = parse_line(&socket)? {
                broadcast_line(line_in)?;
            }

            Ok(())
        });
        
        loop {
            // both branches wait on channel-like types
            select! {
                _ = read_task.join() => {
                    // the connection closed or an error occurred,
                    // exit the loop
                    break;
                }
                line_out = channel.recv() => {
                    if write_line(&socket, line_out).is_err() {
                        break;
                    }
                }
            }
        }
    });
}
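
For comparison, the scoped threads that this proposal mirrors are already available as std::thread::scope. The sketch below is an illustration rather than part of the proposal: total_len is a hypothetical helper, and OS threads stand in for scoped tasks. Both workers borrow from the caller's slice without any Arc.

```rust
use std::thread;

/// Sum the lengths of all lines using two scoped worker threads.
fn total_len(lines: &[String]) -> usize {
    let (left, right) = lines.split_at(lines.len() / 2);

    // The scope guarantees that both workers finish before it returns,
    // which is what makes borrowing `left` and `right` sound.
    thread::scope(|scope| {
        let a = scope.spawn(|| left.iter().map(String::len).sum::<usize>());
        let b = scope.spawn(|| right.iter().map(String::len).sum::<usize>());
        a.join().unwrap() + b.join().unwrap()
    })
}

fn main() {
    let lines = vec!["ab".to_string(), "cde".to_string(), "f".to_string()];
    println!("{}", total_len(&lines));
}
```

The borrow is only safe because the scope cannot end while a worker is still running. That is the same completion guarantee scoped tasks would need from futures.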

      
      





This approach ensures that asynchronous operations run to completion. But it has a drawback: to support scoped tasks, the Future::poll method would have to become unsafe, because the caller would now be responsible for polling the future until it completes. Implementing the Future trait by hand would therefore be unsafe, and traits such as AsyncRead and AsyncIterator would need to be provided as well. But I think this is an achievable goal.



The completion guarantee would also make it possible to pass pointers from a scoped task to the Linux kernel when using io_uring, or when integrating with C++ futures.



Change #4: Removing FuturesUnordered



Today, asynchronous Rust applications can introduce concurrency by spawning a new task, by using select!, or by using FuturesUnordered. So far we have discussed the first two options. I propose removing FuturesUnordered, as it is a common source of bugs. With FuturesUnordered, it is easy to create tasks expecting them to run in the background, and then wonder why they make no progress.



You can emulate FuturesUnordered with the same scoped tasks plus a TaskSet, which is much more reliable.



let greeting = "Hello".to_string();

task::scope(async |scope| {
    let mut task_set = scope.task_set();

    for i in 0..10 {
        task_set.spawn(async {
            println!("{} from task {}", greeting, i);

            i
        });
    }

    async for res in task_set {
        println!("task completed {:?}", res);
    }
});

      
      





Each spawned task runs concurrently, borrowing data from the task that spawned it, and TaskSet provides an API similar to FuturesUnordered. Primitives such as buffered streams could also be implemented on top of scoped tasks.
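
Until something like TaskSet exists, the shape of the API can be approximated with scoped threads and a channel. This is only an analogy under stated assumptions: run_tasks is a hypothetical helper, threads stand in for tasks, and the channel plays the role of the task set's stream of results.

```rust
use std::sync::mpsc;
use std::thread;

/// Run `n` workers that borrow `greeting`, then gather their results.
fn run_tasks(greeting: &str, n: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();

    thread::scope(|scope| {
        for i in 0..n {
            let tx = tx.clone();
            scope.spawn(move || {
                println!("{} from task {}", greeting, i);
                tx.send(i).unwrap();
            });
        }
        // Drop the original sender so the receiver sees the end of the stream.
        drop(tx);

        // Results arrive in completion order; sort them for a stable answer.
        let mut done: Vec<usize> = rx.iter().collect();
        done.sort_unstable();
        done
    })
}

fn main() {
    let greeting = "Hello".to_string();
    println!("tasks completed: {:?}", run_tasks(&greeting, 10));
}
```

Unlike futures pushed into FuturesUnordered, these workers make progress on their own, whether or not anyone is currently draining the results.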



The current model of asynchronous Rust makes it hard to explore other concurrency primitives. That would become possible if Rust guaranteed the completion of asynchronous operations (which is where this article has arrived).



Change #5: Add the #[abort_safe] attribute



At the beginning of this article I argued that asynchronous programming lets us efficiently model complex control flow. The most effective primitive we have for that today is select!. Earlier I suggested keeping it, but only for channel types. That, however, means spawning two tasks per connection in order to read and write concurrently. Spawned tasks really do help prevent bugs caused by cancelling (interrupting) an operation. But let's look for a more efficient solution and rewrite the read operation so that it tolerates unexpected interruption.



For example, when mini-redis parses frames, it first stores the received data in a buffer. If a read operation is interrupted, no data is lost, because it is still in the buffer, and the next read call resumes where the previous one left off. Such an implementation can be called abort-safe.
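
The buffering idea fits in a few lines. The sketch below is not mini-redis code: Connection, feed and parse_line are hypothetical names, and the length-prefixed framing is borrowed from Alan's protocol. What matters is that all parsing state lives in the struct, so abandoning an attempt midway loses nothing.

```rust
use std::string::FromUtf8Error;

/// Parsing state that survives an interrupted read.
struct Connection {
    buffer: Vec<u8>,
}

impl Connection {
    fn new() -> Self {
        Connection { buffer: Vec::new() }
    }

    /// Stand-in for a socket read: append whatever bytes have arrived.
    fn feed(&mut self, chunk: &[u8]) {
        self.buffer.extend_from_slice(chunk);
    }

    /// Return a line only once a whole frame is buffered.
    /// Nothing is consumed until then.
    fn parse_line(&mut self) -> Result<Option<String>, FromUtf8Error> {
        if self.buffer.len() < 4 {
            return Ok(None); // the length prefix is incomplete
        }
        let prefix = [self.buffer[0], self.buffer[1], self.buffer[2], self.buffer[3]];
        let len = u32::from_be_bytes(prefix) as usize;
        if self.buffer.len() < 4 + len {
            return Ok(None); // the payload is incomplete, keep everything
        }
        // Remove the frame from the buffer and drop its 4-byte prefix.
        let payload: Vec<u8> = self.buffer.drain(..4 + len).skip(4).collect();
        String::from_utf8(payload).map(Some)
    }
}

fn main() {
    let mut conn = Connection::new();
    conn.feed(&[0, 0, 0, 5, b'h', b'e']); // only part of the frame has arrived
    println!("{:?}", conn.parse_line());
    conn.feed(b"llo"); // the rest arrives later
    println!("{:?}", conn.parse_line());
}
```

An interrupted read corresponds to stopping between the two feed calls: the partial frame simply waits in the buffer for the next attempt.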



What if, instead of restricting select! to channel types, we restricted it to abort-safe operations? Operations such as receiving from a channel or reading from a buffered I/O handle are naturally abort-safe. But rather than rely on that being true by luck, we would require the developer to mark the function explicitly with #[abort_safe] when implementing it. This is the more robust strategy.






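#[abort_safe]
async fn read_line(&mut self) -> io::Result<Option<String>> {
    loop {
        // Try to parse a full line from the buffered data
        if let Some(line) = self.parse_line()? {
            return Ok(Some(line));
        }

        // Not enough buffered data for a full line, read more from the socket
        if 0 == self.socket.read_buf(&mut self.buffer)? {
            // The remote closed the connection.
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err(io::Error::new(
                    io::ErrorKind::ConnectionReset,
                    "connection reset by peer",
                ));
            }
        }
    }
}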
      
      





Rather than making abort safety the default, we make it opt-in. When a developer reads such code, the annotation tells them that the function is required to be abort-safe. The Rust compiler could even run additional checks and emit warnings for functions marked with #[abort_safe].



Alan can now use his read_line() function with select!, without channel types.



loop {
    select! {
        line_in = connection.read_line()? => {
            if let Some(line_in) = line_in {
                broadcast_line(line_in);
            } else {
                // connection closed, exit the loop
                break;
            }
        }
        line_out = channel.recv() => {
            connection.write_line(line_out)?;
        }
    }
}

      
      





Note that you can mix functions with and without the #[abort_safe] attribute in your code. An abort-safe function can always be called, from both abort-safe and non-abort-safe contexts. The reverse is not true: the Rust compiler will refuse to let a function that is not abort-safe be called from an abort-safe context, and will print an appropriate error message.



async fn must_complete() { ... }

#[abort_safe]
async fn can_abort() {
    // Invalid call => compiler error
    must_complete();
}
 
async fn must_complete() { ... }

#[abort_safe]
async fn can_abort() {
    // Valid call
    spawn(async { must_complete() }).join();
}
      
      





The developer can always spawn a new task to bridge from an abort-safe context to a function that is not abort-safe.



Having two flavors of asynchronous functions does complicate the language, but this complexity appears late in the learning curve. Learning asynchronous Rust would start in the default context, where functions are not abort-safe, without considering #[abort_safe] at all. From that context the learner can call any asynchronous function, whether or not it carries the attribute. The attribute itself would be covered in the final, advanced chapters of an asynchronous Rust tutorial.



At least that's how I imagine it all.



Bright future



Moving from the current asynchronous model, in which operations are abort-safe by default, to a model with guaranteed completion would require substantial changes to Rust. Let's say all the work is done by 2026. Ordinary futures would become guaranteed completion futures, and the old futures would live on in the 2026 version under the name AbortSafeFuture.



Adding #[abort_safe] to an asynchronous function would make it produce an AbortSafeFuture instead of a Future. Any asynchronous function written in versions of Rust prior to 2026 would be treated as producing an AbortSafeFuture. This keeps all existing asynchronous code compatible with the new version (recall that an abort-safe function can be called from any context).



Updating an old codebase would require adding #[abort_safe] to every asynchronous function. This is a mechanical process that is easy to automate. The Tokio runtime would also need an overhaul to support asynchronous Rust with guaranteed completion.



Time to act



I've talked about a few changes that I think will help simplify asynchronous programming in Rust:



  1. Using futures with guaranteed completion
  2. Removing .await
  3. Ditching Arc and using scoped tasks
  4. Removing FuturesUnordered and expanding concurrency capabilities
  5. Adding the #[abort_safe] attribute


They would also improve the very mechanism by which asynchronous operations are executed. But before making any decisions we need more experimental data. What percentage of today's asynchronous code is safe against unwanted interruption?



Can we do enough research to assess the potential benefits of these changes? Conversely, how much harder would Rust be to learn and use with two kinds of asynchronous functions (with and without #[abort_safe])?



Hopefully this article will spark some discussion, and perhaps you can suggest alternative solutions. It is time to try the boldest ideas.





