From 8f1196aefe8f6215ce1b628b64bad59355996684 Mon Sep 17 00:00:00 2001 From: Thomas Heck Date: Sat, 22 Feb 2020 17:15:13 +0100 Subject: [PATCH] gstreamer rewrite --- Cargo.toml | 12 +- shell.nix | 32 +++-- src/main.rs | 398 ++++++++++++++++++++-------------------------------- 3 files changed, 176 insertions(+), 266 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d8cc3d3..707243c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,15 @@ [package] -name = "audio_conv" +name = "audio-conv" version = "0.0.0" authors = ["Thomas Heck "] edition = "2018" [dependencies] -stainless_ffmpeg = "0.1" -rayon = "1.0.2" +gstreamer-audio = "0.15" +gstreamer = "0.15" +gstreamer-base = "0.15" +glib = "0.9" +futures = "0.3" +num_cpus = "1" +walkdir = "2" +libc = "0.2" diff --git a/shell.nix b/shell.nix index 6e1dd70..9eb3be2 100644 --- a/shell.nix +++ b/shell.nix @@ -1,15 +1,17 @@ -with import {}; - -stdenv.mkDerivation { - name = "audio_conv"; - buildInputs = [ - stdenv - pkg-config - ffmpeg_4 - clang - cargo - rustc - rls - ]; - LIBCLANG_PATH="${llvmPackages.libclang}/lib"; -} +let + moz_overlay = import (builtins.fetchTarball https://github.com/mozilla/nixpkgs-mozilla/archive/master.tar.gz); + nixpkgs = import { overlays = [ moz_overlay ]; }; + rustpkgs = nixpkgs.rustChannels.stable; +in + with nixpkgs; + stdenv.mkDerivation { + name = "audio-conv"; + buildInputs = [ + rustpkgs.rust + rustpkgs.cargo + rustpkgs.rls-preview + rustpkgs.rustfmt-preview + gst_all_1.gstreamer + gst_all_1.gst-plugins-base + ]; + } diff --git a/src/main.rs b/src/main.rs index 742bc8b..b2e7a6b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,277 +1,179 @@ -use std::env; -use std::path::Path; -use std::{fs, io}; - -use stainless_ffmpeg::{self as ffmpeg, codec, filter, format, frame, media}; -use rayon::prelude::*; +use futures::prelude::*; +use glib::error::{BoolError as GBoolError, Error as GError}; +use glib::translate::ToGlibPtr; +use gstreamer::Element; +use gstreamer_audio::{prelude::*, AudioDecoder, AudioEncoder}; +use gstreamer_base::prelude::*; +use std::borrow::Cow; +use std::path::{Path, PathBuf}; #[derive(Debug)] enum Error { - Io(io::Error), - Ffmpeg(ffmpeg::Error), - String(String), - Str(&'static str), -} - -impl From for Error { - fn from(v: ffmpeg::Error) -> Error { - Error::Ffmpeg(v) - } -} - -impl From for Error { - fn from(v: io::Error) -> Error { - Error::Io(v) - } + Str(Cow<'static, str>), + GBoolError(GBoolError), + GError(GError), } impl From for Error { - fn from(v: String) -> Error { - Error::String(v) + fn from(err: String) -> Error { + Error::Str(err.into()) } } impl From<&'static str> for Error { - fn from(v: &'static str) -> Error { - Error::Str(v) + fn from(err: &'static str) -> Error { + Error::Str(err.into()) } } -fn filter( - decoder: &codec::decoder::Audio, - encoder: &codec::encoder::Audio, -) -> Result { - let mut filter = filter::Graph::new(); - - let args = format!( - "time_base={}:sample_rate={}:sample_fmt={}:channel_layout=0x{:x}", - decoder.time_base(), - decoder.rate(), - decoder.format().name(), - decoder.channel_layout().bits() - ); - - filter.add(&filter::find("abuffer").unwrap(), "in", &args)?; - filter.add(&filter::find("abuffersink").unwrap(), "out", "")?; - - { - let mut out = filter.get("out").unwrap(); - - out.set_sample_format(encoder.format()); - out.set_channel_layout(encoder.channel_layout()); - out.set_sample_rate(encoder.rate()); +impl From for Error { + fn from(err: GBoolError) -> Error { + Error::GBoolError(err) } - - filter.output("in", 0)?.input("out", 0)?.parse("anull")?; - filter.validate()?; - - if let Some(codec) = encoder.codec() { - if !codec - .capabilities() - .contains(ffmpeg::codec::capabilities::VARIABLE_FRAME_SIZE) - { - filter - .get("out") - .unwrap() - .sink() - .set_frame_size(encoder.frame_size()); - } - } - - Ok(filter) } -struct Transcoder { - stream: usize, - filter: filter::Graph, - decoder: codec::decoder::Audio, - encoder: codec::encoder::Audio, +impl From for Error { + fn from(err: GError) -> Error { + Error::GError(err) + } } -fn transcoder( - ictx: &mut format::context::Input, - octx: &mut format::context::Output, -) -> Result { - let input = ictx - .streams() - .best(media::Type::Audio) - .expect("could not find best audio stream"); - let mut decoder = input.codec().decoder().audio()?; - - let codec = ffmpeg::encoder::find(octx.format().codec(&"", media::Type::Audio)) - .expect("failed to find encoder") - .audio()?; - let global = octx - .format() - .flags() - .contains(ffmpeg::format::flag::GLOBAL_HEADER); - - decoder.set_parameters(input.parameters())?; - - let mut output = octx.add_stream(codec)?; - let mut encoder = output.codec().encoder().audio()?; - - let channel_layout = codec - .channel_layouts() - .map(|cls| cls.best(decoder.channel_layout().channels())) - .unwrap_or(ffmpeg::channel_layout::STEREO); - - if global { - encoder.set_flags(ffmpeg::codec::flag::GLOBAL_HEADER); - } - - encoder.set_rate(48_000); //decoder.rate() as i32); - encoder.set_channel_layout(channel_layout); - encoder.set_channels(channel_layout.channels()); - encoder.set_format( - codec - .formats() - .expect("unknown supported formats") - .next() - .unwrap(), - ); - - encoder.set_time_base((1, 48_000)); - output.set_time_base((1, 48_000)); - - let mut encode_dict = ffmpeg::Dictionary::new(); - encode_dict.set("vbr", "on"); - encoder.set_bit_rate(192_000); - let encoder = encoder.open_as_with(codec, encode_dict)?; - output.set_parameters(&encoder); - - let filter = filter(&decoder, &encoder)?; - - Ok(Transcoder { - stream: input.index(), - filter, - decoder, - encoder, - }) +fn gmake>(factory_name: &str) -> Result { + let res = gstreamer::ElementFactory::make(factory_name, None) + // TODO: passthrough err source + .map_err(|_| format!("could not make \"{}\"", factory_name))? + .downcast() + .map_err(|_| { + format!( + "could not cast \"{}\" into `{}`", + factory_name, + std::any::type_name::() + ) + })?; + Ok(res) } -fn transcode(input: &Path, output: &Path) -> Result<(), Error> { - println!("{:?}", output); - - let mut ictx = format::input(&input)?; - let original_extension = output - .extension() - .expect("file without extension") - .to_string_lossy(); - let output_tmp = output.with_extension("tmp"); - let mut octx = format::output_as(&output_tmp, &original_extension)?; - let mut transcoder = transcoder(&mut ictx, &mut octx)?; - - octx.set_metadata(ictx.metadata().to_owned()); - octx.write_header()?; - - let in_time_base = transcoder.decoder.time_base(); - - let mut frame = frame::Audio::empty(); - let mut encoded = ffmpeg::Packet::empty(); - - for (stream, mut packet) in ictx.packets() { - if stream.index() != transcoder.stream { - continue; - } - - packet.rescale_ts(stream.time_base(), in_time_base); - - if let Ok(true) = transcoder.decoder.decode(&packet, &mut frame) { - transcoder.filter.get("in").unwrap().source().add(&frame)?; - - while let Ok(..) = transcoder - .filter - .get("out") - .unwrap() - .sink() - .frame(&mut frame) - { - if let Ok(true) = transcoder.encoder.encode(&frame, &mut encoded) { - encoded.set_stream(0); - encoded.write_interleaved(&mut octx)?; - } - unsafe { - ffmpeg::ffi::av_frame_unref(frame.as_mut_ptr()); - } - } - } - } - - transcoder.filter.get("in").unwrap().source().flush()?; - - while let Ok(..) = transcoder - .filter - .get("out") - .unwrap() - .sink() - .frame(&mut frame) - { - if let Ok(true) = transcoder.encoder.encode(&frame, &mut encoded) { - encoded.set_stream(0); - encoded.write_interleaved(&mut octx)?; - } - } - - if let Ok(true) = transcoder.encoder.flush(&mut encoded) { - encoded.set_stream(0); - encoded.write_interleaved(&mut octx)?; - } - - octx.write_trailer()?; - - fs::rename(output_tmp, output)?; - - Ok(()) -} - -fn transcode_path(input: &Path, output: &Path) -> Result<(), Error> { - input.read_dir()?.par_bridge().try_for_each(|entry| { - let entry = entry?; - let file_type = entry.file_type()?; - let new_input = input.join(entry.file_name()); - let mut new_output = output.join(entry.file_name()); - if file_type.is_dir() { - transcode_path(new_input.as_ref(), new_output.as_ref())?; - } else if file_type.is_file() { - if entry.path().extension().unwrap() != "flac" { - // println!("not flac input: {:?}", entry.path()); - return Ok(()); - } - fs::create_dir_all(&output)?; - new_output.set_extension("opus"); - let in_mtime = new_input.metadata()?.modified()?; - let out_mtime = new_output.metadata().and_then(|md| md.modified()); +fn get_paths(input: PathBuf, output: PathBuf) -> impl Iterator { + walkdir::WalkDir::new(input.as_path()) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| { + e.path() + .extension() + .map(|ext| ext == "flac") + .unwrap_or(false) + }) + .map(move |e| { + let mut out = output.join(e.path().strip_prefix(&input).unwrap()); + out.set_extension("opus"); + (e, out) + }) + .filter(|(e, out)| { + let in_mtime = e.metadata().unwrap().modified().unwrap(); + let out_mtime = out.metadata().and_then(|md| md.modified()); match out_mtime { - Ok(out_mtime) => { - if out_mtime < in_mtime { - transcode(new_input.as_ref(), new_output.as_ref())?; - } - } - Err(e) => { - if e.kind() == io::ErrorKind::NotFound { - transcode(new_input.as_ref(), new_output.as_ref())?; - } else { - return Err(e.into()); - } - } + Ok(out_mtime) => out_mtime < in_mtime, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => true, + Err(err) => panic!(err), } - } else { - Err(format!( - "Unsupported file type `{:?}` (maybe symlink?)", - new_input - ))?; - } - Ok(()) - }) + }) + .map(|(e, out)| (e.into_path(), out)) + .inspect(|(_, out)| std::fs::create_dir_all(out.parent().unwrap()).unwrap()) } fn main() -> Result<(), Error> { - ffmpeg::init()?; + gstreamer::init().unwrap(); + let ctx = glib::MainContext::default(); + ctx.push_thread_default(); + let glib_loop = glib::MainLoop::new(Some(&ctx), false); - let input = env::args().nth(1).expect("missing input"); - let output = env::args().nth(2).expect("missing output"); - transcode_path(input.as_ref(), output.as_ref())?; + let input = std::env::args().nth(1).expect("missing input"); + let output = std::env::args().nth(2).expect("missing output"); + + let it = get_paths(input.into(), output.into()); + + let f = + futures::stream::iter(it).for_each_concurrent(num_cpus::get(), |(src, dest)| async move { + if let Err(err) = transcode(src.as_path(), dest.as_path()).await { + println!( + "err \"{}\" => \"{}\": {:?}", + src.to_string_lossy(), + dest.to_string_lossy(), + err + ); + } + }); + + ctx.spawn_local(f); + glib_loop.run(); + ctx.pop_thread_default(); + Ok(()) +} + +async fn transcode(src: &Path, dest: &Path) -> Result<(), Error> { + let file_src: gstreamer_base::BaseSrc = gmake("filesrc")?; + let src_cstring = ToGlibPtr::<*const libc::c_char>::to_glib_none(src).1; + let src_gstring = glib::GString::ForeignOwned(Some(src_cstring)); + file_src.set_property("location", &src_gstring)?; + + let file_dest: gstreamer_base::BaseSink = gmake("filesink")?; + let dest_cstring = ToGlibPtr::<*const libc::c_char>::to_glib_none(dest).1; + let dest_gstring = glib::GString::ForeignOwned(Some(dest_cstring)); + file_dest.set_property("location", &dest_gstring)?; + file_dest.set_sync(false); + + let parse: Element = gmake("flacparse")?; + let dec: AudioDecoder = gmake("flacdec")?; + let encoder: AudioEncoder = gmake("opusenc")?; + encoder.set_property("bitrate", &160_000)?; + // 0 = cbr; 1 = vbr + encoder.set_property_from_str("bitrate-type", "1"); + + let elems: &[&Element] = &[ + file_src.upcast_ref(), + &parse, + dec.upcast_ref(), + &gmake("audioresample")?, + encoder.upcast_ref(), + &gmake("oggmux")?, + file_dest.upcast_ref(), + ]; + + let pipeline = gstreamer::Pipeline::new(None); + pipeline.add_many(elems)?; + + Element::link_many(elems)?; + + let bus = pipeline.get_bus().ok_or("pipe get bus")?; + + pipeline + .set_state(gstreamer::State::Playing) + .map_err(|_| "Unable to set the pipeline to the `Playing` state")?; + + gstreamer::BusStream::new(&bus) + .map(|msg| { + use gstreamer::MessageView; + + match msg.view() { + MessageView::Eos(..) => Ok(false), + MessageView::Error(err) => Err(err.get_error()), + _ => Ok(true), + } + }) + .take_while(|e| { + if let Ok(true) = e { + futures::future::ready(true) + } else { + futures::future::ready(false) + } + }) + .try_for_each(|_| futures::future::ready(Ok(()))) + .await?; + + pipeline + .set_state(gstreamer::State::Null) + .map_err(|_| "Unable to set the pipeline to the `Null` state")?; Ok(()) }