As announced on the Rust Blog a few weeks ago, the long awaited async-await syntax hit beta and is slated for release with 1.39 early November. Take a look at the Async Book for an in-depth introduction.

In eager anticipation of async-await, we’ve been using futures 0.1 crate for some time. But, we’re long overdue to finally take a look at where things are headed. The following is some loosely structured notes on the upcoming “stable” state of futures and async/await.

Foot in the Door

cargo new async and in Cargo.toml add:

[dependencies]
futures = { version = "0.3.0-alpha", package = "futures-preview" }

Here using the “mostly final” Futures 0.3 crate.

As explained in The Book there are three release channels: nightly, beta, and stable. You may have used “nightly” before to access the latest and greatest features- this is where async/await previously lived. “beta” is where a version goes before becoming the next “stable” release.

# Update all installed toolchains
rustup update
# List installed toolchains
rustup toolchain list
# Install beta toolchain
rustup install beta

There’s a few ways to use the beta/nightly channels:

# List toolchain overrides
rustup override list
# Set default toolchain for directory
rustup override set beta # or "nightly"
# Now defaults to beta toolchain
cargo build

# Explicitly build using beta/nightly toolchain
cargo +beta build # or "+nightly"

In src/main.rs:

async fn hello_world() {
    println!("Hello world");
}

async fn start() {
    hello_world().await
}

fn main() {
    let future = start();
    futures::executor::block_on(future);
}

cargo run to greet the world.

Notice how await is post-fix instead of await hello_world() as found in many other languages. The syntax was heavily debated, but the rationale boils down to improving: method chaining, co-existance with the ? operator, and precedence rules.

A contrived example with a series of calls (some of which can fail):

let answer = can_fail().await?
    .some_func().await
    .member_can_fail().await?
    .get_answer()?;

You can’t understand async without understanding Rust’s Future trait. Perhaps the first thing to learn about Future is they’re lazy; nothing happens unless something “pumps” them- as executor::block_on does here. Contrast this with std::thread::spawn which creates a running thread. If futures are polled, does that mean Rust async programming isn’t event-driven à la epoll/kqueue? Don’t fret, a Waker can be used to signal the future is ready to be polled again.

Error-Handling

We have test code like:

while !done.load(Ordering::Relaxed) {
    match block_on(ctx.receive()) {
        Ok(msg) => {
            let pipe = msg.get_pipe()?;
            let mut response = msg.dup()?;
            response.set_pipe(&pipe);
            block_on(ctx.send(response))?;
        }
        _ => panic!(),
    }
}

How we might re-write it:

let future = async {
    while !done.load(Ordering::Relaxed) {
        match ctx.receive().await {
            Ok(msg) => {
                let pipe = msg.get_pipe()?;
                let mut response = msg.dup()?;
                response.set_pipe(&pipe);
                ctx.send(response).await?;
            }
            _ => panic!(),
        }
    }
};

block_on(future);

Unfortunately, how the ? operator works in async blocks (i.e. async {}) is not defined, and async closures (i.e. async || {}) are unstable.

If we replace ? with .unwrap() it compiles and runs.

Heterogeneous Returns

Given:

broker_pull_ctx.receive().for_each(|msg| {
    if let Ok(msg) = msg {
        broker_push_ctx.send(msg).then(|msg| {
            // Do something with the message
            future::ready(())
        })
    } else {
        future::ready(())
    }
});

Sadness:

    |
144 |  /                 if let Ok(msg) = msg {
145 |  |                     broker_push_ctx.send(msg).then(|res| {
    |  |_____________________-
146 | ||                         res.unwrap().unwrap();
147 | ||                         future::ready(())
148 | ||                     })
    | ||______________________- expected because of this
149 |  |                 } else {
150 |  |                     future::ready(())
    |  |                     ^^^^^^^^^^^^^^^^^ expected struct `futures_util::future::then::Then`, found struct `futures_util::future::ready::Ready`
151 |  |                 }
    |  |_________________- if and else have incompatible types
    |
    = note: expected type `futures_util::future::then::Then<futures_channel::oneshot::Receiver<std::result::Result<(), runng::result::Error>>, futures_util::future::ready::Ready<_>, [closure@runng/tests/test_main.rs:145:52: 148:22]>`
               found type `futures_util::future::ready::Ready<_>`

