A journey with Rust by side roads

This is the companion book for the Rust Language Code Gallery covering a variety of fields and programming styles

Image

Acknowledgement

This book uses the same organization as this rust-cookbook fork. As a consequence:

  • All examples are runnable via cargo run --example
  • All examples are also run via cargo test
  • This makes it easier for users to run the examples that are incompatible with the Rust Playground

Source code

Not obvious at first

Clarification never hurts, even for trivial things.

Mutable reference

A mutable reference can be stored in an immutable variable

 let mut s = String::from("hello");
 let r1 = &mut s;

r1 is inferred as &mut String but using an immutable type is allowed by the compiler and

 let r1: &String = &mut s;

compiles without errors.

Does it compile to the same code if we use an immutable reference?

 let r1 = &s;

Let's compare

/// Binds the result of `&mut s` to a variable annotated as a shared `&String`
/// and prints it.
pub fn review() {
    let mut s = String::from("hello");
    // The explicit `&String` annotation makes the compiler coerce the mutable
    // borrow into a shared one, so `r1` can only be used for reading.
    let r1: &String = &mut s;
    println!("{}",r1);
}

and

/// Takes a shared borrow of the string directly with `&s` and prints it.
pub fn review() {
    // Nothing mutates `s` in this variant; `mut` is not needed for the `&s` borrow below.
    let mut s = String::from("hello");
    let r1 = &s; // shared borrow, inferred as `&String`
    println!("{}",r1);
}

with cargo-asm asm experiment::review

As expected, the asm code is exactly the same in both cases:

 push    rbp
 mov     rbp, rsp
 push    rbx
 sub     rsp, 104
 mov     edi, 5
 mov     esi, 1
 call    ___rust_alloc
 test    rax, rax
 je      LBB2_6
 mov     cl, byte, ptr, [rip, +, l_anon.59752abc5bcc54f35c396f0afb4d0f15.1+4]
 mov     byte, ptr, [rax, +, 4], cl
 mov     ecx, dword, ptr, [rip, +, l_anon.59752abc5bcc54f35c396f0afb4d0f15.1]
 mov     dword, ptr, [rax], ecx
 mov     qword, ptr, [rbp, -, 32], rax
 movaps  xmm0, xmmword, ptr, [rip, +, LCPI2_0]
 movups  xmmword, ptr, [rbp, -, 24], xmm0
 lea     rax, [rbp, -, 32]
 mov     qword, ptr, [rbp, -, 40], rax
 lea     rax, [rbp, -, 40]
 mov     qword, ptr, [rbp, -, 56], rax
 lea     rax, [rip, +, __ZN44_$LT$$RF$T$u20$as$u20$core..fmt..Display$GT$3fmt17h75513a3126905787E]
 mov     qword, ptr, [rbp, -, 48], rax
 lea     rax, [rip, +, l_anon.59752abc5bcc54f35c396f0afb4d0f15.3]
 mov     qword, ptr, [rbp, -, 104], rax
 mov     qword, ptr, [rbp, -, 96], 2
 mov     qword, ptr, [rbp, -, 88], 0
 lea     rax, [rbp, -, 56]
 mov     qword, ptr, [rbp, -, 72], rax
 mov     qword, ptr, [rbp, -, 64], 1
 lea     rdi, [rbp, -, 104]
 call    std::io::stdio::_print
 mov     rsi, qword, ptr, [rbp, -, 24]
 test    rsi, rsi
 je      LBB2_4
 mov     rdi, qword, ptr, [rbp, -, 32]
 mov     edx, 1
 call    ___rust_dealloc
LBB2_4:
 add     rsp, 104
 pop     rbx
 pop     rbp
 ret
LBB2_6:
 mov     edi, 5
 mov     esi, 1
 call    alloc::alloc::handle_alloc_error
LBB2_5:
 mov     rbx, rax
 lea     rdi, [rbp, -, 32]
 call    core::ptr::drop_in_place
 mov     rdi, rbx
 call    __Unwind_Resume
 ud2

Tokio vs Futures

