Async Cancellation I
2021-11-10
Sometimes we start things but decide midway through that we would rather not finish them. That process is sometimes referred to as cancellation. Say we accidentally clicked “download” on a large file in the browser. We should have a way to tell the computer to stop downloading it.
When the async foundations WG was researching user stories earlier this year, async cancellation came up repeatedly. It’s one of those things that’s important to have, but can be tricky to reason about. This is not helped by the fact that little has been written about it, so I figured I might try to make a dent in that by writing a deep-dive on the topic.
In this post we'll look at async Rust's primitives, and cover how cancellation works for those primitives today. We'll then proceed to look at ways in which we can ensure we do not end up with dangling resources. And finally we'll take a look at what the current direction of async Rust means for async cancellation. Sounds like a plan? Good, let's dive in!
Tasks and Futures
For the purpose of this post we need to distinguish between two types of async primitives in Rust: futures and tasks [1].

[1]: Arguably `Stream` / `AsyncIterator` is a third async primitive, but everything we say about the `Future` type applies to `Stream` as well, so we're considering those the same in this post.
- Futures represent the core building block of async Rust, and exist as part of the Rust language and stdlib. A future is something that, when `.await`ed, becomes a value later. By design it does nothing unless `.await`ed, and must be driven to completion by some other, external loop (example of such a loop).
- Tasks are not yet part of the Rust language or stdlib, and represent a managed [2] piece of async execution backed by an asynchronous runtime [3]. Tasks often allow async code to become multi-threaded: it's common for executors to schedule tasks across multiple threads, and even move them between threads after they've started executing. A task does not need to be `.await`ed before it starts executing, and the future it returns is just a way to get the return value of the task once it finishes.

[2]: "Managed" is used throughout this post to mean "scheduled on a runtime". This typically comes with the requirement that futures are `'static` (the future does not hold any borrows), and when scheduled on a multi-threaded runtime it often, but not always, also requires futures to be `Send` (the future is thread-safe).
You can think of tasks as something akin to lightweight threads, managed by your program rather than your operating system.
Many languages, including JavaScript, use equivalents to Rust tasks rather than Rust futures as their core building blocks [4]. This is convenient because only a single type of async building block is exposed, and the language runtime optimizers can figure out how to speed things up if needed. But in Rust we unfortunately can't rely on that, so we manually distinguish between unmanaged (futures) and managed (tasks) primitives.

[4]: JavaScript's counterpart to Rust's tasks is `Promise`. It starts being executed the moment it's instantiated, rather than the moment it's `await`ed. My understanding is that C#'s `Task<T>` works much the same way.
Cancelling Futures
Cancellation allows a future to stop doing work early, when we know we're no longer interested in its result. Futures in Rust can be cancelled at one of two points. For simplicity most of our examples in this post are going to use sleeping and printing operations, whereas in the real world we'd probably be talking about file/network operations and data processing instead.
1. Cancel a future before it starts executing
Here we create a drop guard, pass it to an async function which returns a future, and then drop the future before we `.await` it:
use async_std::task;
use std::time::Duration;

struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        println!("2");
    }
}

async fn foo(guard: Guard) {
    println!("3");
    task::sleep(Duration::from_secs(1)).await;
    println!("4");
}

fn main() {
    println!("1");
    let guard = Guard {};
    let fut = foo(guard);
    drop(fut);
    println!("done");
}
This prints:
> 1
> 2
> done
Our `Guard` type here will print when its destructor (`Drop`) is run. We never actually execute the future, but the destructor is still run because we passed the value to the async function. This means the first cancellation point of any future is immediately after instantiation, before the async function's body has run. Meaning not all cancellation points are necessarily marked by `.await`.
2. Cancel a future at an await point
Here we create a future, poll it exactly once, and then drop it:
use std::future::Future;
use std::time::Duration;
use std::{ptr, task};

async fn foo() {
    println!("2");
    async_std::task::sleep(Duration::from_secs(1)).await;
    println!("3");
}

let mut fut = Box::pin(foo());
let mut cx = empty_cx();

println!("1");
assert!(fut.as_mut().poll(&mut cx).is_pending());
drop(fut);
println!("done");

/// Create an empty "Waker" callback, wrapped in a "Context" structure.
/// How this works is not particularly important for the rest of this post.
fn empty_cx() -> task::Context<'static> { ... }
> 1
> 2
> done
Fundamentally, you can think of `.await` as marking a point where cancellation may occur. Where keywords like `return` and `?` mark points where the function may return a value, `.await` marks a location where the function's caller may decide the function should not run any further. But importantly: in all cases destructors will be run, allowing resources to be cleaned up.
Futures cannot be cancelled in between `.await` calls, or after the last `.await` call. We also don't yet have a design for "async `Drop`", so we can't say anything meaningful yet about how that will interact with cancellation.
Cancelling Tasks
Because tasks aren't standardized in Rust yet, cancellation of tasks isn't either. And unsurprisingly different runtimes have different ideas on how to cancel a task. Both `async-std` and `tokio` share a similar task model. I'm most comfortable with `async-std` [5], so let's use that as an example:

[5]: Probably because I co-authored the project.
use async_std::task;
use std::time::Duration;

let handle = task::spawn(async {
    task::sleep(Duration::from_secs(1)).await;
    println!("2");
});

println!("1");
drop(handle); // Drop the task handle.
task::sleep(Duration::from_secs(2)).await;
println!("done");
> 1
> 2
> done
Here we dropped the handle, but the task continued to run in the background. This is because the `async-std` runtime uses "detach on drop" semantics for tasks. This is the same in the `tokio` runtime. In order to cancel a task, we need to manually call a method on the handle. For `async-std` this is `JoinHandle::cancel`, and for `tokio` this is `JoinHandle::abort`:
use async_std::task;
use std::time::Duration;

let handle = task::spawn(async {
    task::sleep(Duration::from_secs(1)).await;
    println!("2");
});

println!("1");
handle.cancel().await; // Cancel the task handle.
task::sleep(Duration::from_secs(2)).await;
println!("done");
> 1
> done
We can see that if we call `JoinHandle::cancel`, the task is cancelled at that point, and the number `2` is no longer printed.
Propagating cancellation for futures
In async Rust cancellation automatically propagates for futures. When we stop polling a future, all futures contained within will in turn also stop making forward progress, and all destructors within them will be run. In this example we'll be using the `FutureExt::timeout` function from `async-std`, which returns a `Result<T, TimeoutError>`.
use async_std::prelude::*;
use async_std::task;
use std::time::Duration;

async fn foo() {
    println!("2");
    bar().timeout(Duration::from_secs(3)).await;
    println!("5");
}

async fn bar() {
    println!("3");
    task::sleep(Duration::from_secs(2)).await;
    println!("4");
}

println!("1");
foo().timeout(Duration::from_secs(1)).await;
println!("done");
> 1
> 2
> 3
> done # `4` and `5` are never printed
The tree of futures above can be expressed as the following graph:

main
-> foo (times out after 1 sec)
   -> bar (times out after 3 secs)
      -> task::sleep (wait for 2 secs)
Because `foo` is timed out and dropped before `bar` has the chance to complete, not all numbers get printed. `bar` is dropped before it has the opportunity to complete, and all of its resources are cleaned up when its destructors run.
This means that in order to cancel a chain of futures, all we need to do is drop it, and all resources will in turn be cleaned up. Whether we're dropping a future by hand, calling a timeout method on a future, or racing multiple futures — cancellation will propagate, and resources will be cleaned up.
Propagating cancellation for tasks
As we showed earlier, simply dropping a task handle is not enough to cancel a task in most runtimes. We need to explicitly invoke a cancellation method to cancel the task. This means cancellation isn't automatically propagated to tasks.
In hindsight this was probably a mistake. More specifically, this was likely my mistake. `async-std`'s `JoinHandle` was the first in the ecosystem, and I pushed that we should model it using the "detach-on-drop" behavior directly after `std::thread::JoinHandle`. What I didn't account for, though, is that threads cannot be cancelled externally: `std::thread::JoinHandle` doesn't have, and likely never will have, a `cancel` method [6].

[6]: The way a thread can be cancelled from the outside is by passing a channel to the thread, and cancelling when a message is received. Unlike async Rust, threads don't have `.await` points which act as natural stopping points. So instead threads have to opt in to cancellation by manually listening for signals.
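As a concrete sketch of that channel-based approach (the loop body and timings here are made up for illustration), a thread can poll a channel and stop when a cancellation message arrives:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (sender, receiver) = mpsc::channel::<()>();

    let handle = thread::spawn(move || loop {
        // The thread opts in to cancellation by checking the channel.
        if receiver.try_recv().is_ok() {
            println!("cancellation requested, stopping");
            return;
        }
        // ... do a slice of work ...
        thread::sleep(Duration::from_millis(100));
    });

    thread::sleep(Duration::from_millis(350));
    sender.send(()).unwrap(); // ask the thread to stop
    handle.join().unwrap();
}
```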
Failing to propagate cancellation means that if we cancel a tree of work, we may end up with dangling tasks that continue when we really don't want them to. Rather than having a `cancel` method which allows us to manually opt in to cancellation propagation (more on that later), cancellation propagation should be the default behavior, with detaching as the opt-out.
Luckily we don't need to guess what a runtime with "cancellation propagation is opt-out" behavior would look like. The `async-task` executor, and in turn the `smol` runtime, do exactly that.
smol::block_on(async {
    println!("1");
    let task = smol::spawn(async {
        println!("2");
    });
    drop(task);
    println!("done")
});
> 1
> done
Patching cancellation propagation
Unfortunately only `smol` propagates cancellation across tasks, and it would be a breaking change to modify the cancellation propagation behavior of `JoinHandle` in other runtimes.

Users of runtimes can still ensure that cancellation will always correctly be propagated by creating a custom `spawn` function that contains a drop guard, like so:
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use tokio::task;

/// Spawn a new tokio task and cancel it on drop.
pub fn spawn<T>(future: T) -> Wrapper<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    Wrapper(task::spawn(future))
}

/// Cancels the wrapped tokio task on drop.
pub struct Wrapper<T>(task::JoinHandle<T>);

impl<T> Future for Wrapper<T> {
    type Output = Result<T, task::JoinError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        unsafe { Pin::new_unchecked(&mut self.0) }.poll(cx)
    }
}

impl<T> Drop for Wrapper<T> {
    fn drop(&mut self) {
        // do `let _ = self.0.cancel()` for `async_std::task::Task`
        self.0.abort();
    }
}
This wrapper can be adapted to work for `async-std` as well, and ensures that cancellation propagates across task bounds.
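To illustrate, here's a minimal sketch of the wrapper in use, assuming the `spawn` and `Wrapper` definitions above are in scope and a tokio runtime is available. Dropping the handle now aborts the task instead of detaching it:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let handle = spawn(async {
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("2"); // never printed: the task is aborted when the handle is dropped
    });

    println!("1");
    drop(handle); // aborts the spawned task
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("done");
}
```

This mirrors the `smol` example from earlier: `1` and `done` print, `2` never does.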
Structured concurrency
Given we’re talking a lot about propagating cancellation and trees in this post, we probably should mention the concept of “structured concurrency”.
In order for async Rust to be structurally concurrent I think of tasks as having the following requirements:
- Ensure [7] child tasks don't outlive their parents.
- If we cancel a parent task, its child tasks should be cancelled too.
- If a child task raises an error, the parent task should be able to act on it.

[7]: More on whether we can actually ensure this later on in this post.
This structure ensures errors don't accidentally go ignored, and that we don't fail to cancel tasks further down in the tree. It's the basis for effectively implementing other mechanisms on top, such as retries, limits, supervisors, and transactions.
Cancelling a group of tasks
Cancellation becomes more difficult to implement when we're dealing with a stream of tasks, each of which needs to be spawned on the runtime. Currently a lot of async code just spawns tasks, detaches them, and logs in case of an error:
// The async-std "echo tcp server" example.
use async_std::io;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::task;

// Listen for new TCP connections on port 8080.
let listener = TcpListener::bind("127.0.0.1:8080").await?;

// Iterate over the incoming connections, and move each task to a multi-threaded
// runtime.
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
    task::spawn(async {
        // If an error occurs, log it to stderr.
        if let Err(err) = run(stream).await {
            eprintln!("error: {}", err);
        }
    });
}

// The main logic of our listen loop. This is a simple echo server.
async fn run(stream: io::Result<TcpStream>) -> io::Result<()> {
    let stream = stream?;
    let (reader, writer) = &mut (&stream, &stream);
    io::copy(reader, writer).await?;
    Ok(())
}
But if we're trying to do Structured Concurrency correctly, we need to ensure cancellation propagates to those spawned tasks too. In order to do so we need to introduce a new async primitive to Rust: the `TaskGroup` [8].

[8]: This terminology is borrowed from Swift, but is similar-ish to how `crossbeam-scope` works (n threads managed by a central point). However unlike `crossbeam-scope`, the name concerns itself less with how the lifetimes flow, and more with how you can reason about how it should be used.
To my knowledge no runtimes currently support this out of the box, but crates for this exist on crates.io. One example of such a crate is `task-group`, authored by the Fastly WASM group. Functionally it allows creating a group of tasks which act as a single unit. If a task errors or panics, all other tasks are cancelled. And when the `TaskManager` is dropped, all currently running tasks are cancelled.
Task grouping (including task scopes) is a topic that needs its own blog post. But if you’re looking to apply cancellation propagation to a stream of items, at least now you know this is the primitive that enables that to work.
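To make the shape of such a primitive a bit more concrete, here is a minimal sketch of what a cancel-on-drop task group could look like, built on the tokio-based `spawn`/`Wrapper` pair from the previous section. The names and API here are illustrative; they are not the `task-group` crate's actual interface:

```rust
use core::future::Future;
use std::io;

/// A group of tasks which is cancelled as a unit: dropping the group (or
/// returning early from `wait` on the first error) drops the remaining
/// wrappers, which in turn aborts their tasks.
pub struct TaskGroup {
    tasks: Vec<Wrapper<io::Result<()>>>,
}

impl TaskGroup {
    pub fn new() -> Self {
        Self { tasks: Vec::new() }
    }

    /// Spawn a task as part of this group.
    pub fn spawn<F>(&mut self, future: F)
    where
        F: Future<Output = io::Result<()>> + Send + 'static,
    {
        self.tasks.push(spawn(future));
    }

    /// Wait for every task in the group. Returning early on the first error
    /// drops the not-yet-awaited wrappers, cancelling their tasks.
    pub async fn wait(mut self) -> io::Result<()> {
        while let Some(task) = self.tasks.pop() {
            task.await.expect("task panicked")?;
        }
        Ok(())
    }
}
```

Because the group owns the wrappers, the echo server loop above could spawn onto a group owned by the accept loop, so that dropping the listener cancels all in-flight connections.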
halt-safety
So far we've talked a fair bit about cancellation in this post. Now that we know that the existence of `.await` means that our function might exit early, it's natural to ask: "If our futures' state machines can be cancelled at any state, how do we ensure they function correctly?"

When we're using futures created through async/`.await`, cancellation at `.await` points will functionally act no different than early returns through the try (`?`) operator. In both cases function execution is halted, destructors are run, and resources are cleaned up. Take the following example:
// Regardless of where in the function we stop execution, destructors will be
// run and resources will be cleaned up.
async fn do_something(path: PathBuf) -> io::Result<Output> {
    // 1. the future is not guaranteed to progress after instantiation
    let file = fs::open(&path).await?; // 2. `.await` and 3. `?` can cause the function to halt
    let res = parse(file).await;       // 4. `.await` can cause the function to halt
    res                                // 5. execution has finished, return a value
}
`do_something` has 5 distinct points where it can halt execution and destructors are run. It doesn't matter whether `.await` triggers a cancellation, `?` returns an error, or our function exits as expected. Our destructors don't need to care; they only need to concern themselves with ensuring they release whatever resources they're holding onto [9].
“Clean up resources regardless of what triggered the function to halt” is what I’m referring to as “halt-safety”. I’m not in love with the term, but I needed to find a way to give a name to this group of mechanisms. I hope the term is clear enough.
Things change a bit when we talk about futures created using manual `Poll` state machines. Within a manual state machine we don't call `.await`; instead we make forward progress by manually calling `Future::poll` or `Stream::poll_next`. Just as when dealing with async/`.await`, we want to ensure that destructors are run should any of `poll_*`, `?`, or `return` cause the function to halt. But unlike async/`.await` futures, resiliency against our function halting isn't automatically provided for us. Instead, manually authored futures need to implement the resiliency that async/`.await` futures rely on.
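To make that concrete, here's a minimal hand-written future (the type is made up for illustration). Its cleanup lives in `Drop`, so it runs whether the future completes or is dropped after its first poll, i.e. cancelled at what would otherwise be an `.await` point:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A future that completes on its second poll.
struct YieldOnce {
    polled: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled {
            Poll::Ready(())
        } else {
            self.polled = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

impl Drop for YieldOnce {
    fn drop(&mut self) {
        // Release whatever resource we'd be holding here. This runs on
        // completion *and* on cancellation between polls.
        println!("cleaning up");
    }
}
```

This is the same guarantee the `Guard` example relied on earlier, just implemented by hand.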
I like to think about a futures call graph like a tree. There are the leaf nodes in the tree, which need to guarantee cancellation is handled correctly. And there are branch nodes in the tree, which are all created through async/`.await` and rely on the leaf nodes correctly implementing resource cleanup.
Kind of like how in non-async Rust we don't really need to think about releasing file handles or memory, since we rely on that just working out of the box. The only time we really need to think about how to release resources is when we're implementing primitives like `TcpStream` or `Vec` ourselves.
Should tasks be detachable?
We've talked quite a bit about the importance of cancellation propagation for tasks, perhaps to the point that you might now be wondering whether tasks should be detachable at all. Unfortunately destructors in Rust are not guaranteed to run, so we can't actually stop people from passing `JoinHandle`s to `mem::forget` in order to detach their tasks.
That does not mean that we should make detaching tasks easy though. For example: `MutexGuard` can be passed to `mem::forget` too, but we don't expose a method for that directly on `MutexGuard` because doing so causes the lock to be held forever. Depending on how we feel about dangling tasks, we may want to use API design to disincentivize people from going into unwanted states.
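One shape such an API can take, shown here as a small sketch, is making detachment an explicit, named operation instead of the on-drop default; this is roughly how `smol`'s `Task::detach` works today:

```rust
fn main() {
    smol::block_on(async {
        let task = smol::spawn(async {
            println!("background work");
        });

        // Simply dropping `task` would cancel it; detaching has to be an
        // explicit, visible choice in the code.
        task.detach();
    });
}
```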
An async trait which can’t be cancelled?
There has been talk in the Async Foundations WG about potentially creating a new async trait that would create a future which guarantees it must be run to completion (e.g. "can't be cancelled"). Our existing `core::future::Future` trait would inherit from that trait, and add the additional guarantee that it can in fact be cancelled.
I’ve seen two main motivations for going in this direction:
- FFI compatibility with C++’s async system, where failing to run a “C++ Future” to completion is undefined behavior.
- It would make it easier for non-experts to author async Rust by making “cancellation” something you don’t need to learn about up front.
This is not the right post to dig into the first point, but the second point certainly is interesting for us here. While I agree this may indeed pose a hurdle for people writing async today, I don't believe it will going forward, because of the work already underway to eliminate the need to manually author `Poll` state machines. Work is currently ongoing to, for example, change the signature of `Stream` from `fn poll_next` to `async fn next`. Similarly the working group is working on adding async traits and async closures, all with the goal of reducing the need to author futures by hand.
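For a rough sense of what that change means, here's a simplified sketch of the two trait shapes. The first approximates today's `Stream`; the second is the direction being explored and doesn't compile on stable Rust today:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

// Approximately today's trait: implementers hand-write a state machine hook.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Approximately the proposed direction: an async method, so implementers can
// use `.await` and never touch `Poll` directly.
trait AsyncIterator {
    type Item;
    async fn next(&mut self) -> Option<Self::Item>;
}
```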
As we've observed, handling cancellation is most difficult when authoring futures by hand, as that is where we need to build the resiliency to halts (through cancellation and errors alike) that the rest of the futures call graph relies upon. If we make it so that manually authored futures are the only place required to implement the primitives that the rest of the ecosystem relies upon, then the problem has already been solved.
To close this out: I think Rust's general approach to ensuring {IO safety, memory safety, halt safety} should be multi-pronged. We should make it so that for the vast majority of operations, operators shouldn't need to reach for Rust's powertools. But for when powertools are definitely the way to go, we should add all the lints and hints we can to ensure they're operated safely [10].
[10]: For example, if I ever need to author something using `MaybeUninit`, I want the compiler to remind me to author a drop guard when keeping it live past something which may panic. Perhaps someday (:
intermediate matching on cancellation?
While editing this post, I heard of a possible third motivation to potentially have an alternate future trait: wanting the ability to match on cancellation. The idea is that we can replace `?` with `match` to handle errors using alternate logic, but we cannot do the same at a cancellation point for `.await`.
// Using the try operator to re-throw the error.
let string = fs::read_to_string("my-file.txt").await?;

// Manually re-throwing the error.
let string = match fs::read_to_string("my-file.txt").await {
    Err(err) => return Err(err),
    Ok(s) => s,
};

// We can replace the `?` with `match`, but we can't replace `.await`
// with anything else.
The idea is that in conjunction with an un-cancellable future trait, cancellation would instead be performed by sending a signal to the underlying IO resource we're waiting on, which would in turn stop what it's doing and return an `io::ErrorKind::Interrupted` error that should manually be bubbled up by intermediate futures to where it can be matched on by the caller.
This argument may sound appealing: right now we indeed can't destructure `.await` into a `match` statement the way we can with `?`. So maybe this mechanism would be useful?
To dive in a little, let's assume we have the following tree of futures:

main
-> foo (times out after 1 sec)
   -> bar (times out after 3 secs)
      -> task::sleep (wait for 2 secs)
If we want to handle the timeout of `foo` (times out after 1 sec) in our `main` function, we could just match on it in either case:
// Current method of handling async cancellation.
let dur = Duration::from_secs(1);
let string = match fs::read_to_string("my-file.txt").timeout(dur).await {
    Err(err) => panic!("timed out!"),
    Ok(res) => res?,
};

// Runtime-signal cancellation.
let dur = Duration::from_secs(1);
let string = match fs::read_to_string("my-file.txt").timeout(dur).await {
    Err(ErrorKind::Interrupted) => panic!("timed out!"),
    Err(err) => return Err(err),
    Ok(s) => s,
};
In practice both approaches have incredibly similar semantics on the calling side. The main difference is that in the runtime-signal approach we may never hit the error path. In fact, there is no longer a guarantee that the inner futures propagate the cancellation. They can instead choose to ignore the cancellation and (erroneously) attempt to retry. Retries should always be scheduled alongside timeouts; that way if we time out once we can retry again. If the two are de-coupled, retries beyond the first will not have an associated timeout, and risk hanging forever. This is undesirable, and something we should steer people away from. And our existing semantics already achieve that.
There may be reasons why we might want to detect cancellation on intermediate futures, for example for the purpose of providing internal logging. But this is already possible by combining completion status tracking with `Drop` guards.
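As a rough sketch of that pattern (the names here are made up for illustration): set a completion flag right before the function finishes, and have a drop guard log whenever the flag was never set:

```rust
use async_std::task;
use std::time::Duration;

/// Logs if it's dropped before `completed` was set, i.e. if the enclosing
/// future was cancelled partway through.
struct CancelLogger {
    completed: bool,
}

impl Drop for CancelLogger {
    fn drop(&mut self) {
        if !self.completed {
            eprintln!("future was cancelled before completing");
        }
    }
}

async fn do_work() {
    let mut logger = CancelLogger { completed: false };
    task::sleep(Duration::from_secs(1)).await; // if we're cancelled here, `Drop` logs it
    logger.completed = true; // mark success so `Drop` stays quiet
}
```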
This is further complicated by the fact that we're now overloading `io::ErrorKind::Interrupted` to carry cancellations triggered by both the operating system and user space. Errors caused by the operating system should be retried in-place. But errors caused by users should always bubble up. We can no longer tell them apart.
Another issue is that in order to uniformly support cancellation of futures, futures would now need to carry an `io::Result` in their return type. We'd need to rewrite I/O primitives such as `task::sleep` to be fallible just for this purpose. It's not the worst; but it is reminiscent of the Futures 0.1 days, where futures always had to be fallible.
Overall I do think both approaches are roughly comparable. But because the cancellation-through-signals variant enables cancellations to be ignored (accidentally or otherwise), it exposes a set of footguns to users of async that our current system does not have [11].
[11]: This is probably worth an entire blog post on its own. But I have so many posts in progress already that I figured I'd add it in here.
Defer blocks?
The mechanisms to ensure halt-safety are similar to those for unwind safety: create a drop guard to ensure we run our destructors. Manually writing these can get pretty verbose.
Languages such as Swift and Go provide a language-level solution to this, in the form of the `defer` keyword. This effectively allows users of the language to write an in-line drop guard to create an anonymous destructor. "The defer keyword in Swift: try/finally done right" is a thorough overview of how this works in Swift. But for illustrative purposes, here's an example:
func writeLog() {
    let file = openFile()
    defer { closeFile(file) }

    let hardwareStatus = fetchHardwareStatus()
    guard hardwareStatus != "disaster" else { return /* defer block runs here */ }
    file.write(hardwareStatus)

    // defer block runs here
}
Rust's scopeguard crate provides a collection of `defer` macros. But these wouldn't suffice for the example we shared earlier, since we want to keep access to the data after the guard has been created, and `scopeguard::defer` doesn't let us.
This is because what we want is for drop guards to not take ownership of the value until the destructor is run. I believe this is also referred to as late binding. And the best way we could achieve that would be by introducing a language feature.
To be clear: I'm not necessarily advocating we introduce `defer` into Rust. It would add a form of non-linear control flow to the language which could confuse a lot of folks [12]. But if we consider {halt, unwind}-safety to be important enough that we want to provide users with better tools, then `defer` seems like a candidate we may want to explore more closely.
[12]: Folks have told me this from using other languages. I've never actually used `defer`, so I can't comment on what it's like. But I have done a lot of non-linear control flow by writing callback-heavy JavaScript, and that indeed takes some getting used to.
update (2021-11-14): as folks correctly pointed out, while `scopeguard::defer!` does not provide access to the captured values before dropping, `scopeguard::ScopeGuard` does via the `Deref` / `DerefMut` traits. The flexibility of being able to remove the `Drop` impl of the guard by converting the guard into its inner value makes for a compelling argument that in-line drop impls would be better solved through a library addition than through a language item.
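For reference, here's a small sketch of that `ScopeGuard` pattern. The buffer and messages are made up for the example, but `scopeguard::guard`, the `Deref`/`DerefMut` access, and `ScopeGuard::into_inner` are the crate's actual API:

```rust
use scopeguard::{guard, ScopeGuard};

fn main() {
    // Wrap the value in a guard; the closure runs if the guard is dropped.
    let mut buf = guard(Vec::new(), |buf: Vec<u8>| {
        // Cleanup path: runs on early return, panic, or (in async code)
        // cancellation, as long as the guard is still armed.
        eprintln!("dropping {} unflushed bytes", buf.len());
    });

    // `Deref`/`DerefMut` let us keep using the value while it's guarded.
    buf.extend_from_slice(b"hello");

    // On the happy path, defuse the guard and take the value back out;
    // the cleanup closure no longer runs.
    let buf = ScopeGuard::into_inner(buf);
    println!("flushed {} bytes", buf.len());
}
```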
Cancellation and io_uring
This section was added on 2021-11-17, after the post was initially published.
Patrick Walton asked the following question on /r/rust:
One motivation for completion futures that either wasn’t mentioned or is an unexpected side effect of point #1 (compatibility with C++) is that async code in C/C++ can take non-owning references into buffers. For example, on Linux if you issue an async read() call using io_uring and you later get cancelled, you have to tell the kernel somehow that it needs to not touch the buffer after Rust frees it. There are ways to do this, such as giving the kernel ownership of the buffers using IORING_REGISTER_BUFFERS, but having the kernel own the I/O buffers can make things awkward. (Async C++ folks have shown me patterns that would require copies in this case.) Have you all given any thought as to the best practices here? It’s a tough decision, as the only real solution I can think of involves making poll() unsafe, which is unsavory (NB: not necessarily wrong).
Saoirse wrote an excellent overview of how the completion-based Linux `io_uring` family of kernel APIs interacts with cancellation in Rust. The whole post is worth a read as it covers much of the nuance and safety considerations involved when using `io_uring` with Rust. But it also directly answers Patrick's question:
So I think this is the solution we should all adopt and move forward with: io-uring controls the buffers, the fastest interfaces on io-uring are the buffered interfaces, the unbuffered interfaces make an extra copy. We can stop being mired in trying to force the language to do something impossible.
Conclusion
In this post we’ve looked at how cancellation works for futures and tasks. We’ve covered how cancellation propagation ought to work, and how we can backport it to existing runtimes. And finally we’ve covered how to reason about structured concurrency, how cancellation propagation can be applied to groups of tasks, and how async cancellation might interact with async designs currently being drafted.
I hope this was informative! You might have noticed the title of this post has an "I" appended to it. I'm planning to post a follow-up covering the design space of triggering cancellation at a distance, and possibly another on task grouping. I'm working on several other posts on async concurrency too, so expect more on that soon.
I’ve gone and opened a discussion thread on Internals. And if you enjoyed this post, and would like to see whatever I’m dreaming up in a real-time fashion, you can follow me @yoshuawuyts.
Update (2021-11-14): Firstyear wrote a follow-up to this post on transactional operations in Rust, and how they interact with (async) cancellation and halt-safety. If you’re interested in transactions and rollbacks, I recommend giving it a read!
Thanks to: Eric Holk, Ryan Levick, Irina Shestak, and Francesco Cogno for helping review this post prior to publishing. And thanks to Niko Matsakis for walking me through some of the alternate futures (lol) of the future trait.