Spurious stream lock

Without extra care, a simple stream of `String` can insidiously lock up at runtime if its input is not fully valid UTF-8.

Update: This turned out to be a real bug in futures-0.3.28-badge, and it was fixed in futures-0.3.29-badge.

Simple stream of String

Does show an error when an invalid UTF-8 line is encountered

use gallery_rs_utils::locate;

use std::fs::File;
use std::io::{self, BufRead};

/// Prints every line of the file that `locate("corrupted_utf8")` resolves to.
///
/// Panics (through `unwrap`) if the file cannot be opened, or as soon as a line
/// cannot be read as a `String` — which is how this example surfaces the error.
fn main() {
    let reader = io::BufReader::new(File::open(locate("corrupted_utf8")).unwrap());
    for line in reader.lines() {
        println!("{}", line.unwrap());
    }
}

Does filter bad lines when an invalid UTF-8 line is encountered

tokio-badge tokio-stream-badge

use gallery_rs_utils::locate;

use futures::{Stream, StreamExt, pin_mut, stream};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;

// Lock not reproduced: in this variant the lines that fail to read are filtered out.
#[tokio::main]
async fn main() {
    let file = File::open(locate("corrupted_utf8")).await.unwrap();
    let reader = BufReader::new(file);

    // Every item is a `Result`; iterating a `Result` yields its `Ok` value and
    // nothing for `Err`, so flattening keeps only the successfully read lines.
    let valid_lines = LinesStream::new(reader.lines()).flat_map(stream::iter);

    process(valid_lines).await;
}

/// Drains `stream`, printing each line prefixed by a counter that starts at 0
/// and increases by one per line.
pub async fn process(stream: impl Stream<Item = String>) {
    // Pin the stream on the stack so it can be polled through `&mut`.
    pin_mut!(stream);
    let mut seen = 0;
    loop {
        match tokio_stream::StreamExt::next(&mut stream).await {
            Some(text) => {
                println!("{} {}", seen, text);
                seen += 1;
            }
            // The stream is exhausted.
            None => break,
        }
    }
}

Does lock when an invalid UTF-8 line is encountered

futures-0.3.28-badge

use gallery_rs_utils::locate;

use async_fs::File;
use futures::{AsyncBufReadExt, Stream, StreamExt, io::BufReader, pin_mut, stream};

// Lock reproduced: this variant reads the lines with the `futures` reader
// instead of tokio's and exhibits the hang described in the accompanying text.
#[tokio::main]
async fn main() {
    let file = File::open(locate("corrupted_utf8")).await.unwrap();
    let reader = BufReader::new(file);

    // Flattening each `Result` item keeps the `Ok` lines and drops the `Err` ones.
    let lines = reader.lines().flat_map(stream::iter);

    process(lines).await;
}

/// Consumes `stream` to its end, printing every line after a running counter
/// that starts at 0.
pub async fn process(stream: impl Stream<Item = String>) {
    // Pin the stream on the stack so it can be polled through `&mut`.
    pin_mut!(stream);
    let mut count = 0;
    loop {
        let next = tokio_stream::StreamExt::next(&mut stream).await;
        match next {
            // End of stream: nothing left to print.
            None => break,
            Some(entry) => {
                println!("{} {}", count, entry);
                count += 1;
            }
        }
    }
}