There are slight but sometimes confusing differences between tokio and the futures crate. futures_util::io::AsyncBufReadExt.lines() returns a Stream (the asynchronous counterpart of an iterator) but surprisingly tokio::io::util::async_buf_read_ext::AsyncBufReadExt.lines() does not.
Fortunately tokio_stream::wrappers::LinesStream is a wrapper around tokio::io::Lines that implements Stream.

Implementation with Futures

async-compression-badge async-fs-badge futures-badge

use std::future;

use async_compression::futures::bufread::GzipDecoder;
use futures::{AsyncBufReadExt, StreamExt, io::BufReader};

use async_fs::File;
use gallery_rs_utils::locate;

/// Sums the unsigned integers stored one per line in the gzip-compressed file `123.gz`.
#[tokio::main]
async fn main() {
    let file = File::open(locate("123.gz")).await.unwrap();
    let decoder = BufReader::new(file);

    // With the futures crate, `lines()` returns a `Stream` (the asynchronous
    // counterpart of an iterator), so `StreamExt` adaptors such as `fold` apply.
    let lines = BufReader::new(GzipDecoder::new(decoder)).lines();

    // Alternative kept for reference: print every line instead of summing them.
    // let res = lines.for_each(|l| {
    //     println!("{:?}", l);
    //     future::ready(())
    // });
    // res.await
    let sum = lines.fold(0, |acc, n| async move {
        // Every line is unwrapped, trimmed and parsed as a `u32`; any failure panics.
        acc + n.unwrap().trim().parse::<u32>().unwrap()
    });
    assert_eq!(sum.await, 6);
}

Implementation with Tokio

async-compression-badge tokio-badge

use async_compression::tokio::bufread::GzipDecoder;

use gallery_rs_utils::locate;
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

/// Reads the gzip-compressed file `123.gz` line by line with tokio and checks
/// that its first three lines are "1", "2" and "3".
#[tokio::main]
async fn main() {
    let gz_file = File::open(locate("123.gz")).await.unwrap();
    let gzip = GzipDecoder::new(BufReader::new(gz_file));

    // tokio's `Lines` is not a stream: lines are pulled one at a time with `next_line()`.
    let mut lines = BufReader::new(gzip).lines();

    for expected in ["1", "2", "3"] {
        let line = lines.next_line().await.unwrap().unwrap();
        assert_eq!(line, expected);
    }
}

Implementation with Tokio Stream

async-compression-badge tokio-badge tokio-stream-badge

use async_compression::tokio::bufread::GzipDecoder;

use futures::StreamExt;
use gallery_rs_utils::locate;
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};
use tokio_stream::wrappers::LinesStream;

/// Sums the unsigned integers stored one per line in the gzip-compressed file
/// `123.gz`, using tokio for I/O and `LinesStream` to obtain a `Stream`.
#[tokio::main]
async fn main() {
    let gz_file = File::open(locate("123.gz")).await.unwrap();
    let gzip = GzipDecoder::new(BufReader::new(gz_file));

    // tokio's `Lines` is not a `Stream` by itself; `LinesStream` wraps it into one.
    let stream = LinesStream::new(BufReader::new(gzip).lines());

    let total = stream
        .fold(0, |acc, line| async move {
            acc + line.unwrap().trim().parse::<u32>().unwrap()
        })
        .await;
    assert_eq!(total, 6);
}

Spurious stream lock

Without extra care, a simple stream of String can be viciously locked at runtime if not fully UTF-8 compliant.

Update: This turned out to be a real bug in futures-0.3.28-badge and was fixed in futures-0.3.29-badge

Simple stream of String

Does show an error when an invalid UTF-8 line is encountered

use gallery_rs_utils::locate;

use std::fs::File;
use std::io::{self, BufRead};

/// Prints every line of the file `corrupted_utf8`, panicking on the first
/// line that cannot be read as a `String`.
fn main() {
    let reader = io::BufReader::new(File::open(locate("corrupted_utf8")).unwrap());
    for line in reader.lines() {
        // `lines()` yields `io::Result<String>`; unwrapping surfaces the error.
        println!("{}", line.unwrap());
    }
}

Does filter bad lines when an invalid UTF-8 line is encountered

tokio-badge tokio-stream-badge

use gallery_rs_utils::locate;

use futures::{Stream, StreamExt, pin_mut, stream};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;

