diff --git a/src/main.rs b/src/main.rs index c9dc2bb..d4f717c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,11 +2,11 @@ use futures::prelude::*; use glib::error::{BoolError as GBoolError, Error as GError}; use glib::translate::ToGlibPtr; use gstreamer::Element; -use gstreamer_audio::{prelude::*, AudioDecoder, AudioEncoder}; +use gstreamer_audio::{prelude::*, AudioEncoder}; use gstreamer_base::prelude::*; use std::borrow::Cow; -use std::path::{Path, PathBuf}; use std::io::Error as StdIoError; +use std::path::{Path, PathBuf}; #[derive(Debug)] enum Error { @@ -87,11 +87,10 @@ fn get_paths(input: PathBuf, output: PathBuf) -> impl Iterator Result<(), Error> { - gstreamer::init().unwrap(); + gstreamer::init()?; let ctx = glib::MainContext::default(); ctx.push_thread_default(); let glib_loop = glib::MainLoop::new(Some(&ctx), false); @@ -101,8 +100,9 @@ fn main() -> Result<(), Error> { let it = get_paths(input.into(), output.into()); - let f = - futures::stream::iter(it).for_each_concurrent(num_cpus::get(), |(src, dest)| async move { + let glib_loop_clone = glib_loop.clone(); + let f = futures::stream::iter(it) + .for_each_concurrent(num_cpus::get(), |(src, dest)| async move { if let Err(err) = transcode(src.as_path(), dest.as_path()).await { println!( "err \"{}\" => \"{}\": {:?}", @@ -111,8 +111,12 @@ fn main() -> Result<(), Error> { err ); } + }) + .then(move |_| { + // we're done, kill the loop + glib_loop_clone.quit(); + futures::future::ready(()) }); - ctx.spawn_local(f); glib_loop.run(); ctx.pop_thread_default(); @@ -125,17 +129,15 @@ async fn transcode(src: &Path, dest: &Path) -> Result<(), Error> { let src_gstring = glib::GString::ForeignOwned(Some(src_cstring)); file_src.set_property("location", &src_gstring)?; - let original_dest = dest; - let dest = dest.with_extension("tmp"); - + // encode into a tmp file first, then rename to actuall file name, that way we're writing + // "whole" files to the intended file path, ignoring partial files in the mtime check + let tmp_dest = dest.with_extension("tmp"); let file_dest: gstreamer_base::BaseSink = gmake("filesink")?; - let dest_cstring = ToGlibPtr::<*const libc::c_char>::to_glib_none(&dest).1; - let dest_gstring = glib::GString::ForeignOwned(Some(dest_cstring)); - file_dest.set_property("location", &dest_gstring)?; + let tmp_dest_cstring = ToGlibPtr::<*const libc::c_char>::to_glib_none(&tmp_dest).1; + let tmp_dest_gstring = glib::GString::ForeignOwned(Some(tmp_dest_cstring)); + file_dest.set_property("location", &tmp_dest_gstring)?; file_dest.set_sync(false); - let parse: Element = gmake("flacparse")?; - let dec: AudioDecoder = gmake("flacdec")?; let encoder: AudioEncoder = gmake("opusenc")?; encoder.set_property("bitrate", &160_000)?; // 0 = cbr; 1 = vbr @@ -143,8 +145,8 @@ async fn transcode(src: &Path, dest: &Path) -> Result<(), Error> { let elems: &[&Element] = &[ file_src.upcast_ref(), - &parse, - dec.upcast_ref(), + &gmake("flacparse")?, + &gmake("flacdec")?, &gmake("audioresample")?, encoder.upcast_ref(), &gmake("oggmux")?, @@ -158,6 +160,11 @@ async fn transcode(src: &Path, dest: &Path) -> Result<(), Error> { let bus = pipeline.get_bus().ok_or("pipe get bus")?; + std::fs::create_dir_all( + dest.parent() + .ok_or_else(|| format!("could not get parent dir for {}", dest.to_string_lossy()))?, + )?; + pipeline .set_state(gstreamer::State::Playing) .map_err(|_| "Unable to set the pipeline to the `Playing` state")?; @@ -186,7 +193,7 @@ async fn transcode(src: &Path, dest: &Path) -> Result<(), Error> { .set_state(gstreamer::State::Null) .map_err(|_| "Unable to set the pipeline to the `Null` state")?; - std::fs::rename(dest, original_dest)?; + std::fs::rename(tmp_dest, dest)?; Ok(()) }