diff --git a/Cargo.lock b/Cargo.lock index 4f1e698..19f5a24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,27 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "anyhow" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3" + [[package]] name = "arrays" version = "0.1.0" @@ -14,6 +35,56 @@ version = "0.1.0" name = "assoc_vs_generic" version = "0.1.0" +[[package]] +name = "async_fn" +version = "0.1.0" +dependencies = [ + "anyhow", + "tokio", +] + +[[package]] +name = "async_locks" +version = "0.1.0" +dependencies = [ + "tokio", +] + +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + +[[package]] +name = "backtrace" +version = "0.3.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bitflags" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" + +[[package]] +name = "blocking" +version = "0.1.0" +dependencies = [ + "anyhow", + "tokio", +] + [[package]] name = "bounded" version = "0.1.0" @@ -28,6 +99,31 @@ dependencies = [ "ticket_fields", ] +[[package]] +name = "bytes" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" + +[[package]] +name = "cancellation" +version = "0.1.0" +dependencies = [ + "tokio", +] + +[[package]] +name = "cc" +version = "1.0.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "099a5357d84c4c61eb35fc8eafa9a79a902c2f76911e5747ced4e032edd8d9b4" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "channels" version = "0.1.0" @@ -126,6 +222,19 @@ version = "0.1.0" name = "from" version = "0.1.0" +[[package]] +name = "future" +version = "0.1.0" +dependencies = [ + "tokio", +] + +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + [[package]] name = "hashmap" version = "0.1.0" @@ -137,6 +246,12 @@ dependencies = [ name = "heap" version = "0.1.0" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "if_else" version = "0.1.0" @@ -202,7 +317,11 @@ name = "intro_05" version = "0.1.0" [[package]] -name = "intro_06" +name = "intro_07" +version = "0.1.0" + +[[package]] +name = "intro_08" version = "0.1.0" [[package]] @@ -223,6 +342,12 @@ dependencies = [ name = "leaking" version = "0.1.0" +[[package]] +name = "libc" +version = "0.2.154" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" + [[package]] name = "lifetime" version = "0.1.0" @@ -230,6 +355,16 @@ dependencies = [ "ticket_fields", ] +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "locks" version = "0.1.0" @@ -242,6 +377,32 @@ dependencies = [ name = "match_" version = "0.1.0" +[[package]] +name = "memchr" +version = "2.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" + +[[package]] +name = "miniz_oxide" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +dependencies = [ + "adler", +] + +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "modules" version = "0.1.0" @@ -257,6 +418,25 @@ dependencies = [ "common", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + [[package]] name = "orphan" version = "0.1.0" @@ -293,6 +473,29 @@ version = "0.1.0" name = "panics" version = "0.1.0" +[[package]] +name = "parking_lot" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.5", +] + [[package]] name = "patch" version = "0.1.0" @@ -301,6 +504,12 @@ dependencies = [ "ticket_fields", ] +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + [[package]] name = "proc-macro2" version = "1.0.81" @@ -319,6 +528,15 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +dependencies = [ + "bitflags", +] + [[package]] name = "references_in_memory" version = "0.1.0" @@ -334,6 +552,20 @@ dependencies = [ "ticket_fields", ] +[[package]] +name = "runtime" +version = "0.1.0" +dependencies = [ + "anyhow", + "tokio", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + [[package]] name = "rwlock" version = "0.1.0" @@ -350,6 +582,12 @@ version = "0.1.0" name = "scoped_threads" version = "0.1.0" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "setters" version = "0.1.0" @@ -357,6 +595,15 @@ dependencies = [ "common", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "sized" version = "0.1.0" @@ -365,6 +612,22 @@ version = "0.1.0" name = "slice" version = "0.1.0" +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "source" version = "0.1.0" @@ -373,6 +636,14 @@ dependencies = [ "thiserror", ] +[[package]] +name = "spawn" +version = "0.1.0" +dependencies = [ + "anyhow", + "tokio", +] + [[package]] name = "stack" version = "0.1.0" @@ -456,6 +727,36 @@ dependencies = [ "thiserror", ] +[[package]] +name = "tokio" +version = "1.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "trait_" version = "0.1.0" @@ -510,6 +811,12 @@ version = "0.1.0" name = "visibility" version = "0.1.0" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "welcome_00" version = "0.1.0" @@ -518,6 +825,145 @@ version = "0.1.0" name = "while_" version = "0.1.0" +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +dependencies = [ + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" + [[package]] name = "without_channels" version = "0.1.0" diff --git a/README.md b/README.md index 1c39f9e..e6903b9 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ to get started with the course. - **Rust** (follow instructions [here](https://www.rust-lang.org/tools/install)). If `rustup` is already installed on your system, run `rustup update` (or another appropriate command depending on how you installed Rust on your system) - to make your running on the latest version. + to make sure you're running on the latest stable version. - _(Optional but recommended)_ An IDE with Rust autocompletion support. We recommend one of the following: - [RustRover](https://www.jetbrains.com/rust/); diff --git a/book/src/07_threads/00_intro.md b/book/src/07_threads/00_intro.md index e697223..0100397 100644 --- a/book/src/07_threads/00_intro.md +++ b/book/src/07_threads/00_intro.md @@ -1,13 +1,10 @@ # Intro -One of Rust's big promises is *fearless concurrency*: making it easier to write safe, concurrent programs. -We haven't seen much of that yet. All the work we've done so far has been single-threaded: -instructions executed one after the other, with strict sequencing. Time to change that! +One of Rust's big promises is *fearless concurrency*: making it easier to write safe, concurrent programs. +We haven't seen much of that yet. All the work we've done so far has been single-threaded. +Time to change that! In this chapter we'll make our ticket store multithreaded. -We will start by allowing multiple users to interface with the same store at the same time. We'll then progress -to having multiple instances of the store running concurrently while sharing the same data. - We'll have the opportunity to touch most of Rust's core concurrency features, including: - Threads, using the `std::thread` module diff --git a/book/src/08_futures/00_intro.md b/book/src/08_futures/00_intro.md new file mode 100644 index 0000000..45dd5fc --- /dev/null +++ b/book/src/08_futures/00_intro.md @@ -0,0 +1,11 @@ +# Async Rust + +Threads are not the only way to write concurrent programs in Rust. +In this chapter we'll explore another approach: **asynchronous programming**. + +In particular, you'll get an introduction to: + +- The `async`/`.await` keywords, to write asynchronous code effortlessly +- The `Future` trait, to represent computations that may not be complete yet +- `tokio`, the most popular runtime for running asynchronous code +- The cooperative nature of Rust asynchronous model, and how this affects your code \ No newline at end of file diff --git a/book/src/08_futures/01_async_fn.md b/book/src/08_futures/01_async_fn.md new file mode 100644 index 0000000..173e0f1 --- /dev/null +++ b/book/src/08_futures/01_async_fn.md @@ -0,0 +1,144 @@ +# Asynchronous functions + +All the functions and methods you've written so far were eager. +Nothing happened until you invoked them. But once you did, they ran to +completion: they did **all** their work, and then returned their output. + +Sometimes that's undesirable. +For example, if you're writing an HTTP server, there might be a lot of +**waiting**: waiting for the request body to arrive, waiting for the +database to respond, waiting for a downstream service to reply, etc. + +What if you could do something else while you're waiting? +What if you could choose to give up midway through a computation? +What if you could choose to prioritise another task over the current one? + +That's where **asynchronous functions** come in. + +## `async fn` + +You use the `async` keyword to define an asynchronous function: + +```rust +use tokio::net::TcpListener; + +// This function is asynchronous +async fn bind_random() -> TcpListener { + // [...] +} +``` + +What happens if you call `bind_random` as you would a regular function? + +```rust +fn run() { + // Invoke `bind_random` + let listener = bind_random(); + // Now what? +} +``` + +Nothing happens! +Rust doesn't start executing `bind_random` when you call it, +not even as a background task (as you might expect based on your experience +with other languages). +Asynchronous functions in Rust are **lazy**: they don't do any work until you +explicitly ask them to. +Using Rust's terminology, we say that `bind_random` returns a **future**, a type +that represents a computation that may complete later. They're called futures +because they implement the `Future` trait, an interface that we'll examine in +detail later on in this chapter. + +## `.await` + +The most common way to ask an asynchronous function to do some work is to use +the `.await` keyword: + +```rust +use tokio::net::TcpListener; + +async fn bind_random() -> TcpListener { + // [...] +} + +async fn run() { + // Invoke `bind_random` and wait for it to complete + let listener = bind_random().await; + // Now `listener` is ready +} +``` + +`.await` doesn't return control to the caller until the asynchronous function +has run to completion—e.g. until the `TcpListener` has been created in the example above. + +## Runtimes + +If you're puzzled, you're right to be! +We've just said that the perk of asynchronous functions +is that they don't do **all** their work at once. We then introduced `.await`, which +doesn't return until the asynchronous function has run to completion. Haven't we +just re-introduced the problem we were trying to solve? What's the point? + +Not quite! A lot happens behind the scenes when you call `.await`! +You're yielding control to an **async runtime**, also known as an **async executor**. +Executors are where the magic happens: they are in charge of managing all your +ongoing asynchronous **tasks**. In particular, they balance two different goals: + +- **Progress**: they make sure that tasks make progress whenever they can. +- **Efficiency**: if a task is waiting for something, they try to make sure that + another task can run in the meantime, fully utilising the available resources. + +### No default runtime + +Rust is fairly unique in its approach to asynchronous programing: there is +no default runtime. The standard library doesn't ship with one. You need to +bring your own! + +In most cases, you'll choose one of the options available in the ecosystem. +Some runtimes are designed to be broadly applicable, a solid option for most applications. +`tokio` and `async-std` belong to this category. Other runtimes are optimised for +specific use cases—e.g. `embassy` for embedded systems. + +Throughout this course we'll rely on `tokio`, the most popular runtime for general-purpose +asynchronous programming in Rust. + +### `#[tokio::main]` + +The entrypoint of your executable, the `main` function, must be a synchronous function. +That's where you're supposed to set up and launch your chosen async runtime. + +Most runtimes provides a macro to make this easier. For `tokio`, it's `tokio::main`: + +```rust +#[tokio::main] +async fn main() { + // Your async code goes here +} +``` + +which expands to: + +```rust +fn main() { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on( + // Your async function goes here + // [...] + ); +} +``` + +### `#[tokio::test]` + +The same goes for tests: they must be synchronous functions. +Each test function is run in its own thread, and you're responsible for +setting up and launching an async runtime if you need to run async code +in your tests. +`tokio` provides a `#[tokio::test]` macro to make this easier: + +```rust +#[tokio::test] +async fn my_test() { + // Your async test code goes here +} +``` \ No newline at end of file diff --git a/book/src/08_futures/02_spawn.md b/book/src/08_futures/02_spawn.md new file mode 100644 index 0000000..e6a1171 --- /dev/null +++ b/book/src/08_futures/02_spawn.md @@ -0,0 +1,94 @@ +# Spawning tasks + +Your solution to the previous exercise should look something like this: + +```rust +pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> { + loop { + let (mut socket, _) = listener.accept().await?; + let (mut reader, mut writer) = socket.split(); + tokio::io::copy(&mut reader, &mut writer).await?; + } +} +``` + +This is not bad! +If a long time passes between two incoming connections, the `echo` function will be idle +(since `TcpListener::accept` is an asynchronous function), thus allowing the executor +to run other tasks in the meantime. + +But how can we actually have multiple tasks running concurrently? +If we always run our asynchronous functions until completion (by using `.await`), we'll never +have more than one task running at a time. + +This is where the `tokio::spawn` function comes in. + +## `tokio::spawn` + +`tokio::spawn` allows you to hand off a task to the executor, **without waiting for it to complete**. +Whenever you invoke `tokio::spawn`, you're telling `tokio` to continue running +the spawned task, in the background, **concurrently** with the task that spawned it. + +Here's how you can use it to process multiple connections concurrently: + +```rust +use tokio::net::TcpListener; + +pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> { + loop { + let (mut socket, _) = listener.accept().await?; + // Spawn a background task to handle the connection + // thus allowing the main task to immediately start + // accepting new connections + tokio::spawn(async move { + let (mut reader, mut writer) = socket.split(); + tokio::io::copy(&mut reader, &mut writer).await?; + }); + } +} +``` + +### Asynchronous blocks + +In this example, we've passed an **asynchronous block** to `tokio::spawn`: `async move { /* */ }` +Asynchronous blocks are a quick way to mark a region of code as asynchronous without having +to define a separate async function. + +### `JoinHandle` + +`tokio::spawn` returns a `JoinHandle`. +You can use `JoinHandle` to `.await` the background task, in the same way +we used `join` for spawned threads. + +```rust +pub async fn run() { + // Spawn a background task to ship telemetry data + // to a remote server + let handle = tokio::spawn(emit_telemetry()); + // In the meantime, do some other useful work + do_work().await; + // But don't return to the caller until + // the telemetry data has been successfully delivered + handle.await; +} + +pub async fn emit_telemetry() { + // [...] +} + +pub async fn do_work() { + // [...] +} + +``` + +### `std::thread::spawn` vs `tokio::spawn` + +You can think of `tokio::spawn` as the asynchronous sibling of `std::spawn::thread`. + +Notice a key difference: with `std::thread::spawn`, you're delegating control to the OS scheduler. +You're not in control of how threads are scheduled. + +With `tokio::spawn`, you're delegating to an async executor that runs entirely in +user space. The underlying OS scheduler is not involved in the decision of which task +to run next. We're in charge of that decision now, via the executor we chose to use. \ No newline at end of file diff --git a/book/src/08_futures/03_runtime.md b/book/src/08_futures/03_runtime.md new file mode 100644 index 0000000..1dca08a --- /dev/null +++ b/book/src/08_futures/03_runtime.md @@ -0,0 +1,88 @@ +# Runtime architecture + +So far we've been talking about async runtimes as an abstract concept. +Let's dig a bit deeper into the way they are implemented—as you'll see soon enough, +it has an impact on our code. + +## Flavors + +`tokio` ships two different runtime _flavors_. + +You can configure your runtime via `tokio::runtime::Builder`: + +- `Builder::new_multi_thread` gives you a **multithreaded `tokio` runtime** +- `Builder::new_current_thread` will instead rely on the **current thread** for execution. + +`#[tokio::main]` returns a multithreaded runtime by default, while +`#[tokio::test]` uses a current thread runtime out of the box. + +### Current thread runtime + +The current-thread runtime, as the name implies, relies exclusively on the OS thread +it was launched on to schedule and execute tasks. +When using the current-thread runtime, you have **concurrency** but no **parallelism**: +asynchronous tasks will be interleaved, but there will always be at most one task running +at any given time. + +### Multithreaded runtime + +When using the multithreaded runtime, instead, there can up to `N` tasks running +_in parallel_ at any given time, where `N` is the number of threads used by the +runtime. By default, `N` matches the number of available CPU cores. + +There's more: `tokio` performs **work-stealing**. +If a thread is idle, it won't wait around: it'll try to find a new task that's ready for +execution, either from a global queue or by stealing it from the local queue of another +thread. +Work-stealing can have significant performance benefits, especially on tail latencies, +whenever your application is dealing with workloads that are not perfectly balanced +across threads. + +## Implications + +`tokio::spawn` is flavor-agnostic: it'll work no matter if you're running on the multithreaded +or current-thread runtime. The downside is that the signature assume the worst case +(i.e. multithreaded) and is constrained accordingly: + +```rust +pub fn spawn(future: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ /* */ } +``` + +Let's ignore the `Future` trait for now to focus on the rest. +`spawn` is asking all its inputs to be `Send` and have a `'static` lifetime. + +The `'static` constraint follows the same rationale of the `'static` constraint +on `std::thread::spawn`: the spawned task may outlive the context it was spawned +from, therefore it shouldn't depend on any local data that may be de-allocated +after the spawning context is destroyed. + +```rust +fn spawner() { + let v = vec![1, 2, 3]; + // This won't work, since `&v` doesn't + // live long enough. + tokio::spawn(async { + for x in &v { + println!("{x}") + } + }) +} +``` + +`Send`, on the other hand, is a direct consequence of `tokio`'s work-stealing strategy: +a task that was spawned on thread `A` may end up being moved to thread `B` if that's idle, +thus requiring a `Send` bound since we're crossing thread boundaries. + +```rust +fn spawner(input: Rc) { + // This won't work either, because + // `Rc` isn't `Send`. + tokio::spawn(async move { + println!("{}", input); + }) +} +``` \ No newline at end of file diff --git a/book/src/08_futures/04_future.md b/book/src/08_futures/04_future.md new file mode 100644 index 0000000..51b0290 --- /dev/null +++ b/book/src/08_futures/04_future.md @@ -0,0 +1,165 @@ +# The `Future` trait + +## The local `Rc` problem + +Let's go back to `tokio::spawn`'s signature: + +```rust +pub fn spawn(future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ /* */ } +``` + +What does it _actually_ mean for `F` to be `Send`? +It implies, as we saw in the previous section, that whatever value it captures from the +spawning environment has to be `Send`. But it goes further than that. + +Any value that's _held across a .await point_ has to be `Send`. +Let's look at an example: + +```rust +use std::rc::Rc; +use tokio::task::yield_now; + +fn spawner() { + tokio::spawn(example()); +} + +async fn example() { + // A value that's not `Send`, + // created _inside_ the async function + let non_send = Rc::new(1); + + // A `.await` point that does nothing + yield_now().await; + + // The local non-`Send` value is still needed + // after the `.await` + println!("{}", non_send); +} +``` + +The compiler will reject this code: + +```text +error: future cannot be sent between threads safely + | +5 | tokio::spawn(example()); + | ^^^^^^^^^ future returned by `example` is not `Send` + | +note: future is not `Send` as this value is used across an await + | +11 | let non_send = Rc::new(1); + | -------- has type `Rc` which is not `Send` +12 | // A `.await` point +13 | yield_now().await; + | ^^^^^ await occurs here, with `non_send` maybe used later +note: required by a bound in `tokio::spawn` + | +164 | pub fn spawn(future: F) -> JoinHandle + | ----- required by a bound in this function +165 | where +166 | F: Future + Send + 'static, + | ^^^^ required by this bound in `spawn` +``` + +To understand why that's the case, we need to refine our understanding of +Rust's asynchronous model. + +## The `Future` trait + +We stated early on that `async` functions return **futures**, types that implement +the `Future` trait. You can think of a future as a **state machine**. +It's in one of two states: + +- **pending**: the computation has not finished yet. +- **ready**: the computation has finished, here's the output. + +This is encoded in the trait definition: + +```rust +trait Future { + type Output; + + // Ignore `Pin` and `Context` for now + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; +} +``` + +### `poll` + +The `poll` method is the heart of the `Future` trait. +A future on its own doesn't do anything. It needs to be **polled** to make progress. +When you call `poll`, you're asking the future to do some work. +`poll` tries to make progress, and then returns one of the following: + +- `Poll::Pending`: the future is not ready yet. You need to call `poll` again later. +- `Poll::Ready(value)`: the future has finished. `value` is the result of the computation, + of type `Self::Output`. + +Once `Future::poll` returns `Poll::Ready`, it should not be polled again: the future has +completed, there's nothing left to do. + +### The role of the runtime + +You'll rarely, if ever, be calling poll directly. +That's the job of your async runtime: it has all the required information (the `Context` +in `poll`'s signature) to ensure that your futures are making progress whenever they can. + +## `async fn` and futures + +We've worked with the high-level interface, asynchronous functions. +We've now looked at the low-level primitive, the `Future trait`. + +How are they related? + +Every time you mark a function as asynchronous, that function will return a future. +The compiler will transform the body of your asynchronous function into a **state machine**: +one state for each `.await` point. + +Going back to our `Rc` example: + +```rust +use std::rc::Rc; +use tokio::task::yield_now; + +async fn example() { + let non_send = Rc::new(1); + yield_now().await; + println!("{}", non_send); +} +``` + +The compiler would transform it into an enum that looks somewhat like this: + +```rust +pub enum ExampleFuture { + NotStarted, + YieldNow(Rc), + Terminated, +} +``` + +When `example` is called, it returns `ExampleFuture::NotStarted`. The future has never +been polled yet, so nothing has happened. +When the runtime polls it the first time, `ExampleFuture` will advance until the next +`.await` point: it'll stop at the `ExampleFuture::YieldNow(Rc)` stage of the state +machine, returning `Poll::Pending`. +When it's polled again, it'll execute the remaining code (`println!`) and +return `Poll::Ready(())`. + +When you look at its state machine representation, `ExampleFuture`, +it is now clear why `example` is not `Send`: it holds an `Rc`, therefore +it cannot be `Send`. + +## Yield points + +As you've just seen with `example`, every `.await` point creates a new intermediate +state in the lifecycle of a future. +That's why `.await` points are also known as **yield points**: your future _yields control_ +back to the runtime that was polling it, allowing the runtime to pause it and (if necessary) +schedule another task for execution, thus making progress on multiple fronts concurrently. + +We'll come back to the importance of yielding in a later section. \ No newline at end of file diff --git a/book/src/08_futures/05_blocking.md b/book/src/08_futures/05_blocking.md new file mode 100644 index 0000000..0b6c474 --- /dev/null +++ b/book/src/08_futures/05_blocking.md @@ -0,0 +1,79 @@ +# Don't block the runtime + +Let's circle back to yield points. +Unlike threads, **Rust tasks cannot be preempted**. + +`tokio` cannot, on its own, decide to pause a task and run another one in its place. +The control goes back to the executor **exclusively** when the task yields—i.e. +when `Future::poll` returns `Poll::Pending` or, in the case of `async fn`, when +you `.await` a future. + +This exposes the runtime to a risk: if a task never yields, the runtime will never +be able to run another task. This is called **blocking the runtime**. + +## What is blocking? + +How long is too long? How much time can a task spend without yielding before it +becomes a problem? + +It depends on the runtime, the application, the number of in-flight tasks, and +many other factors. But, as a general rule of thumb, try to spend less than 100 +microseconds between yield points. + +## Consequences + +Blocking the runtime can lead to: + +- **Deadlocks**: if the task that's not yielding is waiting for another task to + complete, and that task is waiting for the first one to yield, you have a deadlock. + No progress can be made, unless the runtime is able to schedule the other task on + a different thread. +- **Starvation**: other tasks might not be able to run, or might run after a long + delay, which can lead to poor performances (e.g. high tail latencies). + +## Blocking is not always obvious + +Some types of operations should generally be avoided in async code, like: + +- Synchronous I/O. You can't predict how long it will take, and it's likely to be + longer than 100 microseconds. +- Expensive CPU-bound computations. + +The latter category is not always obvious though. For example, sorting a vector with +a few elements is not a problem; that evaluation changes if the vector has billions +of entries. + +## How to avoid blocking + +OK, so how do you avoid blocking the runtime assuming you _must_ perform an operation +that qualifies or risks qualifying as blocking? +You need to move the work to a different thread. You don't want to use the so-called +runtime threads, the ones used by `tokio` to run tasks. + +`tokio` provides a dedicated threadpool for this purpose, called the **blocking pool**. +You can spawn a synchronous operation on the blocking pool using the +`tokio::task::spawn_blocking` function. `spawn_blocking` returns a future that resolves +to the result of the operation when it completes. + +```rust +use tokio::task; + +fn expensive_computation() -> u64 { + // [...] +} + +async fn run() { + let handle = task::spawn_blocking(expensive_computation); + // Do other stuff in the meantime + let result = handle.await.unwrap(); +} +``` + +The blocking pool is long-lived. `spawn_blocking` should be faster +than creating a new thread directly via `std::thread::spawn` +because the cost of thread initialization is amortized over multiple calls. + +## Further reading + +- Check out [Alice Ryhl's blog post](https://ryhl.io/blog/async-what-is-blocking/) + on the topic. \ No newline at end of file diff --git a/book/src/08_futures/06_async_aware_primitives.md b/book/src/08_futures/06_async_aware_primitives.md new file mode 100644 index 0000000..7e7e01f --- /dev/null +++ b/book/src/08_futures/06_async_aware_primitives.md @@ -0,0 +1,129 @@ +# Async-aware primitives + +If you browse `tokio`'s documentation, you'll notice that it provides a lot of types +that "mirror" the ones in the standard library, but with an asynchronous twist: +locks, channels, timers, and more. + +When working in an asynchronous context, you should prefer these asynchronous alternatives +to their synchronous counterparts. + +To understand why, let's take a look at `Mutex`, the mutually exclusive lock we explored +in the previous chapter. + +## Case study: `Mutex` + +Let's look at a simple example: + +```rust +use std::sync::{Arc, Mutex}; + +async fn run(m: Arc>>) { + let guard = m.lock().unwrap(); + http_call(&guard).await; + println!("Sent {:?} to the server", &guard); + // `guard` is dropped here +} + +/// Use `v` as the body of an HTTP call. +async fn http_call(v: &[u64]) { + // [...] +} +``` + +### `std::sync::MutexGuard` and yield points + +This code will compile, but it's dangerous. + +We try to acquire a lock over a `Mutex` from `std` in an asynchronous context. +We then hold on to the resulting `MutexGuard` across a yield point (the `.await` on +`http_call`). + +Let's imagine that there are two tasks executing `run`, concurrently, on a single-threaded +runtime. We observe the following sequence of scheduling events: + +```text + Task A Task B + | + Acquire lock + Yields to runtime + | + +--------------+ + | + Tries to acquire lock +``` + +We have a deadlock. Task B we'll never manage to acquire the lock, because the lock +is currently held by task A, which has yielded to the runtime before releasing the +lock and won't be scheduled again because the runtime cannot preempt task B. + +### `tokio::sync::Mutex` + +You can solve the issue by switching to `tokio::sync::Mutex`: + +```rust +use std::sync::Arc; +use tokio::sync::Mutex; + +async fn run(m: Arc>>) { + let guard = m.lock().await; + http_call(&guard).await; + println!("Sent {:?} to the server", &guard); + // `guard` is dropped here +} +``` + +Acquiring the lock is now an asynchronous operation, which yields back to the runtime +if it can't make progress. +Going back to the previous scenario, the following would happen: + +```text + Task A Task B + | + Acquires the lock + Starts `http_call` + Yields to runtime + | + +--------------+ + | + Tries to acquire the lock + Cannot acquire the lock + Yields to runtime + | + +--------------+ + | + `http_call` completes + Releases the lock + Yield to runtime + | + +--------------+ + | + Acquires the lock + [...] +``` + +All good! + +### Multithreaded won't save you + +We've used a single-threaded runtime as the execution context in our +previous example, but the same risk persists even when using a multithreaded +runtime. +The only difference is in the number of concurrent tasks required to create the deadlock: +in a single-threaded runtime, 2 are enough; in a multithreaded runtime, we +would need `N+1` tasks, where `N` is the number of runtime threads. + +### Downsides + +Having an async-aware `Mutex` comes with a performance penalty. +If you're confident that the lock isn't under significant contention +_and_ you're careful to never hold it across a yield point, you can +still use `std::sync::Mutex` in an asynchronous context. + +But weigh the performance benefit against the liveness risk you +will incur. + +## Other primitives + +We used `Mutex` as an example, but the same applies to `RwLock`, semaphores, etc. +Prefer async-aware versions when working in an asynchronous context to minimise +the risk of issues. \ No newline at end of file diff --git a/book/src/08_futures/07_cancellation.md b/book/src/08_futures/07_cancellation.md new file mode 100644 index 0000000..4b98c26 --- /dev/null +++ b/book/src/08_futures/07_cancellation.md @@ -0,0 +1,93 @@ +# Cancellation + +What happens when a pending future is dropped? +The runtime will no longer poll it, therefore it won't make any further progress. +In other words, its execution has been **cancelled**. + +In the wild, this often happens when working with timeouts. +For example: + +```rust +use tokio::time::timeout; +use tokio::sync::oneshot; +use std::time::Duration; + +async fn http_call() { + // [...] +} + +async fn run() { + // Wrap the future with a `Timeout` set to expire in 10 milliseconds. + let duration = Duration::from_millis(10); + if let Err(_) = timeout(duration, http_call()).await { + println!("Didn't receive a value within 10 ms"); + } +} +``` + +When the timeout expires, the future returned by `http_call` will be cancelled. +Let's imagine that this is `http_call`'s body: + +```rust +use std::net::TcpStream; + +async fn http_call() { + let (stream, _) = TcpStream::connect(/* */).await.unwrap(); + let request: Vec = /* */; + stream.write_all(&request).await.unwrap(); +} +``` + +Each yield point becomes a **cancellation point**. +`http_call` can't be preempted by the runtime, so it can only be discarded after +it has yielded control back to the executor via `.await`. +This applies recursively—e.g. `stream.write_all(&request)` is likely to have multiple +yield points in its implementation. It is perfectly possible to see `http_call` pushing +a _partial_ request before being cancelled, thus dropping the connection and never +finishing transmitting the body. + +## Clean up + +Rust's cancellation mechanism is quite powerful—it allows the caller to cancel an ongoing task +without needing any form of cooperation from the task itself. +At the same time, this can be quite dangerous. It may be desirable to perform a +**graceful cancellation**, to ensure that some clean-up tasks are performed +before aborting the operation. + +For example, consider this fictional API for a SQL transaction: + +```rust +async fn transfer_money( + connection: SqlConnection, + payer_id: u64, + payee_id: u64, + amount: u64 +) -> Result<(), anyhow::Error> { + let transaction = connection.begin_transaction().await?; + update_balance(payer_id, amount, &transaction).await?; + decrease_balance(payee_id, amount, &transaction).await?; + transaction.commit().await?; +} +``` + +On cancellation, it'd be ideal to explicitly abort the pending transaction rather +than leaving it hanging. +Rust, unfortunately, doesn't provide a bullet-proof mechanism for this kind of +**asynchronous** clean up operations. + +The most common strategy is to rely on the `Drop` trait to schedule the required +clean-up work. This can be by: + +- Spawning a new task on the runtime +- Enqueueing a message on a channel +- Spawning a background thread + +The optimal choice is contextual. + +## Further reading + +- Be extremely careful when using `tokio`'s `select!` macro to "race" two different futures. + Retrying the same task in a loop is dangerous unless you can ensure **cancellation safety**. + Check out [`select!`'s documentation](https://docs.rs/tokio/macro.select.html) for more details. +- Rather than "abrupt" cancellation, it can be preferable to rely + on [`CancellationToken`](https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html). \ No newline at end of file diff --git a/book/src/08_futures/08_outro.md b/book/src/08_futures/08_outro.md new file mode 100644 index 0000000..4b480a8 --- /dev/null +++ b/book/src/08_futures/08_outro.md @@ -0,0 +1,34 @@ +# Outro + +Rust's asynchronous model is quite powerful, but it does introduce additional +complexity. Take time to know your tools: dive deep into `tokio`'s documentation +and get familiar with its primitives to make the most out of it. + +Keep in mind, as well, that there is ongoing work at the language and `std` level +to streamline and "complete" Rust's asynchronous story. You may experience some +rough edges in your day-to-day work due to some of these missing pieces. + +A few recommendations for a mostly-pain-free async experience: + +- **Pick a runtime and stick to it.** + Some primitives (e.g. timers, I/O) are not portable across runtimes. Trying to + mix runtimes is likely to cause you pain. Trying to write code that's runtime + agnostic can significantly increase the complexity of your codebase. Avoid it + if you can. +- **There is no stable `Stream`/`AsyncIterator` interface yet.** + An `AsyncIterator` is, conceptually, an iterator that yields new items + asynchronously. There is ongoing design work, but no consensus (yet). + If you're using `tokio`, refer to [`tokio_stream`](https://docs.rs/tokio-stream/latest/tokio_stream/) + as your go-to interface. +- **Be careful with buffering.** + It is often the cause of subtle bugs. Check out + ["Barbara battles buffered streams"](https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/barbara_battles_buffered_streams.html) + for more details. +- **There is no equivalent of scoped threads for asynchronous tasks**. + Check out ["The scoped task trilemma"](https://without.boats/blog/the-scoped-task-trilemma/) + for more details. + +Don't let these caveats scare you: asynchronous Rust is being used effectively +at _massive_ scale (e.g. AWS, Meta) to power foundational services. +You will have to master it if you're planning building networked applications +in Rust. diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index 8e7f2c1..b3e92af 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -93,4 +93,16 @@ - [`Mutex`, `Send` and `Arc`](07_threads/11_locks.md) - [`RwLock`](07_threads/12_rw_lock.md) - [Without channels](07_threads/13_without_channels.md) - - [`Sync` trait](07_threads/14_sync.md) \ No newline at end of file + - [`Sync` trait](07_threads/14_sync.md) + +- [Futures](08_futures/00_intro.md) + - [Asynchronous functions](08_futures/01_async_fn.md) + - [Spawning tasks](08_futures/02_spawn.md) + - [Runtime](08_futures/03_runtime.md) + - [Future trait](08_futures/04_future.md) + - [Blocking the runtime](08_futures/05_blocking.md) + - [Async-aware primitives](08_futures/06_async_aware_primitives.md) + - [Cancellation](08_futures/07_cancellation.md) + - [Outro](08_futures/08_outro.md) + +* [Going further](going_further.md) diff --git a/book/src/going_further.md b/book/src/going_further.md new file mode 100644 index 0000000..452aac4 --- /dev/null +++ b/book/src/going_further.md @@ -0,0 +1,54 @@ +# Epilogue + +Our tour of Rust ends here. +It has been quite extensive, but by no means exhaustive: Rust is a language with +a large surface area, and an even larger ecosystem! +Don't let this scare you, though: there's **no need to learn everything**. +You'll pick up whatever is necessary to be effective in the domain +(backend, embedded, CLIs, GUIs, etc.) **while working on your projects**. + +In the end, there are no shortcuts: if you want to get good at something, +you need to do it, over and over again. Throughout this course you wrote a fair +amount of Rust, enough to get the language and its syntax flowing under your +fingers. It'll take many more lines of code to feel it "yours", but that moment +will come without a doubt if you keep practicing. + +## Going further + +Let's close with some pointers to additional resources that you might find +useful as you move forward in your journey with Rust. + +### Exercises + +You can find more exercises to practice Rust in the [`rustlings`](https://github.com/rust-lang/rustlings) +project and on [exercism.io](https://exercism.io)'s Rust track. + +### Introductory material + +Check out [the Rust book](https://doc.rust-lang.org/book/title-page.html) and +["Programming Rust"](https://www.oreilly.com/library/view/programming-rust-2nd/9781492052586/) +if you're looking for a different perspective on the same concepts we covered throughout this course. +The material doesn't overlap perfectly, therefore you'll certainly learn something new along the +way. + +### Advanced material + +If you want to dive deeper into the language, refer to the [Rustonomicon](https://doc.rust-lang.org/nomicon/) +and ["Rust for Rustaceans"](https://nostarch.com/rust-rustaceans). +The ["Decrusted" series](https://www.youtube.com/playlist?list=PLqbS7AVVErFirH9armw8yXlE6dacF-A6z) is another excellent +resource to learn more about the internals of many of the most popular Rust libraries. + +### Domain-specific material + +If you want to use Rust for backend development, +check out ["Zero to Production in Rust"](https://zero2prod.com). +If you want to use Rust for embedded development, +check out the [Embedded Rust book](https://docs.rust-embedded.org/book/). + +### Masterclasses + +You can then find resources on key topics that cut across domains. +For testing, check out +["Advanced testing, going beyond the basics"](https://github.com/mainmatter/rust-advanced-testing-workshop). +For telemetry, check out ["You can't fix what you can't see"](https://github.com/mainmatter/rust-telemetry-workshop). + diff --git a/exercises/07_threads/00_intro/Cargo.toml b/exercises/07_threads/00_intro/Cargo.toml index 4749a4a..2d4e68e 100644 --- a/exercises/07_threads/00_intro/Cargo.toml +++ b/exercises/07_threads/00_intro/Cargo.toml @@ -1,4 +1,4 @@ [package] -name = "intro_06" +name = "intro_07" version = "0.1.0" edition = "2021" diff --git a/exercises/08_futures/00_intro/Cargo.toml b/exercises/08_futures/00_intro/Cargo.toml new file mode 100644 index 0000000..907f35e --- /dev/null +++ b/exercises/08_futures/00_intro/Cargo.toml @@ -0,0 +1,4 @@ +[package] +name = "intro_08" +version = "0.1.0" +edition = "2021" diff --git a/exercises/08_futures/00_intro/src/lib.rs b/exercises/08_futures/00_intro/src/lib.rs new file mode 100644 index 0000000..c730220 --- /dev/null +++ b/exercises/08_futures/00_intro/src/lib.rs @@ -0,0 +1,14 @@ +fn intro() -> &'static str { + // TODO: fix me 👇 + "I'm ready to _!" +} + +#[cfg(test)] +mod tests { + use crate::intro; + + #[test] + fn test_intro() { + assert_eq!(intro(), "I'm ready to learn about futures!"); + } +} diff --git a/exercises/08_futures/01_async_fn/Cargo.toml b/exercises/08_futures/01_async_fn/Cargo.toml new file mode 100644 index 0000000..330b746 --- /dev/null +++ b/exercises/08_futures/01_async_fn/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "async_fn" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } +anyhow = "1.0.83" \ No newline at end of file diff --git a/exercises/08_futures/01_async_fn/src/lib.rs b/exercises/08_futures/01_async_fn/src/lib.rs new file mode 100644 index 0000000..b8a83d8 --- /dev/null +++ b/exercises/08_futures/01_async_fn/src/lib.rs @@ -0,0 +1,45 @@ +use tokio::net::TcpListener; + +// TODO: write an echo server that accepts incoming TCP connections and +// echoes the received data back to the client. +// `echo` should not return when it finishes processing a connection, but should +// continue to accept new connections. +// +// Hint: you should rely on `tokio`'s structs and methods to implement the echo server. +// In particular: +// - `tokio::net::TcpListener::accept` to process the next incoming connection +// - `tokio::net::TcpStream::split` to obtain a reader and a writer from the socket +// - `tokio::io::copy` to copy data from the reader to the writer +pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> { + todo!() +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + #[tokio::test] + async fn test_echo() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(echo(listener)); + + let requests = vec!["hello", "world", "foo", "bar"]; + + for request in requests { + let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap(); + let (mut reader, mut writer) = socket.split(); + + // Send the request + writer.write_all(request.as_bytes()).await.unwrap(); + // Close the write side of the socket + writer.shutdown().await.unwrap(); + + // Read the response + let mut buf = Vec::with_capacity(request.len()); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(&buf, request.as_bytes()); + } + } +} diff --git a/exercises/08_futures/02_spawn/Cargo.toml b/exercises/08_futures/02_spawn/Cargo.toml new file mode 100644 index 0000000..81433f8 --- /dev/null +++ b/exercises/08_futures/02_spawn/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "spawn" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } +anyhow = "1.0.83" \ No newline at end of file diff --git a/exercises/08_futures/02_spawn/src/lib.rs b/exercises/08_futures/02_spawn/src/lib.rs new file mode 100644 index 0000000..9e00f36 --- /dev/null +++ b/exercises/08_futures/02_spawn/src/lib.rs @@ -0,0 +1,60 @@ +use tokio::net::TcpListener; + +// TODO: write an echo server that accepts TCP connections on two listeners, concurrently. +// Multiple connections (on the same listeners) should be processed concurrently. +// The received data should be echoed back to the client. +pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> { + todo!() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + use std::panic; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::task::JoinSet; + + async fn bind_random() -> (TcpListener, SocketAddr) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + (listener, addr) + } + + #[tokio::test] + async fn test_echo() { + let (first_listener, first_addr) = bind_random().await; + let (second_listener, second_addr) = bind_random().await; + tokio::spawn(echoes(first_listener, second_listener)); + + let requests = vec!["hello", "world", "foo", "bar"]; + let mut join_set = JoinSet::new(); + + for request in requests.clone() { + for addr in [first_addr, second_addr] { + join_set.spawn(async move { + let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap(); + let (mut reader, mut writer) = socket.split(); + + // Send the request + writer.write_all(request.as_bytes()).await.unwrap(); + // Close the write side of the socket + writer.shutdown().await.unwrap(); + + // Read the response + let mut buf = Vec::with_capacity(request.len()); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(&buf, request.as_bytes()); + }); + } + } + + while let Some(outcome) = join_set.join_next().await { + if let Err(e) = outcome { + if let Ok(reason) = e.try_into_panic() { + panic::resume_unwind(reason); + } + } + } + } +} diff --git a/exercises/08_futures/03_runtime/Cargo.toml b/exercises/08_futures/03_runtime/Cargo.toml new file mode 100644 index 0000000..14e35aa --- /dev/null +++ b/exercises/08_futures/03_runtime/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "runtime" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } +anyhow = "1.0.83" \ No newline at end of file diff --git a/exercises/08_futures/03_runtime/src/lib.rs b/exercises/08_futures/03_runtime/src/lib.rs new file mode 100644 index 0000000..f8840b9 --- /dev/null +++ b/exercises/08_futures/03_runtime/src/lib.rs @@ -0,0 +1,61 @@ +// TODO: Implement the `fixed_reply` function. It should accept two `TcpListener` instances, +// accept connections on both of them concurrently, and always reply clients by sending +// the `Display` representation of the `reply` argument as a response. +use std::fmt::Display; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpListener; + +pub async fn fixed_reply(first: TcpListener, second: TcpListener, reply: T) +where + // `T` cannot be cloned. How do you share it between the two server tasks? + T: Display + Send + Sync + 'static, +{ + todo!() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + use std::panic; + use tokio::io::AsyncReadExt; + use tokio::task::JoinSet; + + async fn bind_random() -> (TcpListener, SocketAddr) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + (listener, addr) + } + + #[tokio::test] + async fn test_echo() { + let (first_listener, first_addr) = bind_random().await; + let (second_listener, second_addr) = bind_random().await; + let reply = "Yo"; + tokio::spawn(fixed_reply(first_listener, second_listener, reply)); + + let mut join_set = JoinSet::new(); + + for _ in 0..3 { + for addr in [first_addr, second_addr] { + join_set.spawn(async move { + let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap(); + let (mut reader, _) = socket.split(); + + // Read the response + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(&buf, reply.as_bytes()); + }); + } + } + + while let Some(outcome) = join_set.join_next().await { + if let Err(e) = outcome { + if let Ok(reason) = e.try_into_panic() { + panic::resume_unwind(reason); + } + } + } + } +} diff --git a/exercises/08_futures/04_future/Cargo.toml b/exercises/08_futures/04_future/Cargo.toml new file mode 100644 index 0000000..e272e49 --- /dev/null +++ b/exercises/08_futures/04_future/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "future" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/exercises/08_futures/04_future/src/lib.rs b/exercises/08_futures/04_future/src/lib.rs new file mode 100644 index 0000000..9330b72 --- /dev/null +++ b/exercises/08_futures/04_future/src/lib.rs @@ -0,0 +1,15 @@ +//! TODO: get the code to compile by **re-ordering** the statements +//! in the `example` function. You're not allowed to change the +//! `spawner` function nor what each line does in `example`. +use std::rc::Rc; +use tokio::task::yield_now; + +fn spawner() { + tokio::spawn(example()); +} + +async fn example() { + let non_send = Rc::new(1); + yield_now().await; + println!("{}", non_send); +} diff --git a/exercises/08_futures/05_blocking/Cargo.toml b/exercises/08_futures/05_blocking/Cargo.toml new file mode 100644 index 0000000..32d8ee4 --- /dev/null +++ b/exercises/08_futures/05_blocking/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "blocking" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } +anyhow = "1.0.83" \ No newline at end of file diff --git a/exercises/08_futures/05_blocking/src/lib.rs b/exercises/08_futures/05_blocking/src/lib.rs new file mode 100644 index 0000000..ced01ab --- /dev/null +++ b/exercises/08_futures/05_blocking/src/lib.rs @@ -0,0 +1,70 @@ +// TODO: the `echo` server uses non-async primitives. +// When running the tests, you should observe that it hangs, due to a +// deadlock between the caller and the server. +// Use `spawn_blocking` inside `echo` to resolve the issue. +use std::io::{Read, Write}; +use tokio::net::TcpListener; + +pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> { + loop { + let (socket, _) = listener.accept().await?; + let mut socket = socket.into_std()?; + let mut buffer = Vec::new(); + while let Ok(_) = socket.read(&mut buffer) {} + socket.write_all(&buffer)?; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + use std::panic; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::task::JoinSet; + + async fn bind_random() -> (TcpListener, SocketAddr) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + (listener, addr) + } + + #[tokio::test] + async fn test_echo() { + let (listener, addr) = bind_random().await; + tokio::spawn(echo(listener)); + + let requests = vec![ + "hello here we go with a long message", + "world", + "foo", + "bar", + ]; + let mut join_set = JoinSet::new(); + + for request in requests { + join_set.spawn(async move { + let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap(); + let (mut reader, mut writer) = socket.split(); + + // Send the request + writer.write_all(request.as_bytes()).await.unwrap(); + // Close the write side of the socket + writer.shutdown().await.unwrap(); + + // Read the response + let mut buf = Vec::with_capacity(request.len()); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(&buf, request.as_bytes()); + }); + } + + while let Some(outcome) = join_set.join_next().await { + if let Err(e) = outcome { + if let Ok(reason) = e.try_into_panic() { + panic::resume_unwind(reason); + } + } + } + } +} diff --git a/exercises/08_futures/06_async_aware_primitives/Cargo.toml b/exercises/08_futures/06_async_aware_primitives/Cargo.toml new file mode 100644 index 0000000..ef7788f --- /dev/null +++ b/exercises/08_futures/06_async_aware_primitives/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "async_locks" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/exercises/08_futures/06_async_aware_primitives/src/lib.rs b/exercises/08_futures/06_async_aware_primitives/src/lib.rs new file mode 100644 index 0000000..16efb4a --- /dev/null +++ b/exercises/08_futures/06_async_aware_primitives/src/lib.rs @@ -0,0 +1,53 @@ +/// TODO: the code below will deadlock because it's using std's channels, +/// which are not async-aware. +/// Rewrite it to use `tokio`'s channels primitive (you'll have to touch +/// the testing code too, yes). +/// +/// Can you understand the sequence of events that can lead to a deadlock? +use std::sync::mpsc; + +pub struct Message { + payload: String, + response_channel: mpsc::Sender, +} + +/// Replies with `pong` to any message it receives, setting up a new +/// channel to continue communicating with the caller. +pub async fn pong(mut receiver: mpsc::Receiver) { + loop { + if let Ok(msg) = receiver.recv() { + println!("Pong received: {}", msg.payload); + let (sender, new_receiver) = mpsc::channel(); + msg.response_channel + .send(Message { + payload: "pong".into(), + response_channel: sender, + }) + .unwrap(); + receiver = new_receiver; + } + } +} + +#[cfg(test)] +mod tests { + use crate::{pong, Message}; + use std::sync::mpsc; + + #[tokio::test] + async fn ping() { + let (sender, receiver) = mpsc::channel(); + let (response_sender, response_receiver) = mpsc::channel(); + sender + .send(Message { + payload: "pong".into(), + response_channel: response_sender, + }) + .unwrap(); + + tokio::spawn(pong(receiver)); + + let answer = response_receiver.recv().unwrap().payload; + assert_eq!(answer, "pong"); + } +} diff --git a/exercises/08_futures/07_cancellation/Cargo.toml b/exercises/08_futures/07_cancellation/Cargo.toml new file mode 100644 index 0000000..3a19802 --- /dev/null +++ b/exercises/08_futures/07_cancellation/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "cancellation" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/exercises/08_futures/07_cancellation/src/lib.rs b/exercises/08_futures/07_cancellation/src/lib.rs new file mode 100644 index 0000000..20e914d --- /dev/null +++ b/exercises/08_futures/07_cancellation/src/lib.rs @@ -0,0 +1,51 @@ +// TODO: fix the `assert_eq` at the end of the tests. +// Do you understand why that's the resulting output? +use std::time::Duration; +use tokio::io::AsyncReadExt; +use tokio::net::TcpListener; + +pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec { + let mut buffer = Vec::new(); + for _ in 0..n_messages { + let (mut stream, _) = listener.accept().await.unwrap(); + let _ = tokio::time::timeout(timeout, async { + stream.read_to_end(&mut buffer).await.unwrap(); + }) + .await; + } + buffer +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::AsyncWriteExt; + + #[tokio::test] + async fn ping() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let messages = vec!["hello", "from", "this", "task"]; + let timeout = Duration::from_millis(20); + let handle = tokio::spawn(run(listener, messages.len(), timeout.clone())); + + for message in messages { + let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap(); + let (_, mut writer) = socket.split(); + + let (beginning, end) = message.split_at(message.len() / 2); + + // Send first half + writer.write_all(beginning.as_bytes()).await.unwrap(); + tokio::time::sleep(timeout * 2).await; + writer.write_all(end.as_bytes()).await.unwrap(); + + // Close the write side of the socket + writer.shutdown().await.unwrap(); + } + + let buffered = handle.await.unwrap(); + let buffered = std::str::from_utf8(&buffered).unwrap(); + assert_eq!(buffered, ""); + } +} diff --git a/exercises/08_futures/08_outro/Cargo.toml b/exercises/08_futures/08_outro/Cargo.toml new file mode 100644 index 0000000..3a19802 --- /dev/null +++ b/exercises/08_futures/08_outro/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "cancellation" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/exercises/08_futures/08_outro/src/lib.rs b/exercises/08_futures/08_outro/src/lib.rs new file mode 100644 index 0000000..8a8f07a --- /dev/null +++ b/exercises/08_futures/08_outro/src/lib.rs @@ -0,0 +1,10 @@ +// This is our last exercise. Let's go down a more unstructured path! +// Try writing an **asynchronous REST API** to expose the functionality +// of the ticket management system we built throughout the course. +// It should expose endpoints to: +// - Create a ticket +// - Retrieve ticket details +// - Patch a ticket +// +// Use Rust's package registry, crates.io, to find the dependencies you need +// (if any) to build this system.