//lock not reproduced
/// Streams the lines of `corrupted_utf8` with tokio and hands the successfully
/// read lines to `process`.
#[tokio::main]
async fn main() {
    let file = File::open(locate("corrupted_utf8")).await.unwrap();

    let decoder = BufReader::new(file).lines();
    // Wrap tokio's `Lines` so that it can be used as a `Stream`.
    let lines = LinesStream::new(decoder);
    // Each item is a `Result`; iterating a `Result` yields its `Ok` value or
    // nothing at all, so `flat_map(stream::iter)` silently drops the failed lines.
    let stream = lines.flat_map(stream::iter);

    process(stream).await;
}

/// Prints every line produced by `stream`, prefixed with its zero-based position.
pub async fn process(stream: impl Stream<Item = String>) {
    // Pin the stream in place so that it can be polled through `&mut stream`.
    pin_mut!(stream);
    // Number of lines printed so far.
    let mut total = 0;
    // The fully qualified path explicitly selects tokio_stream's `StreamExt::next`.
    while let Some(line) = tokio_stream::StreamExt::next(&mut stream).await {
        println!("{} {}", total, line);
        total += 1;
    }
}

Does lock when an invalid UTF-8 line is encountered

futures-0.3.28-badge

use gallery_rs_utils::locate;

use async_fs::File;
use futures::{AsyncBufReadExt, Stream, StreamExt, io::BufReader, pin_mut, stream};

//lock reproduced
/// Streams the lines of `corrupted_utf8` with async-fs and futures and hands
/// the successfully read lines to `process`.
///
/// Per the accompanying text, this variant hangs on an invalid UTF-8 line
/// with futures 0.3.28 (fixed in 0.3.29).
#[tokio::main]
async fn main() {
    let file = File::open(locate("corrupted_utf8")).await.unwrap();

    // Iterating a `Result` yields its `Ok` value or nothing, so
    // `flat_map(stream::iter)` is meant to drop the lines that failed to decode.
    let stream = BufReader::new(file).lines().flat_map(stream::iter);

    process(stream).await;
}

/// Prints every line produced by `stream`, prefixed with its zero-based position.
pub async fn process(stream: impl Stream<Item = String>) {
    // Pin the stream in place so that it can be polled through `&mut stream`.
    pin_mut!(stream);
    // Number of lines printed so far.
    let mut total = 0;
    // The fully qualified path explicitly selects tokio_stream's `StreamExt::next`.
    while let Some(line) = tokio_stream::StreamExt::next(&mut stream).await {
        println!("{} {}", total, line);
        total += 1;
    }
}

Tokio spawn & lifetime

Excerpt from the Tokio documentation

When you spawn a task on the Tokio runtime, its type's lifetime must be 'static. This means that the spawned task must not contain any references to data owned outside the task.

Run this to show the compiler error when this requirement is not met.

tokio-badge

use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};

/// Prints `item`. Because the parameter is a borrowed `&str`, the returned
/// future holds that borrow.
async fn compute(item: &str) {
    println!("{}", item);
}

/// Intentionally rejected by the compiler: the spawned tasks borrow from the
/// local vector `v`.
#[tokio::main]
async fn main() {
    let v: Vec<String> = vec![String::from("john"), String::from("doe")];

    // `v_ref` holds `&str` slices borrowed from `v`, which lives only as long as `main`.
    let mut v_ref = Vec::<&str>::with_capacity(v.len());
    for item in &v {
        v_ref.push(item);
    }

    let mut join_set = JoinSet::new();
    for item in v_ref {
        // `spawn` requires a `'static` future, but this one captures a borrow of `v`.
        join_set.spawn(async { compute(item).await });
    }
    // The tasks are not joined; `main` only waits 100 ms before returning.
    sleep(Duration::from_millis(100)).await;
}

The fact that dropping v at the end of main is not compatible with "argument requires that `v` is borrowed for `'static`" is not obvious.
The issue is that v has the lifetime defined by the scope of main() and not the 'static lifetime required by the trait bound [1] of the spawn function.

The solution for Tokio API is Arc.

tokio-badge

use std::sync::Arc;

use tokio::task::JoinSet;
use tokio::time::{Duration, sleep};

