diff --git a/src/main.rs b/src/main.rs index 0d1e33a..b09aeff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ mod config; mod ui; -use crate::config::Config; +use crate::config::{Config, Transcode}; use anyhow::{Context, Error, Result}; use futures::{pin_mut, prelude::*}; use glib::{subclass::prelude::*, GBoxed, GString}; @@ -68,7 +68,7 @@ fn gmake>(factory_name: &str) -> Result { #[derive(Debug, Clone)] pub struct ConversionArgs { rel_from_path: PathBuf, - transcode: config::Transcode, + transcode: Transcode, } fn get_conversion_args(config: &Config) -> impl Iterator> + '_ { @@ -276,25 +276,49 @@ async fn transcode( // "whole" files to the intended file path, ignoring partial files in the mtime check let to_path_tmp = to_path.with_extension("tmp"); - if let config::Transcode::Copy = args.transcode { - rm_file_on_err(&to_path_tmp, async { - fs::copy(&from_path, &to_path_tmp).await.with_context(|| { - format!( - "could not copy file from {} to {}", - from_path.display(), - to_path_tmp.display() + rm_file_on_err(&to_path_tmp, async { + match args.transcode { + Transcode::Copy => { + fs::copy(&from_path, &to_path_tmp).await.with_context(|| { + format!( + "could not copy file from {} to {}", + from_path.display(), + to_path_tmp.display() + ) + })?; + } + _ => { + to_path.set_extension(args.transcode.extension()); + + transcode_gstreamer( + &from_path, + &to_path_tmp, + args.transcode.clone(), + task_id, + queue, ) - }) + .await? + } + } + + fs::rename(&to_path_tmp, &to_path).await.with_context(|| { + format!( + "could not rename temporary file {} to {}", + to_path_tmp.display(), + to_path.display() + ) }) - .await?; - - fs::rename(&to_path_tmp, &to_path).await?; - - return Ok(()); - } - - to_path.set_extension(args.transcode.extension()); + }) + .await +} +async fn transcode_gstreamer( + from_path: &Path, + to_path: &Path, + transcode: Transcode, + task_id: usize, + queue: &ui::MsgQueue, +) -> Result<()> { let file_src: Element = gmake("filesrc")?; file_src.set_property("location", &path_to_gstring(&from_path))?; @@ -310,10 +334,7 @@ async fn transcode( // downgrade pipeline RC to a weak RC to break the reference cycle let pipeline_weak = pipeline.downgrade(); - let transcode_args = args.transcode.clone(); - - let to_path_tmp_clone = to_path_tmp.clone(); - + let to_path_clone = to_path.to_owned(); decodebin.connect_pad_added(move |decodebin, src_pad| { let insert_sink = || -> Result<()> { let pipeline = match pipeline_weak.upgrade() { @@ -354,8 +375,8 @@ async fn transcode( gmake("audioconvert")?, ]; - match &transcode_args { - config::Transcode::Opus { + match &transcode { + Transcode::Opus { bitrate, bitrate_type, } => { @@ -378,13 +399,13 @@ async fn transcode( dest_elems.push(gmake("oggmux")?); } - config::Transcode::Flac { compression } => { + Transcode::Flac { compression } => { let encoder: Element = gmake("flacenc")?; encoder.set_property_from_str("quality", &compression.to_string()); dest_elems.push(encoder); } - config::Transcode::Mp3 { + Transcode::Mp3 { bitrate, bitrate_type, } => { @@ -404,11 +425,14 @@ async fn transcode( dest_elems.push(gmake("id3v2mux")?); } - config::Transcode::Copy => unreachable!(), + Transcode::Copy => { + // already handled outside this fn + unreachable!(); + } }; let file_dest: gstreamer_base::BaseSink = gmake("filesink")?; - file_dest.set_property("location", &path_to_gstring(&to_path_tmp_clone))?; + file_dest.set_property("location", &path_to_gstring(&to_path_clone))?; file_dest.set_sync(false); dest_elems.push(file_dest.upcast()); @@ -446,125 +470,120 @@ async fn transcode( let bus = pipeline.get_bus().context("pipe get bus")?; - rm_file_on_err(&to_path_tmp, async { - pipeline - .set_state(gstreamer::State::Playing) - .context("Unable to set the pipeline to the `Playing` state")?; + pipeline + .set_state(gstreamer::State::Playing) + .context("Unable to set the pipeline to the `Playing` state")?; - let stream_processor = async { - bus.stream() - .map::, _>(|msg| { - use gstreamer::MessageView; + let stream_processor = async { + bus.stream() + .map::, _>(|msg| { + use gstreamer::MessageView; - match msg.view() { - // MessageView::Progress() => { + match msg.view() { + // MessageView::Progress() => { - // } - MessageView::Eos(..) => { - // we need to actively stop pulling the stream, that's because stream will - // never end despite yielding an `Eos` message - Ok(false) - } - MessageView::Error(err) => { - pipeline.set_state(gstreamer::State::Null).context( - "Unable to set the pipeline to the `Null` state, after error", - )?; - - let err = err - .get_details() - .and_then(|details| { - if details.get_name() != "error-details" { - return None; - } - - let err = details - .get::<&GBoxErrorWrapper>("error") - .unwrap() - .map(|err| err.clone().into()) - .expect("error-details message without actual error"); - Some(err) - }) - .unwrap_or_else(|| { - GErrorMessage { - src: msg - .get_src() - .map(|s| String::from(s.get_path_string())) - .unwrap_or_else(|| String::from("None")), - error: err.get_error().to_string(), - debug: err.get_debug(), - source: err.get_error(), - } - .into() - }); - Err(err) - } - _ => Ok(true), + // } + MessageView::Eos(..) => { + // we need to actively stop pulling the stream, that's because stream will + // never end despite yielding an `Eos` message + Ok(false) } - }) - .take_while(|e| { - if let Ok(false) = e { - futures::future::ready(false) - } else { - futures::future::ready(true) + MessageView::Error(err) => { + pipeline.set_state(gstreamer::State::Null).context( + "Unable to set the pipeline to the `Null` state, after error", + )?; + + let err = err + .get_details() + .and_then(|details| { + if details.get_name() != "error-details" { + return None; + } + + let err = details + .get::<&GBoxErrorWrapper>("error") + .unwrap() + .map(|err| err.clone().into()) + .expect("error-details message without actual error"); + Some(err) + }) + .unwrap_or_else(|| { + GErrorMessage { + src: msg + .get_src() + .map(|s| String::from(s.get_path_string())) + .unwrap_or_else(|| String::from("None")), + error: err.get_error().to_string(), + debug: err.get_debug(), + source: err.get_error(), + } + .into() + }); + Err(err) } - }) - .try_for_each(|_| futures::future::ready(Ok(()))) - .await - .context("failed converting")?; + _ => Ok(true), + } + }) + .take_while(|e| { + if let Ok(false) = e { + futures::future::ready(false) + } else { + futures::future::ready(true) + } + }) + .try_for_each(|_| futures::future::ready(Ok(()))) + .await + .context("failed converting")?; - Result::<_>::Ok(()) - }; - pin_mut!(stream_processor); + Result::<_>::Ok(()) + }; + pin_mut!(stream_processor); - let mut progress_interval = interval(Duration::from_millis(ui::UPDATE_INTERVAL_MILLIS / 2)); - let progress_processor = async { - use gstreamer::ClockTime; + let mut progress_interval = interval(Duration::from_millis(ui::UPDATE_INTERVAL_MILLIS / 2)); + let progress_processor = async { + use gstreamer::ClockTime; - loop { - progress_interval.tick().await; + loop { + progress_interval.tick().await; - let dur = decodebin - .query_duration::() + let dur = decodebin + .query_duration::() + .and_then(|time| time.nanoseconds()); + + let ratio = dur.and_then(|dur| { + if dur == 0 { + return None; + } + + let pos = decodebin + .query_position::() .and_then(|time| time.nanoseconds()); - let ratio = dur.and_then(|dur| { - if dur == 0 { - return None; - } + pos.map(|pos| { + let ratio = pos as f64 / dur as f64; + ratio.max(0.0).min(1.0) + }) + }); - let pos = decodebin - .query_position::() - .and_then(|time| time.nanoseconds()); - - pos.map(|pos| { - let ratio = pos as f64 / dur as f64; - ratio.max(0.0).min(1.0) - }) - }); - - if let Some(ratio) = ratio { - queue.push(ui::Msg::TaskProgress { id: task_id, ratio }); - } + if let Some(ratio) = ratio { + queue.push(ui::Msg::TaskProgress { id: task_id, ratio }); } + } - #[allow(unreachable_code)] - Result::<_>::Ok(()) - }; - pin_mut!(progress_processor); + #[allow(unreachable_code)] + Result::<_>::Ok(()) + }; + pin_mut!(progress_processor); - future::try_select(stream_processor, progress_processor) - .await - .map_err(|err| err.factor_first().0)?; + future::try_select(stream_processor, progress_processor) + .await + .map_err(|err| err.factor_first().0)?; - pipeline - .set_state(gstreamer::State::Null) - .context("Unable to set the pipeline to the `Null` state")?; + pipeline + .set_state(gstreamer::State::Null) + .context("Unable to set the pipeline to the `Null` state")?; - fs::rename(&to_path_tmp, &to_path).await?; - - Ok(()) - }) - .await + Ok(()) } async fn rm_file_on_err(path: &Path, f: F) -> Result