From f1d3805e6be77146249c37794b4a18915712b946 Mon Sep 17 00:00:00 2001 From: Thomas Heck Date: Sat, 7 Nov 2020 20:50:54 +0100 Subject: [PATCH] move blocking directory reading to external thread --- src/main.rs | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index d2a6c0b..46010bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result}; -use futures::prelude::*; +use futures::{channel::mpsc, prelude::*}; use glib::GString; use gstreamer::Element; use gstreamer_audio::{prelude::*, AudioEncoder}; @@ -56,16 +56,29 @@ fn main() -> Result<()> { gstreamer::init()?; let input = std::env::args().nth(1).context("missing input")?; let output = std::env::args().nth(2).context("missing output")?; - futures::executor::block_on( - futures::stream::iter(get_path_pairs(input.into(), output.into())).for_each_concurrent( - num_cpus::get(), - |(src, dest)| async move { - if let Err(err) = transcode(src.as_path(), dest.as_path()).await { - println!("err {} => {}:\n{:?}", src.display(), dest.display(), err); - } - }, - ), - ); + + let (pair_tx, pair_rx) = mpsc::channel(16); + + // move blocking directory reading to an external thread + let pair_producer = std::thread::spawn(|| { + let produce_pairs = futures::stream::iter(get_path_pairs(input.into(), output.into())) + .map(Ok) + .forward(pair_tx) + .map(|res| res.context("sending path pairs failed")); + futures::executor::block_on(produce_pairs) + }); + + let transcoder = pair_rx.for_each_concurrent(num_cpus::get(), |(src, dest)| async move { + if let Err(err) = transcode(src.as_path(), dest.as_path()).await { + println!("err {} => {}:\n{:?}", src.display(), dest.display(), err); + } + }); + futures::executor::block_on(transcoder); + + pair_producer + .join() + .expect("directory reading thread panicked")?; + Ok(()) }