/// Prints `item`. The parameter is an owned `Arc<str>`, so the returned future
/// does not borrow from the caller.
async fn compute(item: Arc<str>) {
    println!("{}", item)
}

/// Spawns one task per string, handing each task an owned `Arc<str>` instead
/// of a borrow of `v`.
#[tokio::main]
async fn main() {
    let v: Vec<String> = vec![String::from("john"), String::from("doe")];

    // `Arc::from(&str)` builds a new shared string from the contents of each
    // item, so nothing stored in `v_ref` borrows from `v`.
    let mut v_ref = Vec::<Arc<str>>::with_capacity(v.len());
    for item in &v {
        v_ref.push(Arc::from(item.as_str()));
    }

    let mut join_set = JoinSet::new();
    for item in v_ref {
        // `item` is moved into `compute`, so the future owns its data.
        join_set.spawn(async { compute(item).await });
    }
    // NOTE(review): the tasks are never joined; this relies on 100 ms being
    // enough for them to run — consider awaiting `join_set.join_next()` instead.
    sleep(Duration::from_millis(100)).await;
}

We can minimize the same issue without Tokio.

use std::fmt::Debug;

/// Prints the `Debug` representation of `item` on its own line.
///
/// The `'static` bound rejects any argument that borrows non-`'static` data.
fn compute(item: impl Debug + 'static) {
    println!("{item:?}");
}

/// Intentionally rejected by the compiler: `compute` requires `'static`
/// arguments but receives borrows of the local vector `v`.
fn main() {
    let v: Vec<String> = vec![String::from("john"), String::from("doe")];

    // Each `&str` pushed here borrows from `v`, which is dropped at the end of `main`.
    let mut v_ref = Vec::<&str>::with_capacity(v.len());
    for item in &v {
        v_ref.push(item);
    }

    for item in v_ref {
        // The `'static` bound on `compute` cannot be met by a borrow of a local.
        compute(item);
    }
}

And solved it the same way with Arc.

use std::{fmt::Debug, sync::Arc};

/// Writes the `Debug` form of `item` to standard output, followed by a newline.
///
/// Only values that hold no non-`'static` borrow are accepted.
fn compute(item: impl Debug + 'static) {
    println!("{item:?}");
}

/// Hands every string of `v` to `compute` as an owned `Arc<str>`, which
/// satisfies the `'static` bound because it borrows nothing from `v`.
fn main() {
    let v: Vec<String> = vec![String::from("john"), String::from("doe")];

    // Build one shared string per element from its contents.
    let v_ref: Vec<Arc<str>> = v
        .iter()
        .map(|name| Arc::<str>::from(name.as_str()))
        .collect();

    for shared in v_ref {
        compute(shared);
    }
}

But why is the Tokio requirement so strong?
If the requirement were simply a regular lifetime, we could do:

/// Prints the `Debug` representation of `item` on its own line.
///
/// `item` only has to be valid for the caller-chosen lifetime `'a`, so
/// borrowed data is accepted.
fn compute<'a>(item: impl std::fmt::Debug + 'a) {
    println!("{item:?}");
}

/// Passes borrows of the local vector `v` to `compute`, whose bound is an
/// ordinary lifetime `'a` rather than `'static`.
fn main() {
    let v: Vec<String> = vec![String::from("john"), String::from("doe")];

    // Each `&str` pushed here borrows from `v`.
    let mut v_ref = Vec::<&str>::with_capacity(v.len());
    for item in &v {
        v_ref.push(item);
    }

    for item in v_ref {
        // A plain lifetime bound is satisfied by a borrow that is valid during the call.
        compute(item);
    }
}

It turns out [2] that the async executor has exactly this requirement.
spawn in smol is defined like this:

fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T>

whereas Tokio spawn is defined like this:

    pub fn spawn<F>(&mut self, task: F) -> AbortHandle
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,

So I would expect this code to compile :

async-executor-badge

use async_executor::Executor;

/// Prints the `Debug` representation of `item`; `item` only has to be valid
/// for the lifetime `'a`.
async fn compute<'a>(item: impl std::fmt::Debug + 'a) {
    println!("{:?}", item)
}

