diff --git a/Cargo.toml b/Cargo.toml index 6317d1b..300ef11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,9 @@ authors = ["Thomas Heck "] edition = "2018" [dependencies] -gstreamer-audio = "0.16" -gstreamer = "0.16" -gstreamer-base = "0.16" +gstreamer-audio = { version = "0.16", features = ["v1_10"] } +gstreamer = { version = "0.16", features = ["v1_10"] } +gstreamer-base = { version = "0.16", features = ["v1_10"] } glib = "0.10" futures = "0.3" num_cpus = "1" @@ -19,3 +19,4 @@ serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.8" regex = "1" globset = "0.4" +derive_more = "0.99" diff --git a/shell.nix b/shell.nix index a35cafa..65289d9 100644 --- a/shell.nix +++ b/shell.nix @@ -17,6 +17,6 @@ in gst_all_1.gst-plugins-base # needed for flac - gst_all_1.gst-plugins-good + gst_all_1.gst-plugins-good ]; } diff --git a/src/main.rs b/src/main.rs index 55c2946..69f2dd6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,52 @@ mod config; use crate::config::Config; -use anyhow::{Context, Result}; +use anyhow::{Context, Error, Result}; use futures::{channel::mpsc, prelude::*}; -use glib::GString; -use gstreamer::Element; -use gstreamer_audio::{prelude::*, AudioEncoder}; +use glib::{subclass::prelude::*, GBoxed, GString}; +use gstreamer::{gst_element_error, prelude::*, Element}; +use gstreamer_audio::AudioEncoder; use gstreamer_base::prelude::*; use std::{ borrow::Cow, - ffi, + error::Error as StdError, + ffi, fmt, path::{Path, PathBuf}, + result::Result as StdResult, + sync::Arc, }; +#[derive(Clone, Debug, GBoxed)] +#[gboxed(type_name = "GBoxErrorWrapper")] +struct GBoxErrorWrapper(Arc); + +impl GBoxErrorWrapper { + fn new(err: Error) -> Self { + GBoxErrorWrapper(Arc::new(err)) + } +} + +impl StdError for GBoxErrorWrapper { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + self.0.source() + } +} + +impl fmt::Display for GBoxErrorWrapper { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> { + self.0.fmt(f) + } +} + +#[derive(Debug, derive_more::Display, derive_more::Error)] +#[display(fmt = "Received error from {}: {} (debug: {:?})", src, error, debug)] +struct GErrorMessage { + src: String, + error: String, + debug: Option, + source: glib::Error, +} + fn gmake>(factory_name: &str) -> Result { let res = gstreamer::ElementFactory::make(factory_name, None) .with_context(|| format!("could not make gstreamer Element \"{}\"", factory_name))? @@ -28,6 +62,7 @@ fn gmake>(factory_name: &str) -> Result { Ok(res) } +#[derive(Debug, Clone)] struct ConvertionArgs { from: PathBuf, to: PathBuf, @@ -121,50 +156,124 @@ async fn transcode(args: &ConvertionArgs) -> Result<()> { // encode into a tmp file first, then rename to actuall file name, that way we're writing // "whole" files to the intended file path, ignoring partial files in the mtime check let tmp_dest = args.to.with_extension("tmp"); - let file_dest: gstreamer_base::BaseSink = gmake("filesink")?; - file_dest.set_property("location", &path_to_gstring(&tmp_dest))?; - file_dest.set_sync(false); - let resample: Element = gmake("audioresample")?; - // quality from 0 to 10 - resample.set_property("quality", &7)?; + let decodebin: Element = gmake("decodebin")?; - let encoder: AudioEncoder = gmake("opusenc")?; - - let config::Transcode::Opus { - bitrate, - bitrate_type, - } = &args.transcode; - encoder.set_property( - "bitrate", - &i32::from(*bitrate) - .checked_mul(1_000) - .context("bitrate overflowed")?, - )?; - encoder.set_property_from_str( - "bitrate-type", - match bitrate_type { - config::OpusBitrateType::Vbr => "1", - config::OpusBitrateType::Cbr => "0", - }, - ); - - let elems: &[&Element] = &[ - file_src.upcast_ref(), - &gmake("flacparse")?, - &gmake("flacdec")?, - &resample, - // `audioconvert` converts audio format, bitdepth, ... - &gmake("audioconvert")?, - encoder.upcast_ref(), - &gmake("oggmux")?, - file_dest.upcast_ref(), - ]; + let src_elems: &[&Element] = &[file_src.upcast_ref(), &decodebin]; let pipeline = gstreamer::Pipeline::new(None); - pipeline.add_many(elems)?; - Element::link_many(elems)?; + pipeline.add_many(src_elems)?; + Element::link_many(src_elems)?; + + // downgrade pipeline RC to a weak RC to break the reference cycle + let pipeline_weak = pipeline.downgrade(); + + let transcode_args = args.transcode.clone(); + + let tmp_dest_clone = tmp_dest.clone(); + + decodebin.connect_pad_added(move |decodebin, src_pad| { + let insert_sink = || -> Result<()> { + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => { + // pipeline already destroyed... ignoring + return Ok(()); + } + }; + + let is_audio = src_pad.get_current_caps().and_then(|caps| { + caps.get_structure(0).map(|s| { + let name = s.get_name(); + name.starts_with("audio/") + }) + }); + match is_audio { + None => { + return Err(Error::msg(format!( + "Failed to get media type from pad {}", + src_pad.get_name() + ))); + } + Some(false) => { + // not audio pad... ignoring + return Ok(()); + } + Some(true) => {} + } + + let resample: Element = gmake("audioresample")?; + // quality from 0 to 10 + resample.set_property("quality", &7)?; + + let mut dest_elems = vec![ + resample, + // `audioconvert` converts audio format, bitdepth, ... + gmake("audioconvert")?, + ]; + + match &transcode_args { + config::Transcode::Opus { + bitrate, + bitrate_type, + } => { + let encoder: AudioEncoder = gmake("opusenc")?; + encoder.set_property( + "bitrate", + &i32::from(*bitrate) + .checked_mul(1_000) + .context("bitrate overflowed")?, + )?; + encoder.set_property_from_str( + "bitrate-type", + match bitrate_type { + config::OpusBitrateType::Vbr => "1", + config::OpusBitrateType::Cbr => "0", + }, + ); + + dest_elems.push(encoder.upcast()); + dest_elems.push(gmake("oggmux")?); + } + }; + + let file_dest: gstreamer_base::BaseSink = gmake("filesink")?; + file_dest.set_property("location", &path_to_gstring(&tmp_dest_clone))?; + file_dest.set_sync(false); + dest_elems.push(file_dest.upcast()); + + let dest_elem_refs: Vec<_> = dest_elems.iter().collect(); + pipeline.add_many(&dest_elem_refs)?; + Element::link_many(&dest_elem_refs)?; + + for e in &dest_elems { + e.sync_state_with_parent()?; + } + + let sink_pad = dest_elems + .get(0) + .unwrap() + .get_static_pad("sink") + .expect("1. dest element has no sinkpad"); + src_pad.link(&sink_pad)?; + + Ok(()) + }; + + if let Err(err) = insert_sink() { + let details = gstreamer::Structure::builder("error-details") + .field("error", &GBoxErrorWrapper::new(err)) + .build(); + + gst_element_error!( + decodebin, + gstreamer::LibraryError::Failed, + ("Failed to insert sink"), + details: details + ); + } + }); let bus = pipeline.get_bus().context("pipe get bus")?; @@ -180,14 +289,48 @@ async fn transcode(args: &ConvertionArgs) -> Result<()> { .context("Unable to set the pipeline to the `Playing` state")?; bus.stream() - .map(|msg| { + .map::, _>(|msg| { use gstreamer::MessageView; match msg.view() { - // we need to actively stop pulling the stream, that's because stream will - // never end despite yielding an `Eos` message - MessageView::Eos(..) => Ok(false), - MessageView::Error(err) => Err(err.get_error()), + MessageView::Eos(..) => { + // we need to actively stop pulling the stream, that's because stream will + // never end despite yielding an `Eos` message + Ok(false) + } + MessageView::Error(err) => { + pipeline.set_state(gstreamer::State::Null).context( + "Unable to set the pipeline to the `Null` state, after error", + )?; + + let err = err + .get_details() + .and_then(|details| { + if details.get_name() != "error-details" { + return None; + } + + let err = details + .get::<&GBoxErrorWrapper>("error") + .unwrap() + .map(|err| err.clone().into()) + .expect("error-details message without actual error"); + Some(err) + }) + .unwrap_or_else(|| { + GErrorMessage { + src: msg + .get_src() + .map(|s| String::from(s.get_path_string())) + .unwrap_or_else(|| String::from("None")), + error: err.get_error().to_string(), + debug: err.get_debug(), + source: err.get_error(), + } + .into() + }); + Err(err) + } _ => Ok(true), } }) @@ -199,7 +342,8 @@ async fn transcode(args: &ConvertionArgs) -> Result<()> { } }) .try_for_each(|_| futures::future::ready(Ok(()))) - .await?; + .await + .context("failed converting")?; pipeline .set_state(gstreamer::State::Null)