Spurious stream lock
Without extra care, a simple stream of String can insidiously lock up at runtime if its input is not fully valid UTF-8.
Simple stream of String
Shows an error when an invalid UTF-8 line is encountered
use gallery_rs_utils::locate;
use std::fs::File;
use std::io::{self, BufRead};
/// Prints every line of the `corrupted_utf8` fixture using blocking std I/O.
///
/// Each line result is unwrapped, so the first line that cannot be read
/// (for example one that is not valid UTF-8) panics and reports the error
/// instead of being skipped.
fn main() {
    let path = locate("corrupted_utf8");
    let reader = io::BufReader::new(File::open(path).unwrap());
    for line in reader.lines() {
        println!("{}", line.unwrap());
    }
}
Filters out bad lines when an invalid UTF-8 line is encountered
use gallery_rs_utils::locate;
use futures::{Stream, StreamExt, pin_mut, stream};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;
// Lock not reproduced with this variant.
/// Streams the `corrupted_utf8` fixture line by line through tokio's buffered
/// reader and hands the readable lines to [`process`].
///
/// Every line result is flattened through `stream::iter`, which keeps the
/// `Ok` lines and drops the ones that failed to decode.
#[tokio::main]
async fn main() {
    let path = locate("corrupted_utf8");
    let source = File::open(path).await.unwrap();
    // Adapt tokio's `Lines` reader into a `Stream` of line results.
    let line_results = LinesStream::new(BufReader::new(source).lines());
    let valid_lines = line_results.flat_map(stream::iter);
    process(valid_lines).await;
}
/// Consumes `stream` to exhaustion, printing each line prefixed with its
/// zero-based position in the stream.
pub async fn process(stream: impl Stream<Item = String>) {
    // Pin the stream on the stack so it can be polled through `&mut`.
    pin_mut!(stream);
    let mut index = 0;
    loop {
        match tokio_stream::StreamExt::next(&mut stream).await {
            Some(text) => {
                println!("{} {}", index, text);
                index += 1;
            }
            None => break,
        }
    }
}
Locks when an invalid UTF-8 line is encountered
use gallery_rs_utils::locate;
use async_fs::File;
use futures::{AsyncBufReadExt, Stream, StreamExt, io::BufReader, pin_mut, stream};
// Lock reproduced with this variant.
/// Streams the `corrupted_utf8` fixture through `async_fs` and the `futures`
/// buffered reader, then hands the flattened lines to [`process`].
///
/// This is the combination reported to lock when an invalid UTF-8 line is
/// encountered; the pipeline is kept as-is so the behaviour stays reproducible.
#[tokio::main]
async fn main() {
    let path = locate("corrupted_utf8");
    let source = File::open(path).await.unwrap();
    let reader = BufReader::new(source);
    // Flatten each line result through `stream::iter`, as in the tokio variant.
    let flattened = reader.lines().flat_map(stream::iter);
    process(flattened).await;
}
/// Consumes `stream` to exhaustion, printing each line prefixed with its
/// zero-based position in the stream.
pub async fn process(stream: impl Stream<Item = String>) {
    // Pin the stream on the stack so it can be polled through `&mut`.
    pin_mut!(stream);
    let mut index = 0;
    loop {
        match tokio_stream::StreamExt::next(&mut stream).await {
            Some(text) => {
                println!("{} {}", index, text);
                index += 1;
            }
            None => break,
        }
    }
}