/// Spawns one task per string on an `async_executor::Executor`, passing each
/// task a `&str` borrowed from the local vector `v`.
fn main() {
    let ex: Executor = Executor::new();
    let v: Vec<String> = vec![String::from("john"), String::from("doe")];

    // Each `&str` pushed here borrows from `v`.
    let mut v_ref: Vec<&str> = Vec::<&str>::with_capacity(v.len());
    for item in &v {
        v_ref.push(item);
    }

    for item in v_ref {
        // NOTE(review): `ex` is declared before `v` (so it is dropped after it)
        // and the block is `async` rather than `async move`; confirm whether
        // the borrow checker accepts this as the surrounding text expects.
        // The returned task is only bound to `_task`; nothing here runs or awaits it.
        let _task = ex.spawn(async {
            compute(item).await;
        });
    }
}

  1. Lifetime as a trait bound

  2. How to deal with tokio::spawn closure required to be 'static and &self?

Type-Driven API

This is from Will Crichton's talk at the Strange Loop Conference 2021

use std::{thread::sleep, time::Duration};

const CLEAR: &str = "\x1B[2J\x1B[1;1H";

/// Type-level marker for a progress indicator whose total length is unknown.
struct Unbounded;

/// State of a progress indicator whose total length is known.
struct Bounded {
    /// Total number of steps, i.e. the width of the bar.
    bound: usize,
    /// Opening and closing delimiters drawn around the bar.
    delims: (char, char),
}

/// Iterator wrapper that carries the state needed to draw a progress indicator.
///
/// `Bound` is either [`Unbounded`] or [`Bounded`] and selects, at compile
/// time, which display is used and which builder methods are available.
struct Progress<Iter, Bound> {
    /// The wrapped iterator.
    iter: Iter,
    /// Number of steps to draw.
    i: usize,
    /// Display strategy together with its data.
    bound: Bound,
}

/// Strategy used to draw the current state of a [`Progress`].
trait ProgressDisplay: Sized {
    /// Draws `progress` on standard output.
    fn display<Iter>(&self, progress: &Progress<Iter, Self>);
}

impl ProgressDisplay for Unbounded {
    /// Draws one `*` per step, without any delimiter.
    fn display<Iter>(&self, progress: &Progress<Iter, Self>) {
        print!("{}", "*".repeat(progress.i));
        // `print!` does not emit a newline, so flush explicitly (as the bounded
        // display does) to make the indicator visible immediately.
        std::io::Write::flush(&mut std::io::stdout()).unwrap();
    }
}

impl ProgressDisplay for Bounded {
    /// Draws a fixed-width bar: delimiters around `*` for the steps counted so
    /// far and spaces for the remaining ones.
    fn display<Iter>(&self, progress: &Progress<Iter, Self>) {
        // Clamp the counter to the bound: it can exceed it (for instance when
        // the iterator is polled again after exhaustion), and the subtraction
        // below would then underflow.
        let done = progress.i.min(self.bound);
        print!(
            "{}{}{}{}",
            self.delims.0,
            "*".repeat(done),
            " ".repeat(self.bound - done),
            self.delims.1
        );
        std::io::Write::flush(&mut std::io::stdout()).unwrap();
    }
}

impl<Iter> Progress<Iter, Unbounded> {
    /// Wraps `iter` in a progress indicator with no known bound, starting at step 0.
    pub fn new(iter: Iter) -> Self {
        let i = 0;
        let bound = Unbounded;
        Self { iter, i, bound }
    }
}

impl<Iter> Progress<Iter, Unbounded>
where
    Iter: ExactSizeIterator,
{
    /// Turns the unbounded indicator into a bounded one, using the length
    /// reported by the wrapped iterator and `[` `]` as delimiters.
    pub fn with_bound(self) -> Progress<Iter, Bounded> {
        // Take the wrapper apart; the `Unbounded` marker is simply discarded.
        let Progress { iter, i, .. } = self;
        let bound = Bounded {
            bound: iter.len(),
            delims: ('[', ']'),
        };
        Progress { iter, i, bound }
    }
}

impl<Iter> Progress<Iter, Bounded> {
    pub fn with_delims(mut self, delims: (char, char)) -> Self {
        self.bound.delims = delims;
        self
    }
}

