refactor: move actual transcoding into own fn

This commit is contained in:
2021-04-17 01:53:25 +02:00
parent d073ef10b5
commit f2bfddd76e

View File

@@ -1,7 +1,7 @@
mod config; mod config;
mod ui; mod ui;
use crate::config::Config; use crate::config::{Config, Transcode};
use anyhow::{Context, Error, Result}; use anyhow::{Context, Error, Result};
use futures::{pin_mut, prelude::*}; use futures::{pin_mut, prelude::*};
use glib::{subclass::prelude::*, GBoxed, GString}; use glib::{subclass::prelude::*, GBoxed, GString};
@@ -68,7 +68,7 @@ fn gmake<T: IsA<Element>>(factory_name: &str) -> Result<T> {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ConversionArgs { pub struct ConversionArgs {
rel_from_path: PathBuf, rel_from_path: PathBuf,
transcode: config::Transcode, transcode: Transcode,
} }
fn get_conversion_args(config: &Config) -> impl Iterator<Item = Result<ConversionArgs>> + '_ { fn get_conversion_args(config: &Config) -> impl Iterator<Item = Result<ConversionArgs>> + '_ {
@@ -276,25 +276,49 @@ async fn transcode(
// "whole" files to the intended file path, ignoring partial files in the mtime check // "whole" files to the intended file path, ignoring partial files in the mtime check
let to_path_tmp = to_path.with_extension("tmp"); let to_path_tmp = to_path.with_extension("tmp");
if let config::Transcode::Copy = args.transcode { rm_file_on_err(&to_path_tmp, async {
rm_file_on_err(&to_path_tmp, async { match args.transcode {
fs::copy(&from_path, &to_path_tmp).await.with_context(|| { Transcode::Copy => {
format!( fs::copy(&from_path, &to_path_tmp).await.with_context(|| {
"could not copy file from {} to {}", format!(
from_path.display(), "could not copy file from {} to {}",
to_path_tmp.display() from_path.display(),
to_path_tmp.display()
)
})?;
}
_ => {
to_path.set_extension(args.transcode.extension());
transcode_gstreamer(
&from_path,
&to_path_tmp,
args.transcode.clone(),
task_id,
queue,
) )
}) .await?
}
}
fs::rename(&to_path_tmp, &to_path).await.with_context(|| {
format!(
"could not rename temporary file {} to {}",
to_path_tmp.display(),
to_path.display()
)
}) })
.await?; })
.await
fs::rename(&to_path_tmp, &to_path).await?; }
return Ok(());
}
to_path.set_extension(args.transcode.extension());
async fn transcode_gstreamer(
from_path: &Path,
to_path: &Path,
transcode: Transcode,
task_id: usize,
queue: &ui::MsgQueue,
) -> Result<()> {
let file_src: Element = gmake("filesrc")?; let file_src: Element = gmake("filesrc")?;
file_src.set_property("location", &path_to_gstring(&from_path))?; file_src.set_property("location", &path_to_gstring(&from_path))?;
@@ -310,10 +334,7 @@ async fn transcode(
// downgrade pipeline RC to a weak RC to break the reference cycle // downgrade pipeline RC to a weak RC to break the reference cycle
let pipeline_weak = pipeline.downgrade(); let pipeline_weak = pipeline.downgrade();
let transcode_args = args.transcode.clone(); let to_path_clone = to_path.to_owned();
let to_path_tmp_clone = to_path_tmp.clone();
decodebin.connect_pad_added(move |decodebin, src_pad| { decodebin.connect_pad_added(move |decodebin, src_pad| {
let insert_sink = || -> Result<()> { let insert_sink = || -> Result<()> {
let pipeline = match pipeline_weak.upgrade() { let pipeline = match pipeline_weak.upgrade() {
@@ -354,8 +375,8 @@ async fn transcode(
gmake("audioconvert")?, gmake("audioconvert")?,
]; ];
match &transcode_args { match &transcode {
config::Transcode::Opus { Transcode::Opus {
bitrate, bitrate,
bitrate_type, bitrate_type,
} => { } => {
@@ -378,13 +399,13 @@ async fn transcode(
dest_elems.push(gmake("oggmux")?); dest_elems.push(gmake("oggmux")?);
} }
config::Transcode::Flac { compression } => { Transcode::Flac { compression } => {
let encoder: Element = gmake("flacenc")?; let encoder: Element = gmake("flacenc")?;
encoder.set_property_from_str("quality", &compression.to_string()); encoder.set_property_from_str("quality", &compression.to_string());
dest_elems.push(encoder); dest_elems.push(encoder);
} }
config::Transcode::Mp3 { Transcode::Mp3 {
bitrate, bitrate,
bitrate_type, bitrate_type,
} => { } => {
@@ -404,11 +425,14 @@ async fn transcode(
dest_elems.push(gmake("id3v2mux")?); dest_elems.push(gmake("id3v2mux")?);
} }
config::Transcode::Copy => unreachable!(), Transcode::Copy => {
// already handled outside this fn
unreachable!();
}
}; };
let file_dest: gstreamer_base::BaseSink = gmake("filesink")?; let file_dest: gstreamer_base::BaseSink = gmake("filesink")?;
file_dest.set_property("location", &path_to_gstring(&to_path_tmp_clone))?; file_dest.set_property("location", &path_to_gstring(&to_path_clone))?;
file_dest.set_sync(false); file_dest.set_sync(false);
dest_elems.push(file_dest.upcast()); dest_elems.push(file_dest.upcast());
@@ -446,125 +470,120 @@ async fn transcode(
let bus = pipeline.get_bus().context("pipe get bus")?; let bus = pipeline.get_bus().context("pipe get bus")?;
rm_file_on_err(&to_path_tmp, async { pipeline
pipeline .set_state(gstreamer::State::Playing)
.set_state(gstreamer::State::Playing) .context("Unable to set the pipeline to the `Playing` state")?;
.context("Unable to set the pipeline to the `Playing` state")?;
let stream_processor = async { let stream_processor = async {
bus.stream() bus.stream()
.map::<Result<bool>, _>(|msg| { .map::<Result<bool>, _>(|msg| {
use gstreamer::MessageView; use gstreamer::MessageView;
match msg.view() { match msg.view() {
// MessageView::Progress() => { // MessageView::Progress() => {
// } // }
MessageView::Eos(..) => { MessageView::Eos(..) => {
// we need to actively stop pulling the stream, that's because stream will // we need to actively stop pulling the stream, that's because stream will
// never end despite yielding an `Eos` message // never end despite yielding an `Eos` message
Ok(false) Ok(false)
}
MessageView::Error(err) => {
pipeline.set_state(gstreamer::State::Null).context(
"Unable to set the pipeline to the `Null` state, after error",
)?;
let err = err
.get_details()
.and_then(|details| {
if details.get_name() != "error-details" {
return None;
}
let err = details
.get::<&GBoxErrorWrapper>("error")
.unwrap()
.map(|err| err.clone().into())
.expect("error-details message without actual error");
Some(err)
})
.unwrap_or_else(|| {
GErrorMessage {
src: msg
.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.get_error().to_string(),
debug: err.get_debug(),
source: err.get_error(),
}
.into()
});
Err(err)
}
_ => Ok(true),
} }
}) MessageView::Error(err) => {
.take_while(|e| { pipeline.set_state(gstreamer::State::Null).context(
if let Ok(false) = e { "Unable to set the pipeline to the `Null` state, after error",
futures::future::ready(false) )?;
} else {
futures::future::ready(true) let err = err
.get_details()
.and_then(|details| {
if details.get_name() != "error-details" {
return None;
}
let err = details
.get::<&GBoxErrorWrapper>("error")
.unwrap()
.map(|err| err.clone().into())
.expect("error-details message without actual error");
Some(err)
})
.unwrap_or_else(|| {
GErrorMessage {
src: msg
.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.get_error().to_string(),
debug: err.get_debug(),
source: err.get_error(),
}
.into()
});
Err(err)
} }
}) _ => Ok(true),
.try_for_each(|_| futures::future::ready(Ok(()))) }
.await })
.context("failed converting")?; .take_while(|e| {
if let Ok(false) = e {
futures::future::ready(false)
} else {
futures::future::ready(true)
}
})
.try_for_each(|_| futures::future::ready(Ok(())))
.await
.context("failed converting")?;
Result::<_>::Ok(()) Result::<_>::Ok(())
}; };
pin_mut!(stream_processor); pin_mut!(stream_processor);
let mut progress_interval = interval(Duration::from_millis(ui::UPDATE_INTERVAL_MILLIS / 2)); let mut progress_interval = interval(Duration::from_millis(ui::UPDATE_INTERVAL_MILLIS / 2));
let progress_processor = async { let progress_processor = async {
use gstreamer::ClockTime; use gstreamer::ClockTime;
loop { loop {
progress_interval.tick().await; progress_interval.tick().await;
let dur = decodebin let dur = decodebin
.query_duration::<ClockTime>() .query_duration::<ClockTime>()
.and_then(|time| time.nanoseconds());
let ratio = dur.and_then(|dur| {
if dur == 0 {
return None;
}
let pos = decodebin
.query_position::<ClockTime>()
.and_then(|time| time.nanoseconds()); .and_then(|time| time.nanoseconds());
let ratio = dur.and_then(|dur| { pos.map(|pos| {
if dur == 0 { let ratio = pos as f64 / dur as f64;
return None; ratio.max(0.0).min(1.0)
} })
});
let pos = decodebin if let Some(ratio) = ratio {
.query_position::<ClockTime>() queue.push(ui::Msg::TaskProgress { id: task_id, ratio });
.and_then(|time| time.nanoseconds());
pos.map(|pos| {
let ratio = pos as f64 / dur as f64;
ratio.max(0.0).min(1.0)
})
});
if let Some(ratio) = ratio {
queue.push(ui::Msg::TaskProgress { id: task_id, ratio });
}
} }
}
#[allow(unreachable_code)] #[allow(unreachable_code)]
Result::<_>::Ok(()) Result::<_>::Ok(())
}; };
pin_mut!(progress_processor); pin_mut!(progress_processor);
future::try_select(stream_processor, progress_processor) future::try_select(stream_processor, progress_processor)
.await .await
.map_err(|err| err.factor_first().0)?; .map_err(|err| err.factor_first().0)?;
pipeline pipeline
.set_state(gstreamer::State::Null) .set_state(gstreamer::State::Null)
.context("Unable to set the pipeline to the `Null` state")?; .context("Unable to set the pipeline to the `Null` state")?;
fs::rename(&to_path_tmp, &to_path).await?; Ok(())
Ok(())
})
.await
} }
async fn rm_file_on_err<F, T>(path: &Path, f: F) -> Result<T> async fn rm_file_on_err<F, T>(path: &Path, f: F) -> Result<T>