Last chapter on async Rust

LukeMathWalker 2024-05-15 20:00:48 +02:00
parent 6e581440bf
commit aa58dcb5ca
33 changed files with 1799 additions and 10 deletions

Cargo.lock (generated)

@ -2,6 +2,27 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "anyhow"
version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3"
[[package]]
name = "arrays"
version = "0.1.0"
@ -14,6 +35,56 @@ version = "0.1.0"
name = "assoc_vs_generic"
version = "0.1.0"
[[package]]
name = "async_fn"
version = "0.1.0"
dependencies = [
"anyhow",
"tokio",
]
[[package]]
name = "async_locks"
version = "0.1.0"
dependencies = [
"tokio",
]
[[package]]
name = "autocfg"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "backtrace"
version = "0.3.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "bitflags"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1"
[[package]]
name = "blocking"
version = "0.1.0"
dependencies = [
"anyhow",
"tokio",
]
[[package]]
name = "bounded"
version = "0.1.0"
@ -28,6 +99,31 @@ dependencies = [
"ticket_fields",
]
[[package]]
name = "bytes"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]]
name = "cancellation"
version = "0.1.0"
dependencies = [
"tokio",
]
[[package]]
name = "cc"
version = "1.0.97"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099a5357d84c4c61eb35fc8eafa9a79a902c2f76911e5747ced4e032edd8d9b4"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "channels"
version = "0.1.0"
@ -126,6 +222,19 @@ version = "0.1.0"
name = "from"
version = "0.1.0"
[[package]]
name = "future"
version = "0.1.0"
dependencies = [
"tokio",
]
[[package]]
name = "gimli"
version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "hashmap"
version = "0.1.0"
@ -137,6 +246,12 @@ dependencies = [
name = "heap"
version = "0.1.0"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "if_else"
version = "0.1.0"
@ -202,7 +317,11 @@ name = "intro_05"
version = "0.1.0"
[[package]]
name = "intro_06"
name = "intro_07"
version = "0.1.0"
[[package]]
name = "intro_08"
version = "0.1.0"
[[package]]
@ -223,6 +342,12 @@ dependencies = [
name = "leaking"
version = "0.1.0"
[[package]]
name = "libc"
version = "0.2.154"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346"
[[package]]
name = "lifetime"
version = "0.1.0"
@ -230,6 +355,16 @@ dependencies = [
"ticket_fields",
]
[[package]]
name = "lock_api"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "locks"
version = "0.1.0"
@ -242,6 +377,32 @@ dependencies = [
name = "match_"
version = "0.1.0"
[[package]]
name = "memchr"
version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
[[package]]
name = "miniz_oxide"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"wasi",
"windows-sys 0.48.0",
]
[[package]]
name = "modules"
version = "0.1.0"
@ -257,6 +418,25 @@ dependencies = [
"common",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.32.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441"
dependencies = [
"memchr",
]
[[package]]
name = "orphan"
version = "0.1.0"
@ -293,6 +473,29 @@ version = "0.1.0"
name = "panics"
version = "0.1.0"
[[package]]
name = "parking_lot"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.52.5",
]
[[package]]
name = "patch"
version = "0.1.0"
@ -301,6 +504,12 @@ dependencies = [
"ticket_fields",
]
[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "proc-macro2"
version = "1.0.81"
@ -319,6 +528,15 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e"
dependencies = [
"bitflags",
]
[[package]]
name = "references_in_memory"
version = "0.1.0"
@ -334,6 +552,20 @@ dependencies = [
"ticket_fields",
]
[[package]]
name = "runtime"
version = "0.1.0"
dependencies = [
"anyhow",
"tokio",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rwlock"
version = "0.1.0"
@ -350,6 +582,12 @@ version = "0.1.0"
name = "scoped_threads"
version = "0.1.0"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "setters"
version = "0.1.0"
@ -357,6 +595,15 @@ dependencies = [
"common",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"libc",
]
[[package]]
name = "sized"
version = "0.1.0"
@ -365,6 +612,22 @@ version = "0.1.0"
name = "slice"
version = "0.1.0"
[[package]]
name = "smallvec"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "source"
version = "0.1.0"
@ -373,6 +636,14 @@ dependencies = [
"thiserror",
]
[[package]]
name = "spawn"
version = "0.1.0"
dependencies = [
"anyhow",
"tokio",
]
[[package]]
name = "stack"
version = "0.1.0"
@ -456,6 +727,36 @@ dependencies = [
"thiserror",
]
[[package]]
name = "tokio"
version = "1.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "trait_"
version = "0.1.0"
@ -510,6 +811,12 @@ version = "0.1.0"
name = "visibility"
version = "0.1.0"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "welcome_00"
version = "0.1.0"
@ -518,6 +825,145 @@ version = "0.1.0"
name = "while_"
version = "0.1.0"
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.5",
]
[[package]]
name = "windows-targets"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [
"windows_aarch64_gnullvm 0.48.5",
"windows_aarch64_msvc 0.48.5",
"windows_i686_gnu 0.48.5",
"windows_i686_msvc 0.48.5",
"windows_x86_64_gnu 0.48.5",
"windows_x86_64_gnullvm 0.48.5",
"windows_x86_64_msvc 0.48.5",
]
[[package]]
name = "windows-targets"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb"
dependencies = [
"windows_aarch64_gnullvm 0.52.5",
"windows_aarch64_msvc 0.52.5",
"windows_i686_gnu 0.52.5",
"windows_i686_gnullvm",
"windows_i686_msvc 0.52.5",
"windows_x86_64_gnu 0.52.5",
"windows_x86_64_gnullvm 0.52.5",
"windows_x86_64_msvc 0.52.5",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6"
[[package]]
name = "windows_i686_gnu"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9"
[[package]]
name = "windows_i686_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
[[package]]
name = "without_channels"
version = "0.1.0"


@ -23,7 +23,7 @@ to get started with the course.
- **Rust** (follow instructions [here](https://www.rust-lang.org/tools/install)).
If `rustup` is already installed on your system, run `rustup update` (or another appropriate command depending on how
you installed Rust on your system)
to make your running on the latest version.
to make sure you're running on the latest stable version.
- _(Optional but recommended)_ An IDE with Rust autocompletion support.
We recommend one of the following:
- [RustRover](https://www.jetbrains.com/rust/);


@ -1,13 +1,10 @@
# Intro
One of Rust's big promises is *fearless concurrency*: making it easier to write safe, concurrent programs.
We haven't seen much of that yet. All the work we've done so far has been single-threaded:
instructions executed one after the other, with strict sequencing. Time to change that!
We haven't seen much of that yet. All the work we've done so far has been single-threaded.
Time to change that!
In this chapter we'll make our ticket store multithreaded.
We will start by allowing multiple users to interface with the same store at the same time. We'll then progress
to having multiple instances of the store running concurrently while sharing the same data.
We'll have the opportunity to touch most of Rust's core concurrency features, including:
- Threads, using the `std::thread` module


@ -0,0 +1,11 @@
# Async Rust
Threads are not the only way to write concurrent programs in Rust.
In this chapter we'll explore another approach: **asynchronous programming**.
In particular, you'll get an introduction to:
- The `async`/`.await` keywords, to write asynchronous code effortlessly
- The `Future` trait, to represent computations that may not be complete yet
- `tokio`, the most popular runtime for running asynchronous code
- The cooperative nature of Rust's asynchronous model, and how it affects your code


@ -0,0 +1,144 @@
# Asynchronous functions
All the functions and methods you've written so far were eager.
Nothing happened until you invoked them. But once you did, they ran to
completion: they did **all** their work, and then returned their output.
Sometimes that's undesirable.
For example, if you're writing an HTTP server, there might be a lot of
**waiting**: waiting for the request body to arrive, waiting for the
database to respond, waiting for a downstream service to reply, etc.
What if you could do something else while you're waiting?
What if you could choose to give up midway through a computation?
What if you could choose to prioritise another task over the current one?
That's where **asynchronous functions** come in.
## `async fn`
You use the `async` keyword to define an asynchronous function:
```rust
use tokio::net::TcpListener;
// This function is asynchronous
async fn bind_random() -> TcpListener {
    // [...]
}
```
What happens if you call `bind_random` as you would a regular function?
```rust
fn run() {
    // Invoke `bind_random`
    let listener = bind_random();
    // Now what?
}
```
Nothing happens!
Rust doesn't start executing `bind_random` when you call it,
not even as a background task (as you might expect based on your experience
with other languages).
Asynchronous functions in Rust are **lazy**: they don't do any work until you
explicitly ask them to.
Using Rust's terminology, we say that `bind_random` returns a **future**, a type
that represents a computation that may complete later. They're called futures
because they implement the `Future` trait, an interface that we'll examine in
detail later on in this chapter.
## `.await`
The most common way to ask an asynchronous function to do some work is to use
the `.await` keyword:
```rust
use tokio::net::TcpListener;
async fn bind_random() -> TcpListener {
    // [...]
}

async fn run() {
    // Invoke `bind_random` and wait for it to complete
    let listener = bind_random().await;
    // Now `listener` is ready
}
```
`.await` doesn't return control to the caller until the asynchronous function
has run to completion—e.g. until the `TcpListener` has been created in the example above.
## Runtimes
If you're puzzled, you're right to be!
We've just said that the perk of asynchronous functions
is that they don't do **all** their work at once. We then introduced `.await`, which
doesn't return until the asynchronous function has run to completion. Haven't we
just re-introduced the problem we were trying to solve? What's the point?
Not quite! A lot happens behind the scenes when you call `.await`!
You're yielding control to an **async runtime**, also known as an **async executor**.
Executors are where the magic happens: they are in charge of managing all your
ongoing asynchronous **tasks**. In particular, they balance two different goals:
- **Progress**: they make sure that tasks make progress whenever they can.
- **Efficiency**: if a task is waiting for something, they try to make sure that
another task can run in the meantime, fully utilising the available resources.
### No default runtime
Rust is fairly unique in its approach to asynchronous programming: there is
no default runtime. The standard library doesn't ship with one. You need to
bring your own!
In most cases, you'll choose one of the options available in the ecosystem.
Some runtimes are designed to be broadly applicable, a solid option for most applications.
`tokio` and `async-std` belong to this category. Other runtimes are optimised for
specific use cases—e.g. `embassy` for embedded systems.
Throughout this course we'll rely on `tokio`, the most popular runtime for general-purpose
asynchronous programming in Rust.
### `#[tokio::main]`
The entrypoint of your executable, the `main` function, must be a synchronous function.
That's where you're supposed to set up and launch your chosen async runtime.
Most runtimes provide a macro to make this easier. For `tokio`, it's `#[tokio::main]`:
```rust
#[tokio::main]
async fn main() {
    // Your async code goes here
}
```
which expands to:
```rust
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(
        // Your async function goes here
        // [...]
    );
}
```
### `#[tokio::test]`
The same goes for tests: they must be synchronous functions.
Each test function is run in its own thread, and you're responsible for
setting up and launching an async runtime if you need to run async code
in your tests.
`tokio` provides a `#[tokio::test]` macro to make this easier:
```rust
#[tokio::test]
async fn my_test() {
    // Your async test code goes here
}
```
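To tie the pieces together, here's a minimal, self-contained sketch of what the chapter's running example might look like end to end. The body of `bind_random` is an assumption (the chapter leaves it elided): binding to port `0` asks the OS to pick a free port for us.
```rust
use tokio::net::TcpListener;

async fn bind_random() -> TcpListener {
    // Port 0 asks the OS to assign a free port for us.
    TcpListener::bind("127.0.0.1:0").await.unwrap()
}

#[tokio::main]
async fn main() {
    // Nothing runs until we `.await` the future returned by `bind_random`.
    let listener = bind_random().await;
    println!("Bound to {}", listener.local_addr().unwrap());
}
```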


@ -0,0 +1,94 @@
# Spawning tasks
Your solution to the previous exercise should look something like this:
```rust
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (mut socket, _) = listener.accept().await?;
        let (mut reader, mut writer) = socket.split();
        tokio::io::copy(&mut reader, &mut writer).await?;
    }
}
```
This is not bad!
If a long time passes between two incoming connections, the `echo` function will be idle
(since `TcpListener::accept` is an asynchronous function), thus allowing the executor
to run other tasks in the meantime.
But how can we actually have multiple tasks running concurrently?
If we always run our asynchronous functions until completion (by using `.await`), we'll never
have more than one task running at a time.
This is where the `tokio::spawn` function comes in.
## `tokio::spawn`
`tokio::spawn` allows you to hand off a task to the executor, **without waiting for it to complete**.
Whenever you invoke `tokio::spawn`, you're telling `tokio` to continue running
the spawned task, in the background, **concurrently** with the task that spawned it.
Here's how you can use it to process multiple connections concurrently:
```rust
use tokio::net::TcpListener;
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (mut socket, _) = listener.accept().await?;
        // Spawn a background task to handle the connection
        // thus allowing the main task to immediately start
        // accepting new connections
        tokio::spawn(async move {
            let (mut reader, mut writer) = socket.split();
            tokio::io::copy(&mut reader, &mut writer).await?;
            // The trailing `Ok` gives the async block a `Result` output type,
            // which the `?` operator above requires
            Ok::<(), anyhow::Error>(())
        });
    }
}
```
### Asynchronous blocks
In this example, we've passed an **asynchronous block** to `tokio::spawn`: `async move { /* */ }`.
Asynchronous blocks are a quick way to mark a region of code as asynchronous without having
to define a separate async function.
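Like an `async fn`, an async block evaluates to a future and runs nothing until it's awaited. A small sketch (the values are arbitrary, chosen for illustration):
```rust
async fn run() {
    // The block is a future: no work happens on this line.
    let block = async {
        println!("Hello from an async block!");
        21 * 2
    };
    // The body executes here, when the future is awaited.
    let result = block.await;
    assert_eq!(result, 42);
}
```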
### `JoinHandle`
`tokio::spawn` returns a `JoinHandle`.
You can use `JoinHandle` to `.await` the background task, in the same way
we used `join` for spawned threads.
```rust
pub async fn run() {
    // Spawn a background task to ship telemetry data
    // to a remote server
    let handle = tokio::spawn(emit_telemetry());
    // In the meantime, do some other useful work
    do_work().await;
    // But don't return to the caller until
    // the telemetry data has been successfully delivered
    handle.await;
}

pub async fn emit_telemetry() {
    // [...]
}

pub async fn do_work() {
    // [...]
}
```
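Awaiting a `JoinHandle` yields a `Result`: it's an error if the spawned task panicked or was aborted. A minimal sketch of inspecting that outcome (the task body here is arbitrary, for illustration):
```rust
async fn run() {
    let handle = tokio::spawn(async { 2 + 2 });
    match handle.await {
        Ok(output) => println!("The task returned {output}"),
        Err(e) if e.is_panic() => println!("The task panicked"),
        Err(_) => println!("The task was cancelled"),
    }
}
```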
### `std::thread::spawn` vs `tokio::spawn`
You can think of `tokio::spawn` as the asynchronous sibling of `std::thread::spawn`.
Notice a key difference: with `std::thread::spawn`, you're delegating control to the OS scheduler.
You're not in control of how threads are scheduled.
With `tokio::spawn`, you're delegating to an async executor that runs entirely in
user space. The underlying OS scheduler is not involved in the decision of which task
to run next. We're in charge of that decision now, via the executor we chose to use.


@ -0,0 +1,88 @@
# Runtime architecture
So far we've been talking about async runtimes as an abstract concept.
Let's dig a bit deeper into the way they are implemented—as you'll see soon enough,
it has an impact on our code.
## Flavors
`tokio` ships two different runtime _flavors_.
You can configure your runtime via `tokio::runtime::Builder`:
- `Builder::new_multi_thread` gives you a **multithreaded `tokio` runtime**
- `Builder::new_current_thread` will instead rely on the **current thread** for execution.
`#[tokio::main]` gives you a multithreaded runtime by default, while
`#[tokio::test]` uses a current thread runtime out of the box.
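As a sketch of what this looks like without the macros (the printed messages are just for illustration), you can build either flavor by hand and then `block_on` a future:
```rust
use tokio::runtime::Builder;

fn main() {
    // Multithreaded flavor: one worker thread per CPU core by default.
    let multi_threaded = Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    multi_threaded.block_on(async {
        println!("Running on the multithreaded runtime");
    });

    // Current-thread flavor: everything runs on the thread
    // that calls `block_on`.
    let current_thread = Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    current_thread.block_on(async {
        println!("Running on the current-thread runtime");
    });
}
```
If you prefer to stick with the macro, `#[tokio::main(flavor = "current_thread")]` achieves the same effect as the second snippet.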
### Current thread runtime
The current-thread runtime, as the name implies, relies exclusively on the OS thread
it was launched on to schedule and execute tasks.
When using the current-thread runtime, you have **concurrency** but no **parallelism**:
asynchronous tasks will be interleaved, but there will always be at most one task running
at any given time.
### Multithreaded runtime
When using the multithreaded runtime, instead, there can be up to `N` tasks running
_in parallel_ at any given time, where `N` is the number of threads used by the
runtime. By default, `N` matches the number of available CPU cores.
There's more: `tokio` performs **work-stealing**.
If a thread is idle, it won't wait around: it'll try to find a new task that's ready for
execution, either from a global queue or by stealing it from the local queue of another
thread.
Work-stealing can have significant performance benefits, especially on tail latencies,
whenever your application is dealing with workloads that are not perfectly balanced
across threads.
## Implications
`tokio::spawn` is flavor-agnostic: it'll work no matter if you're running on the multithreaded
or current-thread runtime. The downside is that the signature assumes the worst case
(i.e. multithreaded) and is constrained accordingly:
```rust
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{ /* */ }
```
Let's ignore the `Future` trait for now to focus on the rest.
`spawn` is asking all its inputs to be `Send` and have a `'static` lifetime.
The `'static` constraint follows the same rationale as the `'static` constraint
on `std::thread::spawn`: the spawned task may outlive the context it was spawned
from, therefore it shouldn't depend on any local data that may be de-allocated
after the spawning context is destroyed.
```rust
fn spawner() {
    let v = vec![1, 2, 3];
    // This won't work, since `&v` doesn't
    // live long enough.
    tokio::spawn(async {
        for x in &v {
            println!("{x}")
        }
    })
}
```
`Send`, on the other hand, is a direct consequence of `tokio`'s work-stealing strategy:
a task that was spawned on thread `A` may end up being moved to thread `B` if that's idle,
thus requiring a `Send` bound since we're crossing thread boundaries.
```rust
use std::rc::Rc;

fn spawner(input: Rc<u64>) {
    // This won't work either, because
    // `Rc` isn't `Send`.
    tokio::spawn(async move {
        println!("{}", input);
    })
}
```
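A common fix, sketched below, is to reach for `Arc` instead of `Rc`: `Arc<u64>` is `Send`, so the resulting future satisfies `tokio::spawn`'s bounds.
```rust
use std::sync::Arc;

fn spawner(input: Arc<u64>) {
    // This compiles: `Arc<u64>` is `Send`, so the future can be
    // moved to whichever worker thread the executor picks.
    tokio::spawn(async move {
        println!("{}", input);
    });
}
```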


@ -0,0 +1,165 @@
# The `Future` trait
## The local `Rc` problem
Let's go back to `tokio::spawn`'s signature:
```rust
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{ /* */ }
```
What does it _actually_ mean for `F` to be `Send`?
It implies, as we saw in the previous section, that whatever value it captures from the
spawning environment has to be `Send`. But it goes further than that.
Any value that's _held across an `.await` point_ has to be `Send`.
Let's look at an example:
```rust
use std::rc::Rc;
use tokio::task::yield_now;
fn spawner() {
    tokio::spawn(example());
}

async fn example() {
    // A value that's not `Send`,
    // created _inside_ the async function
    let non_send = Rc::new(1);
    // A `.await` point that does nothing
    yield_now().await;
    // The local non-`Send` value is still needed
    // after the `.await`
    println!("{}", non_send);
}
```
The compiler will reject this code:
```text
error: future cannot be sent between threads safely
   |
5  |     tokio::spawn(example());
   |                  ^^^^^^^^^ future returned by `example` is not `Send`
   |
note: future is not `Send` as this value is used across an await
   |
11 |     let non_send = Rc::new(1);
   |         -------- has type `Rc<i32>` which is not `Send`
12 |     // A `.await` point
13 |     yield_now().await;
   |                 ^^^^^ await occurs here, with `non_send` maybe used later
note: required by a bound in `tokio::spawn`
    |
164 | pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |        ----- required by a bound in this function
165 | where
166 |     F: Future + Send + 'static,
    |                 ^^^^ required by this bound in `spawn`
```
To understand why that's the case, we need to refine our understanding of
Rust's asynchronous model.
## The `Future` trait
We stated early on that `async` functions return **futures**, types that implement
the `Future` trait. You can think of a future as a **state machine**.
It's in one of two states:
- **pending**: the computation has not finished yet.
- **ready**: the computation has finished, here's the output.
This is encoded in the trait definition:
```rust
trait Future {
    type Output;
    // Ignore `Pin` and `Context` for now
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
```
### `poll`
The `poll` method is the heart of the `Future` trait.
A future on its own doesn't do anything. It needs to be **polled** to make progress.
When you call `poll`, you're asking the future to do some work.
`poll` tries to make progress, and then returns one of the following:
- `Poll::Pending`: the future is not ready yet. You need to call `poll` again later.
- `Poll::Ready(value)`: the future has finished. `value` is the result of the computation,
of type `Self::Output`.
Once `Future::poll` returns `Poll::Ready`, it should not be polled again: the future has
completed, there's nothing left to do.
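To make the state machine idea concrete, here's a toy hand-written future (a sketch; `YieldOnce` is not a type from the course or from `tokio`). It uses the `Context` to ask for another poll, even though we're otherwise setting `Pin` and `Context` aside:
```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A future that reports `Pending` on the first poll and `Ready` on the second.
struct YieldOnce {
    polled_already: bool,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled_already {
            // Second poll: the "computation" is done.
            Poll::Ready(42)
        } else {
            self.polled_already = true;
            // Tell the runtime we want to be polled again...
            cx.waker().wake_by_ref();
            // ...and report that we're not ready yet.
            Poll::Pending
        }
    }
}
```
Awaiting `YieldOnce { polled_already: false }` inside an async function would therefore suspend once and then evaluate to `42`.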
### The role of the runtime
You'll rarely, if ever, call `poll` directly.
That's the job of your async runtime: it has all the required information (the `Context`
in `poll`'s signature) to ensure that your futures are making progress whenever they can.
## `async fn` and futures
We've worked with the high-level interface, asynchronous functions.
We've now looked at the low-level primitive, the `Future` trait.
How are they related?
Every time you mark a function as asynchronous, that function will return a future.
The compiler will transform the body of your asynchronous function into a **state machine**:
one state for each `.await` point.
Going back to our `Rc` example:
```rust
use std::rc::Rc;
use tokio::task::yield_now;
async fn example() {
    let non_send = Rc::new(1);
    yield_now().await;
    println!("{}", non_send);
}
```
The compiler would transform it into an enum that looks somewhat like this:
```rust
pub enum ExampleFuture {
    NotStarted,
    YieldNow(Rc<i32>),
    Terminated,
}
```
When `example` is called, it returns `ExampleFuture::NotStarted`. The future has never
been polled yet, so nothing has happened.
When the runtime polls it the first time, `ExampleFuture` will advance until the next
`.await` point: it'll stop at the `ExampleFuture::YieldNow(Rc<i32>)` stage of the state
machine, returning `Poll::Pending`.
When it's polled again, it'll execute the remaining code (`println!`) and
return `Poll::Ready(())`.
Looking at its state machine representation, `ExampleFuture`, it's now clear
why `example` is not `Send`: one of its states holds an `Rc`, and `Rc` is not
`Send`.
## Yield points
As you've just seen with `example`, every `.await` point creates a new intermediate
state in the lifecycle of a future.
That's why `.await` points are also known as **yield points**: your future _yields control_
back to the runtime that was polling it, allowing the runtime to pause it and (if necessary)
schedule another task for execution, thus making progress on multiple fronts concurrently.
We'll come back to the importance of yielding in a later section.


@ -0,0 +1,79 @@
# Don't block the runtime
Let's circle back to yield points.
Unlike threads, **Rust tasks cannot be preempted**.
`tokio` cannot, on its own, decide to pause a task and run another one in its place.
The control goes back to the executor **exclusively** when the task yields—i.e.
when `Future::poll` returns `Poll::Pending` or, in the case of `async fn`, when
you `.await` a future.
This exposes the runtime to a risk: if a task never yields, the runtime will never
be able to run another task. This is called **blocking the runtime**.
## What is blocking?
How long is too long? How much time can a task spend without yielding before it
becomes a problem?
It depends on the runtime, the application, the number of in-flight tasks, and
many other factors. But, as a general rule of thumb, try to spend less than 100
microseconds between yield points.
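If a stretch of work is CPU-light but long, one way to stay within that budget is to hand control back to the executor explicitly every so often. A sketch (the function and the chunk size are arbitrary, chosen for illustration):
```rust
use tokio::task::yield_now;

async fn sum_all(items: Vec<u64>) -> u64 {
    let mut total = 0;
    for (i, item) in items.iter().enumerate() {
        total += *item;
        // Every 1000 items, give other tasks a chance to run.
        if i % 1_000 == 0 {
            yield_now().await;
        }
    }
    total
}
```
For genuinely expensive computations, the sections below cover a better tool.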
## Consequences
Blocking the runtime can lead to:
- **Deadlocks**: if the task that's not yielding is waiting for another task to
complete, and that task is waiting for the first one to yield, you have a deadlock.
No progress can be made, unless the runtime is able to schedule the other task on
a different thread.
- **Starvation**: other tasks might not be able to run, or might run after a long
delay, which can lead to poor performance (e.g. high tail latencies).
## Blocking is not always obvious
Some types of operations should generally be avoided in async code, like:
- Synchronous I/O. You can't predict how long it will take, and it's likely to be
longer than 100 microseconds.
- Expensive CPU-bound computations.
The latter category is not always obvious though. For example, sorting a vector with
a few elements is not a problem; that evaluation changes if the vector has billions
of entries.
## How to avoid blocking
OK, so how do you avoid blocking the runtime assuming you _must_ perform an operation
that qualifies or risks qualifying as blocking?
You need to move the work to a different thread. You don't want to use the so-called
runtime threads, the ones used by `tokio` to run tasks.
`tokio` provides a dedicated threadpool for this purpose, called the **blocking pool**.
You can spawn a synchronous operation on the blocking pool using the
`tokio::task::spawn_blocking` function. `spawn_blocking` returns a future that resolves
to the result of the operation when it completes.
```rust
use tokio::task;
fn expensive_computation() -> u64 {
    // [...]
}

async fn run() {
    let handle = task::spawn_blocking(expensive_computation);
    // Do other stuff in the meantime
    let result = handle.await.unwrap();
}
```
The blocking pool is long-lived. `spawn_blocking` should be faster
than creating a new thread directly via `std::thread::spawn`
because the cost of thread initialization is amortized over multiple calls.
## Further reading
- Check out [Alice Ryhl's blog post](https://ryhl.io/blog/async-what-is-blocking/)
on the topic.


@ -0,0 +1,129 @@
# Async-aware primitives
If you browse `tokio`'s documentation, you'll notice that it provides a lot of types
that "mirror" the ones in the standard library, but with an asynchronous twist:
locks, channels, timers, and more.
When working in an asynchronous context, you should prefer these asynchronous alternatives
to their synchronous counterparts.
To understand why, let's take a look at `Mutex`, the mutually exclusive lock we explored
in the previous chapter.
## Case study: `Mutex`
Let's look at a simple example:
```rust
use std::sync::{Arc, Mutex};
async fn run(m: Arc<Mutex<Vec<u64>>>) {
    let guard = m.lock().unwrap();
    http_call(&guard).await;
    println!("Sent {:?} to the server", &guard);
    // `guard` is dropped here
}

/// Use `v` as the body of an HTTP call.
async fn http_call(v: &[u64]) {
    // [...]
}
```
### `std::sync::MutexGuard` and yield points
This code will compile, but it's dangerous.
We try to acquire a lock over a `Mutex` from `std` in an asynchronous context.
We then hold on to the resulting `MutexGuard` across a yield point (the `.await` on
`http_call`).
Let's imagine that there are two tasks executing `run`, concurrently, on a single-threaded
runtime. We observe the following sequence of scheduling events:
```text
     Task A                Task B
        |
  Acquire lock
  Yields to runtime
        |
        +--------------+
                       |
            Tries to acquire lock
```
We have a deadlock. Task B will never manage to acquire the lock, because the lock
is currently held by task A, which has yielded to the runtime before releasing the
lock and won't be scheduled again because the runtime cannot preempt task B.
### `tokio::sync::Mutex`
You can solve the issue by switching to `tokio::sync::Mutex`:
```rust
use std::sync::Arc;
use tokio::sync::Mutex;
async fn run(m: Arc<Mutex<Vec<u64>>>) {
    let guard = m.lock().await;
    http_call(&guard).await;
    println!("Sent {:?} to the server", &guard);
    // `guard` is dropped here
}
```
Acquiring the lock is now an asynchronous operation, which yields back to the runtime
if it can't make progress.
Going back to the previous scenario, the following would happen:
```text
     Task A                Task B
        |
  Acquires the lock
  Starts `http_call`
  Yields to runtime
        |
        +--------------+
                       |
            Tries to acquire the lock
            Cannot acquire the lock
            Yields to runtime
                       |
        +--------------+
        |
  `http_call` completes
  Releases the lock
  Yields to runtime
        |
        +--------------+
                       |
            Acquires the lock
            [...]
```
All good!
### Multithreaded won't save you
We've used a single-threaded runtime as the execution context in our
previous example, but the same risk persists even when using a multithreaded
runtime.
The only difference is in the number of concurrent tasks required to create the deadlock:
in a single-threaded runtime, 2 are enough; in a multithreaded runtime, we
would need `N+1` tasks, where `N` is the number of runtime threads: `N` tasks block
all the runtime threads while waiting for the `std` lock, leaving no thread free to
poll the task that holds it (and would eventually release it).
### Downsides
Having an async-aware `Mutex` comes with a performance penalty.
If you're confident that the lock isn't under significant contention
_and_ you're careful to never hold it across a yield point, you can
still use `std::sync::Mutex` in an asynchronous context.
But weigh the performance benefit against the liveness risk you
will incur.
## Other primitives
We used `Mutex` as an example, but the same applies to `RwLock`, semaphores, etc.
Prefer async-aware versions when working in an asynchronous context to minimise
the risk of issues.
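For instance, `tokio::sync::mpsc` is the async-aware counterpart of the `std::sync::mpsc` channel you used in the previous chapter: both `send` (for bounded channels) and `recv` are futures, so a full or empty channel suspends the task instead of blocking the thread. A quick sketch (the message values are arbitrary):
```rust
use tokio::sync::mpsc;

async fn ping_pong() {
    // A bounded channel with room for 8 in-flight messages.
    let (sender, mut receiver) = mpsc::channel::<u64>(8);

    tokio::spawn(async move {
        for i in 0..4 {
            // Suspends (rather than blocking) if the channel is full.
            sender.send(i).await.unwrap();
        }
        // `sender` is dropped here, closing the channel.
    });

    // Suspends (rather than blocking) while the channel is empty;
    // returns `None` once the channel is closed.
    while let Some(value) = receiver.recv().await {
        println!("Received {value}");
    }
}
```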


@ -0,0 +1,93 @@
# Cancellation
What happens when a pending future is dropped?
The runtime will no longer poll it, therefore it won't make any further progress.
In other words, its execution has been **cancelled**.
In the wild, this often happens when working with timeouts.
For example:
```rust
use tokio::time::timeout;
use std::time::Duration;

async fn http_call() {
    // [...]
}

async fn run() {
    // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
    let duration = Duration::from_millis(10);
    if let Err(_) = timeout(duration, http_call()).await {
        println!("Didn't receive a value within 10 ms");
    }
}
```
When the timeout expires, the future returned by `http_call` will be cancelled.
Let's imagine that this is `http_call`'s body:
```rust
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

async fn http_call() {
    let mut stream = TcpStream::connect(/* */).await.unwrap();
    let request: Vec<u8> = /* */;
    stream.write_all(&request).await.unwrap();
}
```
Each yield point becomes a **cancellation point**.
`http_call` can't be preempted by the runtime, so it can only be discarded after
it has yielded control back to the executor via `.await`.
This applies recursively—e.g. `stream.write_all(&request)` is likely to have multiple
yield points in its implementation. It is perfectly possible to see `http_call` pushing
a _partial_ request before being cancelled, thus dropping the connection and never
finishing transmitting the body.
## Clean up
Rust's cancellation mechanism is quite powerful—it allows the caller to cancel an ongoing task
without needing any form of cooperation from the task itself.
At the same time, this can be quite dangerous. It may be desirable to perform a
**graceful cancellation**, to ensure that some clean-up tasks are performed
before aborting the operation.
For example, consider this fictional API for a SQL transaction:
```rust
async fn transfer_money(
    connection: SqlConnection,
    payer_id: u64,
    payee_id: u64,
    amount: u64
) -> Result<(), anyhow::Error> {
    let transaction = connection.begin_transaction().await?;
    decrease_balance(payer_id, amount, &transaction).await?;
    increase_balance(payee_id, amount, &transaction).await?;
    transaction.commit().await?;
    Ok(())
}
```
On cancellation, it'd be ideal to explicitly abort the pending transaction rather
than leaving it hanging.
Rust, unfortunately, doesn't provide a bullet-proof mechanism for this kind of
**asynchronous** clean-up operation.
The most common strategy is to rely on the `Drop` trait to schedule the required
clean-up work. This can be done by:
- Spawning a new task on the runtime
- Enqueueing a message on a channel
- Spawning a background thread
The optimal choice is contextual.
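As a sketch of the first option (all the types here are hypothetical stand-ins for the fictional SQL API above, not a real library), you could wrap the transaction in a guard whose `Drop` implementation hands the asynchronous rollback back to the runtime:
```rust
/// Hypothetical transaction handle, standing in for the fictional SQL API.
struct Transaction;

impl Transaction {
    async fn rollback(self) {
        // [...]
    }
}

/// Rolls the transaction back on drop, unless it was explicitly committed.
struct TransactionGuard {
    transaction: Option<Transaction>,
}

impl Drop for TransactionGuard {
    fn drop(&mut self) {
        if let Some(transaction) = self.transaction.take() {
            // `Drop` is synchronous, so we can't `.await` here: we spawn the
            // clean-up as a new task instead. Note that `tokio::spawn` panics
            // if no runtime is running, so this only works inside a `tokio`
            // context.
            tokio::spawn(async move {
                transaction.rollback().await;
            });
        }
    }
}
```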
## Further reading
- Be extremely careful when using `tokio`'s `select!` macro to "race" two different futures.
Retrying the same task in a loop is dangerous unless you can ensure **cancellation safety**.
Check out [`select!`'s documentation](https://docs.rs/tokio/macro.select.html) for more details.
- Rather than "abrupt" cancellation, it can be preferable to rely
on [`CancellationToken`](https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html).


@ -0,0 +1,34 @@
# Outro
Rust's asynchronous model is quite powerful, but it does introduce additional
complexity. Take time to know your tools: dive deep into `tokio`'s documentation
and get familiar with its primitives to make the most out of it.
Keep in mind, as well, that there is ongoing work at the language and `std` level
to streamline and "complete" Rust's asynchronous story. You may experience some
rough edges in your day-to-day work due to some of these missing pieces.
A few recommendations for a mostly-pain-free async experience:
- **Pick a runtime and stick to it.**
Some primitives (e.g. timers, I/O) are not portable across runtimes. Trying to
mix runtimes is likely to cause you pain. Trying to write code that's runtime
agnostic can significantly increase the complexity of your codebase. Avoid it
if you can.
- **There is no stable `Stream`/`AsyncIterator` interface yet.**
An `AsyncIterator` is, conceptually, an iterator that yields new items
asynchronously. There is ongoing design work, but no consensus (yet).
  If you're using `tokio`, refer to [`tokio_stream`](https://docs.rs/tokio-stream/latest/tokio_stream/)
  as your go-to interface; a small sketch follows this list.
- **Be careful with buffering.**
It is often the cause of subtle bugs. Check out
["Barbara battles buffered streams"](https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/barbara_battles_buffered_streams.html)
for more details.
- **There is no equivalent of scoped threads for asynchronous tasks**.
Check out ["The scoped task trilemma"](https://without.boats/blog/the-scoped-task-trilemma/)
for more details.
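Here's what the `tokio_stream` recommendation looks like in practice — a minimal sketch (you'd need to add `tokio-stream` to your `Cargo.toml`; it isn't a dependency of the course exercises):
```rust
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // Turn a plain iterator into a stream and consume it asynchronously.
    let mut values = stream::iter(vec![1, 2, 3]);
    while let Some(value) = values.next().await {
        println!("{value}");
    }
}
```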
Don't let these caveats scare you: asynchronous Rust is being used effectively
at _massive_ scale (e.g. AWS, Meta) to power foundational services.
You will have to master it if you plan on building networked applications
in Rust.


@ -94,3 +94,15 @@
- [`RwLock`](07_threads/12_rw_lock.md)
- [Without channels](07_threads/13_without_channels.md)
- [`Sync` trait](07_threads/14_sync.md)
- [Futures](08_futures/00_intro.md)
- [Asynchronous functions](08_futures/01_async_fn.md)
- [Spawning tasks](08_futures/02_spawn.md)
- [Runtime](08_futures/03_runtime.md)
- [Future trait](08_futures/04_future.md)
- [Blocking the runtime](08_futures/05_blocking.md)
- [Async-aware primitives](08_futures/06_async_aware_primitives.md)
- [Cancellation](08_futures/07_cancellation.md)
- [Outro](08_futures/08_outro.md)
* [Going further](going_further.md)

book/src/going_further.md

@ -0,0 +1,54 @@
# Epilogue
Our tour of Rust ends here.
It has been quite extensive, but by no means exhaustive: Rust is a language with
a large surface area, and an even larger ecosystem!
Don't let this scare you, though: there's **no need to learn everything**.
You'll pick up whatever is necessary to be effective in the domain
(backend, embedded, CLIs, GUIs, etc.) **while working on your projects**.
In the end, there are no shortcuts: if you want to get good at something,
you need to do it, over and over again. Throughout this course you wrote a fair
amount of Rust, enough to get the language and its syntax flowing under your
fingers. It'll take many more lines of code to feel it "yours", but that moment
will come without a doubt if you keep practicing.
## Going further
Let's close with some pointers to additional resources that you might find
useful as you move forward in your journey with Rust.
### Exercises
You can find more exercises to practice Rust in the [`rustlings`](https://github.com/rust-lang/rustlings)
project and on [exercism.io](https://exercism.io)'s Rust track.
### Introductory material
Check out [the Rust book](https://doc.rust-lang.org/book/title-page.html) and
["Programming Rust"](https://www.oreilly.com/library/view/programming-rust-2nd/9781492052586/)
if you're looking for a different perspective on the same concepts we covered throughout this course.
The material doesn't overlap perfectly, therefore you'll certainly learn something new along the
way.
### Advanced material
If you want to dive deeper into the language, refer to the [Rustonomicon](https://doc.rust-lang.org/nomicon/)
and ["Rust for Rustaceans"](https://nostarch.com/rust-rustaceans).
The ["Decrusted" series](https://www.youtube.com/playlist?list=PLqbS7AVVErFirH9armw8yXlE6dacF-A6z) is another excellent
resource to learn more about the internals of many of the most popular Rust libraries.
### Domain-specific material
If you want to use Rust for backend development,
check out ["Zero to Production in Rust"](https://zero2prod.com).
If you want to use Rust for embedded development,
check out the [Embedded Rust book](https://docs.rust-embedded.org/book/).
### Masterclasses
You can then find resources on key topics that cut across domains.
For testing, check out
["Advanced testing, going beyond the basics"](https://github.com/mainmatter/rust-advanced-testing-workshop).
For telemetry, check out ["You can't fix what you can't see"](https://github.com/mainmatter/rust-telemetry-workshop).


@ -1,4 +1,4 @@
[package]
name = "intro_06"
name = "intro_07"
version = "0.1.0"
edition = "2021"


@ -0,0 +1,4 @@
[package]
name = "intro_08"
version = "0.1.0"
edition = "2021"


@ -0,0 +1,14 @@
fn intro() -> &'static str {
    // TODO: fix me 👇
    "I'm ready to _!"
}

#[cfg(test)]
mod tests {
    use crate::intro;

    #[test]
    fn test_intro() {
        assert_eq!(intro(), "I'm ready to learn about futures!");
    }
}


@ -0,0 +1,8 @@
[package]
name = "async_fn"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
anyhow = "1.0.83"


@ -0,0 +1,45 @@
use tokio::net::TcpListener;
// TODO: write an echo server that accepts incoming TCP connections and
// echoes the received data back to the client.
// `echo` should not return when it finishes processing a connection, but should
// continue to accept new connections.
//
// Hint: you should rely on `tokio`'s structs and methods to implement the echo server.
// In particular:
// - `tokio::net::TcpListener::accept` to process the next incoming connection
// - `tokio::net::TcpStream::split` to obtain a reader and a writer from the socket
// - `tokio::io::copy` to copy data from the reader to the writer
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    todo!()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    #[tokio::test]
    async fn test_echo() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(echo(listener));

        let requests = vec!["hello", "world", "foo", "bar"];
        for request in requests {
            let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
            let (mut reader, mut writer) = socket.split();

            // Send the request
            writer.write_all(request.as_bytes()).await.unwrap();
            // Close the write side of the socket
            writer.shutdown().await.unwrap();

            // Read the response
            let mut buf = Vec::with_capacity(request.len());
            reader.read_to_end(&mut buf).await.unwrap();
            assert_eq!(&buf, request.as_bytes());
        }
    }
}


@ -0,0 +1,8 @@
[package]
name = "spawn"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
anyhow = "1.0.83"


@ -0,0 +1,60 @@
use tokio::net::TcpListener;
// TODO: write an echo server that accepts TCP connections on two listeners, concurrently.
// Multiple connections (on the same listeners) should be processed concurrently.
// The received data should be echoed back to the client.
pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> {
    todo!()
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;
    use std::panic;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::task::JoinSet;

    async fn bind_random() -> (TcpListener, SocketAddr) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    #[tokio::test]
    async fn test_echo() {
        let (first_listener, first_addr) = bind_random().await;
        let (second_listener, second_addr) = bind_random().await;
        tokio::spawn(echoes(first_listener, second_listener));

        let requests = vec!["hello", "world", "foo", "bar"];
        let mut join_set = JoinSet::new();

        for request in requests.clone() {
            for addr in [first_addr, second_addr] {
                join_set.spawn(async move {
                    let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
                    let (mut reader, mut writer) = socket.split();

                    // Send the request
                    writer.write_all(request.as_bytes()).await.unwrap();
                    // Close the write side of the socket
                    writer.shutdown().await.unwrap();

                    // Read the response
                    let mut buf = Vec::with_capacity(request.len());
                    reader.read_to_end(&mut buf).await.unwrap();
                    assert_eq!(&buf, request.as_bytes());
                });
            }
        }

        while let Some(outcome) = join_set.join_next().await {
            if let Err(e) = outcome {
                if let Ok(reason) = e.try_into_panic() {
                    panic::resume_unwind(reason);
                }
            }
        }
    }
}


@ -0,0 +1,8 @@
[package]
name = "runtime"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
anyhow = "1.0.83"


@ -0,0 +1,61 @@
// TODO: Implement the `fixed_reply` function. It should accept two `TcpListener` instances,
// accept connections on both of them concurrently, and always reply to clients by sending
// the `Display` representation of the `reply` argument as a response.
use std::fmt::Display;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
pub async fn fixed_reply<T>(first: TcpListener, second: TcpListener, reply: T)
where
    // `T` cannot be cloned. How do you share it between the two server tasks?
    T: Display + Send + Sync + 'static,
{
    todo!()
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;
    use std::panic;
    use tokio::io::AsyncReadExt;
    use tokio::task::JoinSet;

    async fn bind_random() -> (TcpListener, SocketAddr) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    #[tokio::test]
    async fn test_echo() {
        let (first_listener, first_addr) = bind_random().await;
        let (second_listener, second_addr) = bind_random().await;
        let reply = "Yo";
        tokio::spawn(fixed_reply(first_listener, second_listener, reply));

        let mut join_set = JoinSet::new();
        for _ in 0..3 {
            for addr in [first_addr, second_addr] {
                join_set.spawn(async move {
                    let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
                    let (mut reader, _) = socket.split();

                    // Read the response
                    let mut buf = Vec::new();
                    reader.read_to_end(&mut buf).await.unwrap();
                    assert_eq!(&buf, reply.as_bytes());
                });
            }
        }

        while let Some(outcome) = join_set.join_next().await {
            if let Err(e) = outcome {
                if let Ok(reason) = e.try_into_panic() {
                    panic::resume_unwind(reason);
                }
            }
        }
    }
}


@ -0,0 +1,7 @@
[package]
name = "future"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }


@ -0,0 +1,15 @@
//! TODO: get the code to compile by **re-ordering** the statements
//! in the `example` function. You're not allowed to change the
//! `spawner` function nor what each line does in `example`.
use std::rc::Rc;
use tokio::task::yield_now;
fn spawner() {
    tokio::spawn(example());
}

async fn example() {
    let non_send = Rc::new(1);
    yield_now().await;
    println!("{}", non_send);
}


@ -0,0 +1,8 @@
[package]
name = "blocking"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
anyhow = "1.0.83"


@ -0,0 +1,70 @@
// TODO: the `echo` server uses non-async primitives.
// When running the tests, you should observe that it hangs, due to a
// deadlock between the caller and the server.
// Use `spawn_blocking` inside `echo` to resolve the issue.
use std::io::{Read, Write};
use tokio::net::TcpListener;
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (socket, _) = listener.accept().await?;
        let mut socket = socket.into_std()?;
        let mut buffer = Vec::new();
        while let Ok(_) = socket.read(&mut buffer) {}
        socket.write_all(&buffer)?;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;
    use std::panic;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::task::JoinSet;

    async fn bind_random() -> (TcpListener, SocketAddr) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    #[tokio::test]
    async fn test_echo() {
        let (listener, addr) = bind_random().await;
        tokio::spawn(echo(listener));

        let requests = vec![
            "hello here we go with a long message",
            "world",
            "foo",
            "bar",
        ];
        let mut join_set = JoinSet::new();

        for request in requests {
            join_set.spawn(async move {
                let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
                let (mut reader, mut writer) = socket.split();

                // Send the request
                writer.write_all(request.as_bytes()).await.unwrap();
                // Close the write side of the socket
                writer.shutdown().await.unwrap();

                // Read the response
                let mut buf = Vec::with_capacity(request.len());
                reader.read_to_end(&mut buf).await.unwrap();
                assert_eq!(&buf, request.as_bytes());
            });
        }

        while let Some(outcome) = join_set.join_next().await {
            if let Err(e) = outcome {
                if let Ok(reason) = e.try_into_panic() {
                    panic::resume_unwind(reason);
                }
            }
        }
    }
}


@ -0,0 +1,7 @@
[package]
name = "async_locks"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }


@ -0,0 +1,53 @@
/// TODO: the code below will deadlock because it's using std's channels,
/// which are not async-aware.
/// Rewrite it to use `tokio`'s channels primitive (you'll have to touch
/// the testing code too, yes).
///
/// Can you understand the sequence of events that can lead to a deadlock?
use std::sync::mpsc;
pub struct Message {
    payload: String,
    response_channel: mpsc::Sender<Message>,
}

/// Replies with `pong` to any message it receives, setting up a new
/// channel to continue communicating with the caller.
pub async fn pong(mut receiver: mpsc::Receiver<Message>) {
    loop {
        if let Ok(msg) = receiver.recv() {
            println!("Pong received: {}", msg.payload);
            let (sender, new_receiver) = mpsc::channel();
            msg.response_channel
                .send(Message {
                    payload: "pong".into(),
                    response_channel: sender,
                })
                .unwrap();
            receiver = new_receiver;
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::{pong, Message};
    use std::sync::mpsc;

    #[tokio::test]
    async fn ping() {
        let (sender, receiver) = mpsc::channel();
        let (response_sender, response_receiver) = mpsc::channel();
        sender
            .send(Message {
                payload: "pong".into(),
                response_channel: response_sender,
            })
            .unwrap();
        tokio::spawn(pong(receiver));
        let answer = response_receiver.recv().unwrap().payload;
        assert_eq!(answer, "pong");
    }
}


@ -0,0 +1,7 @@
[package]
name = "cancellation"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }


@ -0,0 +1,51 @@
// TODO: fix the `assert_eq` at the end of the tests.
// Do you understand why that's the resulting output?
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec<u8> {
    let mut buffer = Vec::new();
    for _ in 0..n_messages {
        let (mut stream, _) = listener.accept().await.unwrap();
        let _ = tokio::time::timeout(timeout, async {
            stream.read_to_end(&mut buffer).await.unwrap();
        })
        .await;
    }
    buffer
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn ping() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let messages = vec!["hello", "from", "this", "task"];
        let timeout = Duration::from_millis(20);
        let handle = tokio::spawn(run(listener, messages.len(), timeout.clone()));

        for message in messages {
            let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
            let (_, mut writer) = socket.split();
            let (beginning, end) = message.split_at(message.len() / 2);

            // Send first half
            writer.write_all(beginning.as_bytes()).await.unwrap();
            tokio::time::sleep(timeout * 2).await;
            writer.write_all(end.as_bytes()).await.unwrap();
            // Close the write side of the socket
            writer.shutdown().await.unwrap();
        }

        let buffered = handle.await.unwrap();
        let buffered = std::str::from_utf8(&buffered).unwrap();
        assert_eq!(buffered, "");
    }
}


@ -0,0 +1,7 @@
[package]
name = "cancellation"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }


@ -0,0 +1,10 @@
// This is our last exercise. Let's go down a more unstructured path!
// Try writing an **asynchronous REST API** to expose the functionality
// of the ticket management system we built throughout the course.
// It should expose endpoints to:
// - Create a ticket
// - Retrieve ticket details
// - Patch a ticket
//
// Use Rust's package registry, crates.io, to find the dependencies you need
// (if any) to build this system.