Basically, then()- like many Rust combinators- returns a distinct type (Then in this case).

If you reach for a trait object for type erasure via -> Box<dyn Future<Output = ()>> and wrap the returns in Box::new() you’ll run into:

error[E0277]: the trait bound `dyn core::future::future::Future<Output = ()>: std::marker::Unpin` is not satisfied
   --> runng/tests/test_main.rs:155:58
    |
155 |             let fut = broker_pull_ctx.receive().unwrap().for_each(|msg| -> Box<dyn Future<Output = ()>> {
    |                                                          ^^^^^^^^ the trait `std::marker::Unpin` is not implemented for `dyn core::future::future::Future<Output = ()>`
    |
    = note: required because of the requirements on the impl of `core::future::future::Future` for `std::boxed::Box<dyn core::future::future::Future<Output = ()>>`

Lo, the 1.33 feature “pinning”. Thankfully, the type-insanity that is Pin<Box<dyn Future<Output = T>>> is common enough that a future::BoxFuture<T> alias is provided:

let fut = broker_pull_ctx...for_each(|msg| -> future::BoxFuture<()> {
    if let Ok(msg) = msg {
        Box::pin(broker_push_ctx.send(msg).then(|_| { }))
    } else {
        Box::pin(future::ready(()))
    }
});
block_on(fut);

Alternatively, you can multiplex the return with something like future::Either:

let fut = broker_pull_ctx...for_each(|msg| {
    use futures::future::Either;
    if let Ok(msg) = msg {
        Either::Left(
            broker_push_ctx.send(msg).then(|_| { })
        )
    } else {
        Either::Right(future::ready(()))
    }
});
block_on(fut);

This avoids the boxing allocation, but it might become a bit gnarly if there’s a large number of return types.

block_on() != .await

Our initial, exploratory implementation made heavy use of wait() found in futures 0.1. To transition to async/await it’s tempting to replace wait() with block_on():

#[test]
fn block_on_panic() -> runng::Result<()> {
    let url = get_url();

    let mut rep_socket = protocol::Rep0::open()?;
    let mut rep_ctx = rep_socket.listen(&url)?.create_async()?;

    let fut = async {
        block_on(rep_ctx.receive()).unwrap();
    };
    block_on(fut);

    Ok(())
}

cargo test block_on_panic yields:

---- tests::reqrep_tests::block_on_panic stdout ----
thread 'tests::reqrep_tests::block_on_panic' panicked at 'cannot execute `LocalPool` executor from within another executor: EnterError', src/libcore/result.rs:1165:5

Note this isn’t a compiler error, it’s a runtime panic. I haven’t looked into the details of this, but the problem stems from the nested calls to block_on(). It seems that if the inner future finishes immediately everything is fine, but not if it blocks. However, it works as expected with await:

let fut = async {
    rep_ctx.receive().await.unwrap();
};
block_on(fut);

Async Traits

Try:

trait AsyncTrait {
    async fn do_stuff();
}

Nope:

error[E0706]: trait fns cannot be declared `async`
 --> src/main.rs:5:5
  |
5 |     async fn do_stuff();
  |     ^^^^^^^^^^^^^^^^^^^^

How about:

trait AsyncTrait {
    fn do_stuff();
}

struct Type;

impl AsyncTrait for Type {
    async fn do_stuff() { }
}

Nope:

error[E0706]: trait fns cannot be declared `async`
  --> src/main.rs:11:5
   |
11 |     async fn do_stuff() { }
   |     ^^^^^^^^^^^^^^^^^^^^^^^

One work-around involves being explicit about the return types, and using an async block within the impl:

use futures::future::{self, BoxFuture, Future};

trait AsyncTrait {
    fn boxed_trait() -> Box<dyn Future<Output = ()>>;
    fn pinned_box() -> BoxFuture<'static, ()>;
}

impl<T> AsyncTrait for T {
    fn boxed_trait() -> Box<dyn Future<Output = ()>> {
        Box::new(async {
            // .await to your heart's content
        })
    }
    fn pinned_box() -> BoxFuture<'static, ()> {
        Box::pin(async {
            // .await to your heart's content
        })
    }
}