Yosh is writing

Async Cancellation I
- 2021-11-10

Sometimes we start things but decide midway through that we would rather not be doing them. That process is sometimes referred to as cancellation. Say we accidentally clicked “download” on a large file in the browser. We should have a way to tell the computer to stop downloading it.

When the async foundations WG was researching user stories earlier this year, async cancellation came up repeatedly. It’s one of those things that’s important to have, but can be tricky to reason about. This is not helped by the fact that little has been written about it, so I figured I might try to make a dent in that by writing a deep-dive on the topic.

In this post we’ll look at async Rust’s async primitives, and cover how cancellation works for those primitives today. We’ll then proceed to look at ways in which we can ensure we do not end up with dangling resources. And finally we’ll take a look at what the current direction of async Rust means for async cancellation. Sounds like a plan? Good, let’s dive in!

Tasks and Futures

For the purpose of this post we need to distinguish between two types of async primitives in Rust: futures and tasks 1.

1

Arguably “Stream” / “AsyncIterator” is a third async primitive, but everything we say about the Future type applies to Stream as well, so we’re considering those the same in this post.

2

“managed” is used throughout this post as: “is scheduled on a runtime”. This typically comes with the requirement futures are 'static (the future does not hold any borrows), and when scheduled on a multi-threaded runtime often, but not always, requires futures be Send as well (the future is thread-safe).

3

You can think of Tasks as being something akin to lightweight threads, managed by your program rather than your operating system.

Many languages, including JavaScript, use equivalents to Rust tasks rather than Rust futures as their core building blocks 4. This is convenient because only a single type of async building block is exposed, and the language runtime optimizers can figure out how to speed things up if needed. But in Rust we unfortunately can’t rely on that, so we manually distinguish between unmanaged (futures) and managed (tasks) primitives.

4

JavaScript’s counterpart to Rust’s tasks is Promise. It starts being executed the moment it’s instantiated, rather than the moment it’s awaited. My understanding is that C#’s Task<T> works much the same way.

Cancelling Futures

Cancellation allows for a future to stop doing work early, when we know we’re no longer interested in its result. Futures in Rust can be cancelled at one of two points. For simplicity most of our examples in this post are going to use sleeping and printing operations, where in the real world we’d probably be talking about file/network operations and data processing instead.

1. Cancel a future before it starts executing

Here we create a drop guard, pass it to an async function which returns a future, and then drop the future before we .await it:

use async_std::task;
use std::time::Duration;

struct Guard;
impl Drop for Guard {
    fn drop(&mut self) {
        println!("2");
    }
}

async fn foo(guard: Guard) {
    println!("3");
    task::sleep(Duration::from_secs(1)).await;
    println!("4");
}

fn main() {
    println!("1");
    let guard = Guard {};
    let fut = foo(guard);
    drop(fut);
    println!("done");
}

This prints:

> 1
> 2
> done

Our Guard type here will print when its destructor (Drop) is run. We never actually execute the future, but the destructor is still run because we passed the value to the async function. This means the first cancellation point of any future is immediately after instantiation, before the async function’s body has run. In other words, not all cancellation points are necessarily marked by .await.

2. Cancel a future at an await point

Here we create a future, poll it exactly once, and then drop it:

use async_std::task;
use std::future::Future;
use std::ptr;
use std::task::Context;
use std::time::Duration;

async fn foo() {
    println!("2");
    task::sleep(Duration::from_secs(1)).await;
    println!("3");
}

let mut fut = Box::pin(foo());
let mut cx = empty_cx();

println!("1");
assert!(fut.as_mut().poll(&mut cx).is_pending());
drop(fut);
println!("done");

/// Create an empty "Waker" callback, wrapped in a "Context" structure.
/// How this works is not particularly important for the rest of this post.
fn empty_cx() -> Context<'static> { ... }
> 1
> 2
> done

Fundamentally, you can think of .await as marking a point where cancellation may occur. Where keywords like return and ? mark points where the function may return a value, .await marks a location where the function’s caller may decide the function should not run any further. But importantly: in all cases destructors will be run, allowing resources to be cleaned up.

Futures cannot be cancelled in between .await calls, or after the last .await call. We also don’t yet have a design for “async Drop”, so we can’t say anything meaningful yet about how that will interact with cancellation.
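If you want to try the “poll once, then drop” behaviour without pulling in a runtime, here is a minimal sketch that uses only the standard library. The names NoopWake, YieldOnce, Guard, and cancel_at_await are made up for this illustration, and YieldOnce is a stand-in for a real timer such as task::sleep; it is an assumption of the sketch, not how any runtime implements sleeping.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<&'static str>>>;

/// A waker that does nothing, which is enough when we poll by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for `task::sleep`: pending on the first poll, ready
/// on the second. A real leaf future would also register the waker.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        Poll::Pending
    }
}

/// Records an event when its destructor runs.
struct Guard(Log);
impl Drop for Guard {
    fn drop(&mut self) {
        self.0.borrow_mut().push("guard dropped");
    }
}

async fn foo(log: Log) {
    let _guard = Guard(log.clone());
    log.borrow_mut().push("before await");
    YieldOnce(false).await;
    log.borrow_mut().push("after await"); // never reached in this sketch
}

/// Polls `foo` once, drops it while it is suspended, and returns the events.
fn cancel_at_await() -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(foo(log.clone()));
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(fut); // cancellation: destructors of live locals run here

    let events = log.borrow().clone();
    events
}
```

Calling cancel_at_await() yields the events "before await" and "guard dropped": the code after the .await never runs, but the destructor does.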

Cancelling Tasks

Because tasks aren’t standardized in Rust yet, cancellation of tasks isn’t either. And unsurprisingly different runtimes have different ideas on how to cancel a task. Both async-std and tokio share a similar task model. I’m most comfortable with async-std 5, so let’s use that as an example:

5

Probably because I co-authored the project.

use async_std::task;
use std::time::Duration;

let handle = task::spawn(async {
    task::sleep(Duration::from_secs(1)).await;
    println!("2");
});

println!("1");
drop(handle);     // Drop the task handle.
task::sleep(Duration::from_secs(2)).await;
println!("done");
> 1
> 2
> done

Here we dropped the handle, but the task continued to run in the background. This is because the async-std runtime uses “detach on drop” semantics for tasks. This is the same in the tokio runtime. In order to cancel a task, we need to manually call a method on the handle. For async-std this is JoinHandle::cancel, and for tokio this is JoinHandle::abort:

use async_std::task;
use std::time::Duration;

let handle = task::spawn(async {
    task::sleep(Duration::from_secs(1)).await;
    println!("2");
});

println!("1");
handle.cancel().await;    // Cancel the task handle
task::sleep(Duration::from_secs(2)).await;
println!("done");
> 1
> done

We can see that if we call JoinHandle::cancel, the task is cancelled at that point, and the number 2 is no longer printed.

Propagating cancellation for futures

In Async Rust cancellation automatically propagates for futures. When we stop polling a future, all futures contained within will in turn also stop making forward progress. And all destructors within them will be run. In this example we’ll be using the FutureExt::timeout function from async-std, which returns a Result<T, TimeoutError>.

use async_std::prelude::*;
use async_std::task;
use std::time::Duration;

async fn foo() {
    println!("2");
    bar().timeout(Duration::from_secs(3)).await;
    println!("5");
}

async fn bar() {
    println!("3");
    task::sleep(Duration::from_secs(2)).await;
    println!("4");
}

println!("1");
foo().timeout(Duration::from_secs(1)).await;
println!("done");
> 1
> 2
> 3
> done    # `4` and `5` are never printed

The tree of futures above can be expressed as the following graph:

main
  -> foo (times out after 1 sec)
    -> bar (times out after 3 secs)
      -> task::sleep (wait for 2 secs)

Because foo is timed out and dropped before bar has the chance to complete, not all numbers get to be printed. bar is dropped before it has the opportunity to complete, and all of its resources are cleaned up when its destructors run.

This means that in order to cancel a chain of futures, all we need to do is to drop it, and all resources will in turn be cleaned up. Whether we’re dropping a future by hand, calling a timeout method on a future, or racing multiple futures, cancellation will propagate and resources will be cleaned up.
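The same propagation can be observed without a runtime or a timeout combinator. The sketch below is an illustration under assumptions: Never is a made-up leaf future that stays pending forever (standing in for a long sleep), and we cancel by dropping the outer future by hand instead of calling timeout. All names here are hypothetical.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<&'static str>>>;

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical leaf future that never completes.
struct Never;
impl Future for Never {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Pushes its message onto the log when dropped.
struct Guard(&'static str, Log);
impl Drop for Guard {
    fn drop(&mut self) {
        self.1.borrow_mut().push(self.0);
    }
}

async fn bar(log: Log) {
    let _guard = Guard("bar dropped", log.clone());
    log.borrow_mut().push("bar started");
    Never.await;
    log.borrow_mut().push("bar finished"); // never reached
}

async fn foo(log: Log) {
    let _guard = Guard("foo dropped", log.clone());
    log.borrow_mut().push("foo started");
    bar(log.clone()).await;
    log.borrow_mut().push("foo finished"); // never reached
}

/// Polls the outer future once, drops it, and returns what was logged.
fn cancel_tree() -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(foo(log.clone()));
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(fut); // dropping `foo` also drops the `bar` future it contains

    let events = log.borrow().clone();
    events
}
```

Dropping only the outer future is enough: the inner future’s guard runs first, then the outer one, and neither “finished” line is ever logged.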

Propagating cancellation for tasks

As we showed earlier, simply dropping a task handle is not enough to cancel a task in most runtimes. We need to explicitly invoke a cancellation method to cancel the task. This means cancellation isn’t automatically propagated for tasks.

In hindsight this was probably a mistake. More specifically, this was likely my mistake. async-std’s JoinHandle was the first in the ecosystem, and I pushed that we should model it using the “detach-on-drop” behavior directly after std::thread::JoinHandle. What I didn’t account for though, is that threads cannot be cancelled externally: std::thread::JoinHandle doesn’t have, and likely never will have, a cancel method 6.

6

The way a thread can be cancelled from the outside is by passing a channel to the thread, and cancelling when a message is received. Unlike async Rust, threads don’t have .await points which act as natural stopping points. So instead threads have to opt-in to cancellation by manually listening for signals.
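As a rough sketch of what that opt-in looks like with plain threads: the worker below checks a channel between units of work and exits when it receives a message. The function name and the 5ms “work” are made up for the example.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Spawns a worker that loops until it is told to stop, then reports how it
/// exited.
fn run_cancellable_worker() -> &'static str {
    let (cancel_tx, cancel_rx) = mpsc::channel::<()>();

    let worker = thread::spawn(move || loop {
        // Opt-in cancellation point: the thread has to check for the signal
        // itself, since nothing can stop it from the outside.
        match cancel_rx.try_recv() {
            Ok(()) | Err(TryRecvError::Disconnected) => return "cancelled",
            Err(TryRecvError::Empty) => {}
        }
        thread::sleep(Duration::from_millis(5)); // stand-in for real work
    });

    cancel_tx.send(()).expect("worker is still running");
    worker.join().expect("worker panicked")
}
```

Note that the worker only notices the signal when it gets back to the try_recv call; a thread stuck in a long blocking operation would not be cancelled until that operation returns.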

Failing to propagate cancellation means that if we cancel a tree of work, we may end up with dangling tasks that continue when we really don’t want them to. Rather than having a cancel method which allows us to manually opt-in to cancellation propagation (more on that later), cancellation propagation should be opt-out by default.

Luckily we don’t need to guess what a runtime with “cancellation propagation is opt-out” behavior would look like. The async-task executor, and in turn the smol runtime, do exactly that.

smol::block_on(async {
    println!("1");
    let task = smol::spawn(async {
        println!("2");
    });
    drop(task);
    println!("done")
});
> 1
> done

Patching cancellation propagation

Unfortunately only smol propagates cancellation across tasks, and it would be a breaking change to modify the cancellation propagation behavior of JoinHandle in other runtimes.

Users of runtimes can still ensure that cancellation will always correctly be propagated by creating a custom spawn function that contains a drop guard like so:

use tokio::task;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

/// Spawn a new tokio Task and cancel it on drop.
pub fn spawn<T>(future: T) -> Wrapper<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    Wrapper(task::spawn(future))
}

/// Cancels the wrapped tokio Task on Drop.
pub struct Wrapper<T>(task::JoinHandle<T>);

impl<T> Future for Wrapper<T>{
    type Output = Result<T, task::JoinError>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // tokio's `JoinHandle` is `Unpin`, so it can be re-pinned without `unsafe`.
        Pin::new(&mut self.0).poll(cx)
    }
}

impl<T> Drop for Wrapper<T> {
    fn drop(&mut self) {
        // do `let _ = self.0.cancel()` for `async_std::task::Task`
        self.0.abort();
    }
}

This wrapper can be adapted to work for async-std as well, and ensures that cancellation propagates across task boundaries.

Structured concurrency

Given we’re talking a lot about propagating cancellation and trees in this post, we probably should mention the concept of “structured concurrency”.

In order for async Rust to be structurally concurrent I think of tasks as having the following requirements:

  1. Ensure 7 child tasks don’t outlive their parents.
  2. If we cancel a parent task the child task should be cancelled too.
  3. If a child task raises an error, the parent task should be able to act on it.
7

More on whether we can actually ensure this later on in this post.

This structure makes it so errors don’t accidentally go ignored, or we fail to cancel tasks further down in the tree. It’s the basis for effectively implementing other mechanisms on top, such as retries, limits, supervisors, and transactions.
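To make the three requirements a bit more concrete, here is a loose analogy using scoped threads from the standard library rather than async tasks. This is only a sketch: threads need a shared flag to opt in to cancellation, where async tasks could be cancelled at their .await points. The function name and messages are invented for the example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Runs two child threads inside a scope. The first child fails, the parent
/// observes the error, and tells the second child to stop.
fn run_scope() -> (Result<(), &'static str>, &'static str) {
    let cancelled = AtomicBool::new(false);

    thread::scope(|s| {
        let failing = s.spawn(|| -> Result<(), &'static str> { Err("child failed") });

        let looping = s.spawn(|| {
            // Threads have to opt in to cancellation by checking a flag.
            while !cancelled.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(1));
            }
            "stopped"
        });

        // Requirement 3: the parent can act on a child's error...
        let result = failing.join().expect("child panicked");
        if result.is_err() {
            // Requirement 2: ...here by cancelling the remaining child.
            cancelled.store(true, Ordering::Relaxed);
        }
        let status = looping.join().expect("child panicked");

        // Requirement 1: `thread::scope` does not return until every child
        // spawned inside it has finished, so children cannot outlive it.
        (result, status)
    })
}
```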

Cancelling a group of tasks

Cancellation becomes more difficult to implement when we’re dealing with a stream of tasks, each of which needs to be spawned on the runtime. Currently a lot of async code just spawns tasks, detaches them, and logs in case of an error:

// The async-std "echo tcp server" example.
use async_std::io;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::task;

// Listen for new TCP connections on port 8080.
let listener = TcpListener::bind("127.0.0.1:8080").await?;

// Iterate over the incoming connections, and move each task to a multi-threaded
// runtime.
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
    task::spawn(async {
        // If an error occurs, log it to stderr.
        if let Err(err) = run(stream).await {
            eprintln!("error: {}", err);
        }
    });
}

// The main logic of our listen loop. This is a simple echo server.
async fn run(stream: io::Result<TcpStream>) -> io::Result<()> {
    let stream = stream?;
    let (reader, writer) = &mut (&stream, &stream);
    io::copy(reader, writer).await?;
    Ok(())
}

But if we’re trying to do Structured Concurrency correctly, we need to ensure cancellation propagates to those spawned tasks too. In order to do so we need to introduce a new async primitive to Rust: the TaskGroup 8.

8

This terminology is borrowed from Swift, but is similar-ish to how crossbeam-scope works (n threads managed by a central point). However unlike crossbeam-scope, the name concerns itself less with how the lifetimes flow, and more how you can reason about how it should be used.

To my knowledge no runtimes currently support this out of the box, but crates for this exist on crates.io. One example of such a crate is task-group, authored by the Fastly WASM group. Functionally it allows creating a group of tasks which act as a single unit. If a task errors or panics, all other tasks are cancelled. And when the TaskManager is dropped, all currently running tasks are cancelled.

Task grouping (including task scopes) is a topic that needs its own blog post. But if you’re looking to apply cancellation propagation to a stream of items, at least now you know this is the primitive that enables that to work.

halt-safety

So far we’ve talked a fair bit about cancellation in this post. Now that we know that the existence of .await means that our function might exit, it’s natural to ask: “If our Future’s state machines can be cancelled at any state, how do we ensure they function correctly?”

When we’re using futures created through async/.await, cancellation at .await points will functionally act no different than early returns through the try (?) operator. In both cases function execution is halted, destructors are run, and resources are cleaned up. Take the following example:

// Regardless of where in the function we stop execution, destructors will be
// run and resources will be cleaned up.
async fn do_something(path: PathBuf) -> io::Result<Output> {
                                        // 1. the future is not guaranteed to progress after instantiation
    let file = fs::open(&path).await?;  // 2. `.await` and 3. `?` can cause the function to halt
    let res = parse(file).await;        // 4. `.await` can cause the function to halt
    res                                 // 5. execution has finished, return a value
}

do_something has 5 distinct points where it can halt execution and destructors are run. It doesn’t matter whether .await triggers a cancellation, ? returns an error, or our function exits as expected. Our destructors don’t need to care; all they need to concern themselves with is ensuring they release whatever resources they’re holding onto 9.

9

“Clean up resources regardless of what triggered the function to halt” is what I’m referring to as “halt-safety”. I’m not in love with the term, but I needed to find a way to give a name to this group of mechanisms. I hope the term is clear enough.

Things change a bit when we talk about Futures created using manual Poll state machines. Within a manual state machine we don’t call .await; instead we make forward progress by manually calling Future::poll or Stream::poll_next. Similar to when dealing with async/.await, we want to ensure that destructors are run should any of poll_*, ?, or return run. Unlike async/.await futures, resiliency against our function halting isn’t automatically provided for us. Instead manually authored futures need to implement the resiliency that async/.await futures rely on.

I like to think about a futures call graph like a tree. There are the leaf nodes in the tree, which need to guarantee cancellation is handled correctly. And there are branch nodes in the tree, which are all created through async/.await and rely on the leaf nodes correctly implementing resource cleanups.

Kind of like how in non-async Rust we don’t really need to think about releasing file handles or memory since we rely on that just working out of the box. The only time we really need to think about how to release resources, is when we’re implementing primitives like TcpStream or Vec ourselves.
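Here is a small sketch of what that can look like for a hand-written leaf future. SendMessage, its states, and the released flag are all hypothetical; the flag stands in for a real resource (a socket, a lock, a file handle). The important bit is that cleanup lives in Drop, so it runs no matter which state we were in when the future was dropped.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// The states of a hypothetical two-step operation.
enum State {
    Connecting,
    Sending,
    Done,
}

/// A hand-written leaf future. `released` stands in for a real resource
/// that must be given back exactly once.
struct SendMessage {
    state: State,
    released: Rc<Cell<bool>>,
}

impl Future for SendMessage {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // fine: all fields are `Unpin`
        match this.state {
            State::Connecting => {
                // A real future would register the waker before returning.
                this.state = State::Sending;
                Poll::Pending
            }
            State::Sending => {
                this.state = State::Done;
                Poll::Ready(())
            }
            State::Done => panic!("polled after completion"),
        }
    }
}

impl Drop for SendMessage {
    fn drop(&mut self) {
        // Runs on completion and on cancellation alike, in any state.
        self.released.set(true);
    }
}

/// Polls once, then cancels. Returns the flag before and after the drop.
fn cancel_mid_flight() -> (bool, bool) {
    let released = Rc::new(Cell::new(false));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(SendMessage {
        state: State::Connecting,
        released: released.clone(),
    });
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    let before = released.get();
    drop(fut);
    (before, released.get())
}
```

cancel_mid_flight() returns (false, true): the resource is still held while the future is suspended, and released once the future is dropped.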

Should tasks be detachable?

We’ve talked quite a bit about the importance of cancellation propagation for tasks, perhaps to the point that you might now be wondering whether tasks should be detachable at all. Unfortunately destructors in Rust are not guaranteed to run, so we can’t actually stop people from passing JoinHandles to mem::forget in order to detach their tasks.

That does not mean that we should make detaching tasks easy though. For example: MutexGuard can be passed to mem::forget too, but we don’t expose a method for that directly on MutexGuard because doing so causes the lock to be held forever. Depending on how we feel about dangling tasks, we may want to use API design to disincentivize people from going into unwanted states.
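For anyone who hasn’t run into this before, here is a tiny sketch of what “destructors are not guaranteed to run” means in practice. Handle is a made-up type that counts how often its destructor runs; it is not a real JoinHandle.

```rust
use std::cell::Cell;
use std::mem;
use std::rc::Rc;

/// Hypothetical handle that counts how often its destructor has run.
struct Handle(Rc<Cell<u32>>);
impl Drop for Handle {
    fn drop(&mut self) {
        self.0.set(self.0.get() + 1);
    }
}

fn count_destructor_runs() -> u32 {
    let runs = Rc::new(Cell::new(0));
    drop(Handle(runs.clone())); // destructor runs: the count becomes 1
    mem::forget(Handle(runs.clone())); // destructor is skipped: still 1
    runs.get()
}
```

Both calls are safe Rust, which is why a “cancel on drop” handle can make detaching inconvenient but can’t make it impossible.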

An async trait which can’t be cancelled?

There has been talk in the Async Foundations WG about potentially creating a new async trait that would create a future which guarantees it must be run to completion (e.g. “can’t be cancelled”). Our existing core::future::Future trait would inherit from that trait, and add the additional guarantee that it can in fact be cancelled.

I’ve seen two main motivations for going in this direction:

  1. FFI compatibility with C++’s async system, where failing to run a “C++ Future” to completion is undefined behavior.
  2. It would make it easier for non-experts to author async Rust by making “cancellation” something you don’t need to learn about up front.

This is not the right post to dig into the first point, but the second point certainly is interesting for us here. While I agree this may indeed pose a hurdle for people writing async today, I don’t believe it will going forward, because of the work we’re already doing to eliminate the need to manually author Poll state machines. For example, work is currently ongoing to change the signature of Stream from fn poll_next to async fn next. Similarly, the working group is working on adding async traits and async closures, all with the goal of reducing the need to author futures by hand.

As we’ve observed, handling cancellation is most difficult when authoring futures by hand, as that is where we need to build the resiliency to halts (through cancellation and errors alike) that the rest of the futures call graph relies upon. If we make it so that manually authored futures are only required to implement the primitives that the rest of the ecosystem relies upon, then the problem has already been solved.

To close this out: I think Rust’s general approach to ensuring {IO safety, memory safety, halt safety} should be multi-pronged. We should make it so that for the vast majority of operations, operators shouldn’t need to reach for Rust’s powertools. But for when powertools are definitely the way to go, we should add all the lints and hints we can to ensure they’re operated safely 10.

10

For example, if I ever need to author something using MaybeUninit, I want the compiler to remind me to author a drop guard when keeping it live past something which may panic. Perhaps someday (:

intermediate matching on cancellation?

While editing this post, I heard of a possible third motivation to potentially have an alternate future trait: wanting the ability to match on cancellation. The idea is that we can replace ? with match to handle errors using alternate logic, but we cannot do the same at a cancellation point for .await.

// Using the Try operator to re-throw the error.
let string = fs::read_to_string("my-file.txt").await?;

// Manually re-throwing the error.
let string = match fs::read_to_string("my-file.txt").await {
    Err(err) => return Err(err),
    Ok(s) => s,
};

// We can replace the `?` with `match`, but we can't replace `.await`
// with anything else.

The idea is that in conjunction with an un-cancellable future trait, cancellation would instead be performed by sending a signal to the underlying IO resource we’re waiting on, which would in turn stop what it’s doing and return an io::ErrorKind::Interrupted error that should manually be bubbled up by intermediate futures to where it can be matched on by the caller.

This argument may sound appealing: right now we indeed can’t destructure .await into a match statement the way we can with ?. So maybe this mechanism would be useful?

To dive in a little; let’s assume we have the following tree of futures:

main
  -> foo (times out after 1 sec)
    -> bar (times out after 3 secs)
      -> task::sleep (wait for 2 secs)

If we want to handle the timeout of foo (times out after 1 sec) in our main function, we could just match on it in either case:

// Current method of handling async cancellation.
let dur = Duration::from_secs(1);
let string = match fs::read_to_string("my-file.txt").timeout(dur).await {
    Err(_) => panic!("timed out!"),
    Ok(res) => res?,
};

// Runtime-signal cancellation (hypothetical: `timeout` yields an `io::Result` here).
let dur = Duration::from_secs(1);
let string = match fs::read_to_string("my-file.txt").timeout(dur).await {
    Err(err) if err.kind() == ErrorKind::Interrupted => panic!("timed out!"),
    Err(err) => return Err(err),
    Ok(s) => s,
};

In practice both approaches have incredibly similar semantics at the calling side. The main difference is that in the runtime-signal approach we may never hit the error path. In fact, there is no longer a guarantee that the inner futures propagate the cancellation. They can instead choose to ignore the cancellation and (erroneously) attempt to retry. Retries should always be scheduled alongside timeouts; that way if we time out once we can retry again. If the two are de-coupled, retries beyond the first will not have an associated timeout, and risk hanging forever. This is undesirable, and something we should steer people away from. And our existing semantics already achieve that.

There may be reasons why we might want to detect cancellation on intermediate futures, for example for the purpose of providing internal logging. But this is already possible by combining completion status tracking with Drop guards.
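As a rough sketch of that combination: the guard below carries a completed flag that is only set once the work has finished, and its destructor logs which of the two happened. CancelLogger, YieldOnce, and observe are names invented for this example, and we poll by hand so the sketch doesn’t depend on any runtime.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<&'static str>>>;

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for real I/O: pending once, then ready.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        Poll::Pending
    }
}

/// Drop guard that logs whether the surrounding future ran to completion.
struct CancelLogger {
    completed: bool,
    log: Log,
}
impl Drop for CancelLogger {
    fn drop(&mut self) {
        let event = if self.completed { "completed" } else { "cancelled" };
        self.log.borrow_mut().push(event);
    }
}

async fn work(log: Log) {
    let mut guard = CancelLogger { completed: false, log };
    YieldOnce(false).await;
    guard.completed = true; // only reached if we were not cancelled
}

/// Polls `work` at most `max_polls` times, then drops it.
fn observe(max_polls: usize) -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(work(log.clone()));
    for _ in 0..max_polls {
        if fut.as_mut().poll(&mut cx).is_ready() {
            break;
        }
    }
    drop(fut);

    let events = log.borrow().clone();
    events
}
```

With a single poll the future is dropped mid-flight and the guard logs "cancelled"; with two polls it finishes and logs "completed". The intermediate future gets to observe cancellation without being able to swallow it.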

This is further complicated by the fact that we’re now overloading io::ErrorKind::Interrupted to carry cancellations from both the operating system and triggers in user space. Errors caused by the operating system should be retried in-place. But errors caused by users should always bubble up. We can no longer tell them apart.

Another issue is that in order to uniformly support cancellation of futures, futures would now need to carry an io::Result in their return type. We’d need to rewrite I/O primitives such as task::sleep to be fallible just for this purpose. It’s not the worst; but it is reminiscent of the Futures 0.1 days, where futures always had to be fallible.

Overall I do think both approaches are roughly comparable. But because the cancellation-through-signals variant enables cancellations to be ignored (accidental, or otherwise), it exposes a set of footguns to users of async that our current system does not have 11.

11

This is probably worth an entire blog post on its own. But I have so many posts in progress already that I figured I’d add it in here.

Defer blocks?

The mechanisms to provide halt-safety are similar to those for unwind safety: create a drop guard to ensure we run our destructors. Manually writing these can get pretty verbose.

Languages such as Swift and Go provide a language-level solution to this, in the form of the defer keyword. This effectively allows users of the language to write an in-line drop guard to create an anonymous destructor. “The defer keyword in Swift: try/finally done right” is a thorough overview of how this works in Swift. But for illustrative purposes, here’s an example:

func writeLog() {
    let file = openFile()
    defer { closeFile(file) }

    let hardwareStatus = fetchHardwareStatus()
    guard hardwareStatus != "disaster" else { return /* defer block runs here */ }
    file.write(hardwareStatus)

    // defer block runs here
}

Rust’s scopeguard crate provides a collection of defer macros. But these wouldn’t suffice for the example we shared earlier, since we want to keep accessing the data after the guard has been set up, and scopeguard::defer doesn’t let us. This is because what we want is for drop guards not to take ownership of the value until the destructor is run. I believe this is also referred to as late binding. And the best way we could achieve that would be by introducing a language feature.

To be clear: I’m not necessarily advocating we introduce defer into Rust. It would add a form of non-linear control flow to the language which could confuse a lot of folks 12. But if we consider {halt, unwind}-safety to be important enough that we want to provide users with better tools; then defer seems like a candidate we may want to explore more closely.

12

Folks have told me this from using other languages. I’ve never actually used defer, so I can’t comment on what it’s like. But I have done a lot of non-linear control flow by writing callback-heavy JavaScript, and that indeed takes some getting used to.

update (2021-11-14): as folks correctly pointed out, while scopeguard::defer! does not provide access to the captured values before dropping, scopeguard::ScopeGuard does via the Deref / DerefMut traits. The flexibility of being able to remove the Drop impl of the guard by converting the guard into its inner value makes for a compelling argument that in-line drop impls would be better solved through a library addition than through a language item.
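To illustrate the shape of such a library solution, here is a hand-rolled guard using only the standard library. To be clear about the assumptions: this is not the scopeguard crate’s implementation or API, just a minimal sketch of the same idea. The Guard type, its methods, and the run function are invented for the example.

```rust
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;

/// Owns a value and runs a closure on it when dropped.
struct Guard<T, F: FnOnce(&mut T)> {
    value: Option<T>,
    on_drop: Option<F>,
}

impl<T, F: FnOnce(&mut T)> Guard<T, F> {
    fn new(value: T, on_drop: F) -> Self {
        Guard { value: Some(value), on_drop: Some(on_drop) }
    }

    /// Defuses the guard: returns the value without running the closure.
    fn into_inner(mut self) -> T {
        self.on_drop = None;
        self.value.take().expect("value is present until drop")
    }
}

// `Deref` / `DerefMut` keep the value usable while the guard is armed.
impl<T, F: FnOnce(&mut T)> Deref for Guard<T, F> {
    type Target = T;
    fn deref(&self) -> &T {
        self.value.as_ref().expect("value is present until drop")
    }
}

impl<T, F: FnOnce(&mut T)> DerefMut for Guard<T, F> {
    fn deref_mut(&mut self) -> &mut T {
        self.value.as_mut().expect("value is present until drop")
    }
}

impl<T, F: FnOnce(&mut T)> Drop for Guard<T, F> {
    fn drop(&mut self) {
        if let (Some(mut value), Some(on_drop)) = (self.value.take(), self.on_drop.take()) {
            on_drop(&mut value);
        }
    }
}

/// Fills a guarded buffer, then either drops the guard or defuses it.
fn run(defuse: bool) -> Vec<String> {
    let log: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&log);
    let mut buf = Guard::new(Vec::<u8>::new(), move |b: &mut Vec<u8>| {
        sink.borrow_mut().push(format!("cleanup saw {} bytes", b.len()));
    });

    // The buffer stays accessible through the guard.
    buf.push(1);
    buf.push(2);

    if defuse {
        let _plain: Vec<u8> = buf.into_inner();
    } else {
        drop(buf);
    }

    let events = log.borrow().clone();
    events
}
```

run(false) logs that the cleanup closure saw 2 bytes, while run(true) logs nothing because the guard was converted back into its inner value first.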

Cancellation and io_uring

This section was added on 2021-11-17, after the post was initially published.

Patrick Walton asked the following question on /r/rust:

One motivation for completion futures that either wasn’t mentioned or is an unexpected side effect of point #1 (compatibility with C++) is that async code in C/C++ can take non-owning references into buffers. For example, on Linux if you issue an async read() call using io_uring and you later get cancelled, you have to tell the kernel somehow that it needs to not touch the buffer after Rust frees it. There are ways to do this, such as giving the kernel ownership of the buffers using IORING_REGISTER_BUFFERS, but having the kernel own the I/O buffers can make things awkward. (Async C++ folks have shown me patterns that would require copies in this case.) Have you all given any thought as to the best practices here? It’s a tough decision, as the only real solution I can think of involves making poll() unsafe, which is unsavory (NB: not necessarily wrong).

Saoirse wrote an excellent overview of how the completion-based Linux io_uring family of kernel APIs interacts with cancellation in Rust. The whole post is worth a read as it covers much of the nuance and safety considerations involved when using io_uring with Rust. But it also directly answers Patrick’s question:

So I think this is the solution we should all adopt and move forward with: io-uring controls the buffers, the fastest interfaces on io-uring are the buffered interfaces, the unbuffered interfaces make an extra copy. We can stop being mired in trying to force the language to do something impossible.

Conclusion

In this post we’ve looked at how cancellation works for futures and tasks. We’ve covered how cancellation propagation ought to work, and how we can backport it to existing runtimes. And finally we’ve covered how to reason about structured concurrency, how cancellation propagation can be applied to groups of tasks, and how async cancellation might interact with async designs currently being drafted.

I hope this was informative! You might have noticed the title of this post has a “1” appended to it. I’m planning to post a follow-up to this post covering the design space of triggering cancellation at a distance, and possibly another on task grouping. I’m also working on different posts on async concurrency, so expect more on that soon.

I’ve gone and opened a discussion thread on Internals. And if you enjoyed this post, and would like to see whatever I’m dreaming up in a real-time fashion, you can follow me @yoshuawuyts.

Update (2021-11-14): Firstyear wrote a follow-up to this post on transactional operations in Rust, and how they interact with (async) cancellation and halt-safety. If you’re interested in transactions and rollbacks, I recommend giving it a read!

Thanks to: Eric Holk, Ryan Levick, Irina Shestak, and Francesco Cogno for helping review this post prior to publishing. And thanks to Niko Matsakis for walking me through some of the alternate futures (lol) of the future trait.