Tokio vs Futures
There are slight but sometimes confusing differences between the tokio and futures crates.
futures_util::io::AsyncBufReadExt.lines()
returns a Stream (an asynchronous iterator), but surprisingly tokio::io::util::async_buf_read_ext::AsyncBufReadExt.lines()
does not.
Fortunately, tokio_stream::wrappers::LinesStream
is a wrapper around tokio::io::Lines
that implements Stream.
Implementation with Futures
use std::future;
use async_compression::futures::bufread::GzipDecoder;
use futures::{AsyncBufReadExt, StreamExt, io::BufReader};
use async_fs::File;
use gallery_rs_utils::locate;
#[tokio::main]
async fn main() {
    // Open the gzip fixture; the `bufread` flavour of `GzipDecoder` is fed a buffered reader.
    let gz_file = File::open(locate("123.gz")).await.unwrap();
    let buffered_file = BufReader::new(gz_file);
    let gunzipped = GzipDecoder::new(buffered_file);

    // With futures, `lines()` hands back a `Stream`, so stream combinators apply directly.
    let line_stream = BufReader::new(gunzipped).lines();

    // Alternative that prints every line instead of summing them:
    //     line_stream
    //         .for_each(|line| {
    //             println!("{:?}", line);
    //             future::ready(())
    //         })
    //         .await

    // Parse every line as a `u32` and add them up; any I/O or parse failure panics.
    let total = line_stream
        .fold(0, |running, line| async move {
            let value = line.unwrap().trim().parse::<u32>().unwrap();
            running + value
        })
        .await;
    assert_eq!(total, 6);
}
Implementation with Tokio
use async_compression::tokio::bufread::GzipDecoder;
use gallery_rs_utils::locate;
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
};
#[tokio::main]
async fn main() {
    // Open the gzip fixture; the `bufread` flavour of `GzipDecoder` is fed a buffered reader.
    let gz_file = File::open(locate("123.gz")).await.unwrap();
    let gunzipped = GzipDecoder::new(BufReader::new(gz_file));

    // tokio's `Lines` is not a `Stream`: each line has to be pulled explicitly
    // with `next_line()`, which needs a mutable handle.
    let mut line_reader = BufReader::new(gunzipped).lines();

    // The fixture is expected to contain exactly these three lines, in this order.
    for expected in ["1", "2", "3"] {
        let line = line_reader.next_line().await.unwrap().unwrap();
        assert_eq!(line, expected);
    }
}
Implementation with Tokio Stream
use async_compression::tokio::bufread::GzipDecoder;
use futures::StreamExt;
use gallery_rs_utils::locate;
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
};
use tokio_stream::wrappers::LinesStream;
#[tokio::main]
async fn main() {
    // Open the gzip fixture; the `bufread` flavour of `GzipDecoder` is fed a buffered reader.
    let gz_file = File::open(locate("123.gz")).await.unwrap();
    let gunzipped = GzipDecoder::new(BufReader::new(gz_file));

    // tokio's `Lines` is not a `Stream` by itself...
    let tokio_lines = BufReader::new(gunzipped).lines();
    // ...but `LinesStream` adapts it, which makes the `StreamExt` combinators available.
    let line_stream = LinesStream::new(tokio_lines);

    // Parse every line as a `u32` and add them up; any I/O or parse failure panics.
    let total = line_stream
        .fold(0, |running, line| async move {
            let value = line.unwrap().trim().parse::<u32>().unwrap();
            running + value
        })
        .await;
    assert_eq!(total, 6);
}