impl<Iter, Bound> Iterator for Progress<Iter, Bound>
where
    Iter: Iterator,
    Bound: ProgressDisplay,
{
    type Item = Iter::Item;

    /// Redraws the indicator, then yields the next item of the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        // Clear the terminal before drawing the current state.
        print!("{}", CLEAR);
        self.bound.display(&*self);

        let item = self.iter.next();
        // Count only the items actually produced, so that the counter never
        // runs past the length of the wrapped iterator once it is exhausted.
        if item.is_some() {
            self.i += 1;
        }
        item
    }
}

/// Extension trait that adds a `progress()` adaptor method.
trait ProgressIteratorExt: Sized {
    /// Wraps `self` in a [`Progress`] with no known bound.
    fn progress(self) -> Progress<Self, Unbounded>;
}

// NOTE(review): this blanket impl has no `Iter: Iterator` bound, so
// `progress()` is available on every sized type — confirm this is intended.
impl<Iter> ProgressIteratorExt for Iter {
    fn progress(self) -> Progress<Self, Unbounded> {
        Progress::new(self)
    }
}

/// Stands in for a slow computation by sleeping for one second; the argument is ignored.
fn expensive_calculations(_n: &i32) {
    sleep(Duration::from_secs(1));
}

/// Iterates over a three-element vector while drawing a bounded progress bar
/// delimited by `<` and `>`.
fn main() {
    let brkts = ('<', '>');
    // Kept commented out: `with_delims` is only defined for a bounded progress,
    // and this progress over an open-ended range never calls `with_bound()`.
    //  for n in (0 .. ).progress().with_delims(brkts) {
    //     expensive_calculations(&n);
    //  }

    let v = vec![1, 2, 3];
    // `with_bound()` is available because the slice iterator knows its exact length.
    for n in v.iter().progress().with_bound().with_delims(brkts) {
        expensive_calculations(n);
    }
}

See also https://willcrichton.net/rust-api-type-patterns

Rustification

Articles adapted to Rust.

Option Cheat Sheet

Adapted from λ Tony's blog λ scala.Option Cheat Sheet


and_then (aka flatMap in Scala)

match option {
     None => None,
     Some(x) => foo(x)
 }

This code is equivalent to :

option.and_then(foo)

flatten

match option {
     None => None,
     Some(x) => x
 }

This code is equivalent to :

option.flatten() //(and not unwrap() that would fail for option = None)

map

match option {
     None => None,
     Some(x) => Some(foo(x))
 }

This code is equivalent to :

option.map(foo)

for_each

match option {
     None => (),
     Some(x) => foo(x)
 }

This code is equivalent to :

option.into_iter().for_each(foo);

is_some

match option {
      None => false,
      Some(_) => true
 }

This code is equivalent to :

option.is_some()

is_none

match option {
      None => true,
      Some(_) => false
 }

This code is equivalent to :

option.is_none()

all (aka forall in Scala)

match option {
      None => true,
      Some(x) => foo(x)
 }

This code is equivalent to :

option.into_iter().all (|x| foo(x))

any (aka exists in Scala)

match option {
      None => false,
      Some(x) => foo(x)
 }

This code is equivalent to :

option.into_iter().any (|x| foo(x))

or (aka orElse in Scala)

match option {
      None => foo,
      Some(x) => Some(x)
 }

This code is equivalent to :

option.or(foo)

unwrap_or (aka getOrElse in Scala)

match option {
      None => foo,
      Some(x) => x
 }

This code is equivalent to :

option.unwrap_or(foo)

filter_map().collect() (aka toList in Scala)

match option {
      None => Vec::new(),
      Some(x) => vec![x]
 }

This code is equivalent to :

option.into_iter().filter_map(|x| Some(x)).collect::<Vec<_>>()

Resources

Iterating over an Option

Cookbook

1. Convert Vec<String> to Vec<&str>

/// Builds a `Vec<&str>` whose elements borrow the strings of a `Vec<String>`.
fn main() {
    let v: Vec<String> = vec![String::from("john"), String::from("doe")];

    // `String::as_str` borrows each element without copying it; `collect`
    // sizes the resulting vector from the iterator's length.
    let v_ref: Vec<&str> = v.iter().map(String::as_str).collect();

    // Make the example self-checking.
    assert_eq!(v_ref, ["john", "doe"]);
}