As announced on the Rust Blog a few weeks ago, the long-awaited async-await syntax has hit beta and is slated for release with 1.39 in early November. Take a look at the Async Book for an in-depth introduction.
In eager anticipation of async-await, we’ve been using the futures 0.1 crate for some time. But we’re long overdue to take a look at where things are headed. What follows are some loosely structured notes on the upcoming “stable” state of futures and async/await.
Foot in the Door
cargo new async
and in Cargo.toml add:
[dependencies]
futures = { version = "0.3.0-alpha", package = "futures-preview" }
Here we’re using the “mostly final” futures 0.3 crate, published for now as futures-preview.
As explained in The Book, there are three release channels: nightly, beta, and stable. You may have used “nightly” before to access the latest and greatest features; this is where async/await previously lived. “beta” is where a version goes before becoming the next “stable” release.
# Update all installed toolchains
rustup update
# List installed toolchains
rustup toolchain list
# Install beta toolchain
rustup install beta
There are a few ways to use the beta/nightly channels:
# List toolchain overrides
rustup override list
# Set default toolchain for directory
rustup override set beta # or "nightly"
# Now defaults to beta toolchain
cargo build
# Explicitly build using beta/nightly toolchain
cargo +beta build # or "+nightly"
In src/main.rs:
async fn hello_world() {
    println!("Hello world");
}
async fn start() {
    hello_world().await
}
fn main() {
    let future = start();
    futures::executor::block_on(future);
}
cargo run to greet the world.
Notice how await is postfix (hello_world().await) instead of the prefix await hello_world() found in many other languages. The syntax was heavily debated, but the rationale boils down to improving method chaining, coexistence with the ? operator, and precedence rules.
A contrived example with a series of calls (some of which can fail):
let answer = can_fail().await?
    .some_func().await
    .member_can_fail().await?
    .get_answer()?;
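To make the chaining concrete, here is a small runnable sketch using only the standard library. Settings, load, next_port and the tiny block_on helper are hypothetical names made up for illustration (the helper stands in for futures::executor::block_on and relies on std::task::Wake, so it needs a newer compiler than the 1.39 beta discussed here).

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical stand-in for whatever `can_fail()` returns.
struct Settings {
    port: String,
}

impl Settings {
    // An async method that can fail.
    async fn port(&self) -> Result<u16, String> {
        self.port.parse::<u16>().map_err(|e| e.to_string())
    }
}

// An async function that can fail.
async fn load(raw: &str) -> Result<Settings, String> {
    if raw.is_empty() {
        return Err("empty input".to_string());
    }
    Ok(Settings { port: raw.to_string() })
}

async fn next_port(raw: &str) -> Result<u16, String> {
    // Each `.await?` waits for the future, then returns early on Err.
    // The trailing `?` is applied to a plain (non-async) Result.
    let port = load(raw).await?.port().await?.checked_add(1).ok_or("overflow")?;
    Ok(port)
}

// Minimal executor (illustrative only): poll, and park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

With a prefix await the same chain would need nested parentheses, something like (await (await load(raw))?.port())?, which is roughly the readability argument that won the debate.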
You can’t understand async without understanding Rust’s Future trait. Perhaps the first thing to learn about futures is that they’re lazy: nothing happens unless something “pumps” (polls) them, as executor::block_on does here. Contrast this with std::thread::spawn, which creates a running thread. If futures are polled, does that mean Rust async programming isn’t event-driven à la epoll/kqueue? Don’t fret: a Waker can be used to signal that the future is ready to be polled again.
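Both points, laziness and the Waker, can be seen in a few lines of standard-library Rust. This is only a sketch of the idea, not how the futures crate implements its executor: YieldOnce, mark, laziness_demo and this block_on are hypothetical names, and std::task::Wake requires a newer compiler than 1.39.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// A future that is not ready on the first poll. Before returning Pending it
// asks the waker to schedule another poll, the way an I/O source would once
// epoll/kqueue reports readiness.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

async fn mark(ran: &Cell<bool>) {
    YieldOnce(false).await;
    ran.set(true);
}

// Waking means unparking the thread that is driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal executor: poll, then sleep until the waker fires, then poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Returns (flag before driving the future, flag after driving it).
fn laziness_demo() -> (bool, bool) {
    let ran = Cell::new(false);
    let fut = mark(&ran); // nothing has run yet
    let before = ran.get();
    block_on(fut); // now the body actually executes
    (before, ran.get())
}
```

Calling laziness_demo() returns (false, true): creating the future did nothing, and the body only ran once the executor polled it.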
Error-Handling
We have test code like:
while !done.load(Ordering::Relaxed) {
    match block_on(ctx.receive()) {
        Ok(msg) => {
            let pipe = msg.get_pipe()?;
            let mut response = msg.dup()?;
            response.set_pipe(&pipe);
            block_on(ctx.send(response))?;
        }
        _ => panic!(),
    }
}
How we might re-write it:
let future = async {
    while !done.load(Ordering::Relaxed) {
        match ctx.receive().await {
            Ok(msg) => {
                let pipe = msg.get_pipe()?;
                let mut response = msg.dup()?;
                response.set_pipe(&pipe);
                ctx.send(response).await?;
            }
            _ => panic!(),
        }
    }
};
block_on(future);
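Unfortunately, the ? operator is awkward in async blocks (i.e. async {}): there’s no syntax for declaring the block’s return type, so the compiler can’t infer which Result the ? should return into. Async closures (i.e. async || {}) are unstable.
If we replace ? with .unwrap() it compiles and runs.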
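One way to keep ? inside an async block is to end the block with an explicitly typed Ok, which gives the compiler the error type it was missing. A minimal sketch using only the standard library follows; parse, parse_sum and this block_on are hypothetical names, and the executor relies on std::task::Wake, which postdates 1.39.

```rust
use std::future::Future;
use std::num::ParseIntError;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// A fallible async function to await inside the block.
async fn parse(s: &str) -> Result<i32, ParseIntError> {
    s.parse::<i32>()
}

fn parse_sum(a: &str, b: &str) -> Result<i32, ParseIntError> {
    let fut = async {
        let x = parse(a).await?;
        let y = parse(b).await?;
        // The turbofish names the error type; without it the compiler
        // cannot infer what `?` should convert errors into.
        Ok::<i32, ParseIntError>(x + y)
    };
    block_on(fut)
}

// Minimal executor (illustrative only): poll, and park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

Note that ? returns from the async block, not from the enclosing function, so the block’s output is a Result that still has to be handled by whoever awaits or blocks on it.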
Heterogeneous Returns
Given:
broker_pull_ctx.receive().for_each(|msg| {
    if let Ok(msg) = msg {
        broker_push_ctx.send(msg).then(|msg| {
            // Do something with the message
            future::ready(())
        })
    } else {
        future::ready(())
    }
});
Sadness:
|
144 | / if let Ok(msg) = msg {
145 | | broker_push_ctx.send(msg).then(|res| {
| |_____________________-
146 | || res.unwrap().unwrap();
147 | || future::ready(())
148 | || })
| ||______________________- expected because of this
149 | | } else {
150 | | future::ready(())
| | ^^^^^^^^^^^^^^^^^ expected struct `futures_util::future::then::Then`, found struct `futures_util::future::ready::Ready`
151 | | }
| |_________________- if and else have incompatible types
|
= note: expected type `futures_util::future::then::Then<futures_channel::oneshot::Receiver<std::result::Result<(), runng::result::Error>>, futures_util::future::ready::Ready<_>, [closure@runng/tests/test_main.rs:145:52: 148:22]>`
found type `futures_util::future::ready::Ready<_>`
Basically, then(), like many Rust combinators, returns a distinct type (Then in this case), so the two branches of the if have different types.
If you reach for a trait object for type erasure via -> Box<dyn Future<Output = ()>> and wrap the returns in Box::new(), you’ll run into:
error[E0277]: the trait bound `dyn core::future::future::Future<Output = ()>: std::marker::Unpin` is not satisfied
--> runng/tests/test_main.rs:155:58
|
155 | let fut = broker_pull_ctx.receive().unwrap().for_each(|msg| -> Box<dyn Future<Output = ()>> {
| ^^^^^^^^ the trait `std::marker::Unpin` is not implemented for `dyn core::future::future::Future<Output = ()>`
|
= note: required because of the requirements on the impl of `core::future::future::Future` for `std::boxed::Box<dyn core::future::future::Future<Output = ()>>`
Lo, the 1.33 feature “pinning”. Thankfully, the type-insanity that is Pin<Box<dyn Future<Output = T>>> is common enough that a future::BoxFuture<'a, T> alias is provided (it additionally requires the future to be Send):
let fut = broker_pull_ctx...for_each(|msg| -> future::BoxFuture<()> {
    if let Ok(msg) = msg {
        // then() needs a closure that returns a future
        Box::pin(broker_push_ctx.send(msg).then(|_| future::ready(())))
    } else {
        Box::pin(future::ready(()))
    }
});
block_on(fut);
Alternatively, you can multiplex the return with something like future::Either:
let fut = broker_pull_ctx...for_each(|msg| {
    use futures::future::Either;
    if let Ok(msg) = msg {
        Either::Left(
            // then() needs a closure that returns a future
            broker_push_ctx.send(msg).then(|_| future::ready(()))
        )
    } else {
        Either::Right(future::ready(()))
    }
});
block_on(fut);
This avoids the boxing allocation, but it might become a bit gnarly if there’s a large number of return types.
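The boxing approach doesn’t depend on the futures crate at all. Here is a standalone sketch with only the standard library, where forward, handle and this block_on are hypothetical stand-ins rather than runng or futures APIs (and std::task::Wake needs a newer compiler than 1.39):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Stand-in for "send the message on"; reports how many bytes were forwarded.
async fn forward(msg: String) -> usize {
    msg.len()
}

// Each branch produces a different concrete future type. Pinning both in a
// Box erases the difference behind a single trait-object type.
fn handle(msg: Result<String, ()>) -> Pin<Box<dyn Future<Output = usize>>> {
    if let Ok(msg) = msg {
        Box::pin(forward(msg))
    } else {
        Box::pin(async { 0 })
    }
}

// Minimal executor (illustrative only): poll, and park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

The return type here is the non-Send cousin of BoxFuture; add + Send to the trait object if the future has to cross threads.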
block_on() != .await
Our initial, exploratory implementation made heavy use of wait() found in futures 0.1. To transition to async/await it’s tempting to replace wait() with block_on():
#[test]
fn block_on_panic() -> runng::Result<()> {
    let url = get_url();
    let mut rep_socket = protocol::Rep0::open()?;
    let mut rep_ctx = rep_socket.listen(&url)?.create_async()?;
    let fut = async {
        block_on(rep_ctx.receive()).unwrap();
    };
    block_on(fut);
    Ok(())
}
cargo test block_on_panic yields:
---- tests::reqrep_tests::block_on_panic stdout ----
thread 'tests::reqrep_tests::block_on_panic' panicked at 'cannot execute `LocalPool` executor from within another executor: EnterError', src/libcore/result.rs:1165:5
Note this isn’t a compiler error; it’s a runtime panic. I haven’t looked into the details of this, but the problem stems from the nested calls to block_on(). It seems that if the inner future finishes immediately everything is fine, but not if it blocks. However, it works as expected with await:
let fut = async {
    rep_ctx.receive().await.unwrap();
};
block_on(fut);
Async Traits
Try:
trait AsyncTrait {
    async fn do_stuff();
}
Nope:
error[E0706]: trait fns cannot be declared `async`
--> src/main.rs:5:5
|
5 | async fn do_stuff();
| ^^^^^^^^^^^^^^^^^^^^
How about:
trait AsyncTrait {
    fn do_stuff();
}
struct Type;
impl AsyncTrait for Type {
    async fn do_stuff() { }
}
Nope:
error[E0706]: trait fns cannot be declared `async`
--> src/main.rs:11:5
|
11 | async fn do_stuff() { }
| ^^^^^^^^^^^^^^^^^^^^^^^
One work-around involves being explicit about the return types, and using an async block within the impl:
use futures::future::{self, BoxFuture, Future};
trait AsyncTrait {
    fn boxed_trait() -> Box<dyn Future<Output = ()>>;
    fn pinned_box() -> BoxFuture<'static, ()>;
}
impl<T> AsyncTrait for T {
    fn boxed_trait() -> Box<dyn Future<Output = ()>> {
        Box::new(async {
            // .await to your heart's content
        })
    }
    fn pinned_box() -> BoxFuture<'static, ()> {
        Box::pin(async {
            // .await to your heart's content
        })
    }
}
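The same work-around extends to methods that borrow self, and the pinned variant can be awaited directly by callers. A self-contained sketch with the standard library only is below; BoxedFuture, Greeter, English, greet_twice and this block_on are all hypothetical names for illustration, and std::task::Wake needs a newer compiler than 1.39.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical alias: a pinned, boxed future that may borrow for 'a.
type BoxedFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

trait Greeter {
    // A plain fn that returns a boxed future instead of an `async fn`.
    fn greet<'a>(&'a self, name: &'a str) -> BoxedFuture<'a, String>;
}

struct English;

impl Greeter for English {
    fn greet<'a>(&'a self, name: &'a str) -> BoxedFuture<'a, String> {
        Box::pin(async move {
            // .await to your heart's content
            format!("Hello, {}!", name)
        })
    }
}

// Callers await the returned future like any other, even via a trait object.
async fn greet_twice(greeter: &dyn Greeter) -> String {
    let first = greeter.greet("a").await;
    let second = greeter.greet("b").await;
    format!("{} {}", first, second)
}

// Minimal executor (illustrative only): poll, and park the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

The price is one heap allocation and a dynamic dispatch per call, which is usually fine for request-level work but worth knowing about in hot loops.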