Tokio vs Futures

There are slight but sometimes confusing differences between tokio and the futures crate. futures_util::io::AsyncBufReadExt.lines() returns a Stream (an asynchronous iterator), but surprisingly tokio::io::util::async_buf_read_ext::AsyncBufReadExt.lines() does not.
Fortunately, tokio_stream::wrappers::LinesStream is a wrapper around tokio::io::Lines that implements Stream.

Implementation with Futures

async-compression-badge async-fs-badge futures-badge

use std::future;

use async_compression::futures::bufread::GzipDecoder;
use futures::{AsyncBufReadExt, StreamExt, io::BufReader};

use async_fs::File;
use gallery_rs_utils::locate;

#[tokio::main]
async fn main() {
    let file = File::open(locate("123.gz")).await.unwrap();
    let decoder = BufReader::new(file);

    //this is an iterator
    let lines = BufReader::new(GzipDecoder::new(decoder)).lines();

    // let res = lines.for_each(|l| {
    //     println!("{:?}", l);
    //     future::ready(())
    // });
    // res.await
    let sum = lines.fold(0, |acc, n| async move {
        acc + n.unwrap().trim().parse::<u32>().unwrap()
    });
    assert_eq!(sum.await, 6);
}

Implementation with Tokio

async-compression-badge tokio-badge

use async_compression::tokio::bufread::GzipDecoder;

use gallery_rs_utils::locate;
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

/// Decompresses `123.gz` with tokio's I/O stack and checks its lines one by one.
#[tokio::main]
async fn main() {
    // The `bufread` decoder is fed a buffered reader, so wrap the file first.
    let compressed = BufReader::new(File::open(locate("123.gz")).await.unwrap());

    // With tokio, `lines()` is not a stream: lines are pulled explicitly
    // through `next_line()`.
    let mut lines = BufReader::new(GzipDecoder::new(compressed)).lines();

    for expected in ["1", "2", "3"] {
        // Outer `unwrap` is the read result, inner `unwrap` is the "a line was present" option.
        let line = lines.next_line().await.unwrap().unwrap();
        assert_eq!(line, expected);
    }
}

Implementation with Tokio Stream

async-compression-badge tokio-badge tokio-stream-badge

use async_compression::tokio::bufread::GzipDecoder;

use futures::StreamExt;
use gallery_rs_utils::locate;
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};
use tokio_stream::wrappers::LinesStream;

/// Decompresses `123.gz` with tokio's I/O stack, adapts the line reader into a
/// stream, and sums the number found on each line.
#[tokio::main]
async fn main() {
    // The `bufread` decoder is fed a buffered reader, so wrap the file first.
    let compressed = BufReader::new(File::open(locate("123.gz")).await.unwrap());
    let decompressed = BufReader::new(GzipDecoder::new(compressed));

    // tokio's `lines()` is not a stream on its own; `LinesStream` wraps it so
    // that `StreamExt` combinators such as `fold` become available.
    let total = LinesStream::new(decompressed.lines())
        .fold(0, |running, line| async move {
            let value = line.unwrap().trim().parse::<u32>().unwrap();
            running + value
        })
        .await;

    assert_eq!(total, 6);
}