35 Commits
v1.0.0 ... wip

Author SHA1 Message Date
56fde73a40 WIP 2021-10-27 11:05:49 +02:00
3108aca6ba build: change editorconfig to use 2 space indentation for yaml files 2021-09-16 21:51:44 +02:00
09459615b8 build: update cargo deps 2021-09-16 21:44:57 +02:00
6872e7897b build: bump version (v1.2.2) 2021-08-25 17:11:23 +02:00
2d1497cb36 build: update deps 2021-08-25 17:09:33 +02:00
f1fb3506b5 build: update deps 2021-07-25 14:37:08 +02:00
f4050fe645 refactor: switch from space to tab indentation 2021-07-10 10:07:30 +02:00
b533f059d7 build: bump version (v1.2.1) 2021-07-05 20:02:49 +02:00
00a25e168d refactor: use more readable clamp method 2021-07-05 19:53:49 +02:00
b51c9939c1 build: upgrade deps 2021-07-05 19:46:06 +02:00
c22d45818e build: update deps 2021-06-18 11:46:48 +02:00
18cc852e6b build: update deps 2021-05-27 22:52:48 +02:00
65b4f398d9 build: bump version (v1.2.0) 2021-04-22 21:20:51 +02:00
7f40cb0581 build: update nix deps 2021-04-22 21:20:51 +02:00
bc15a4449d build: update cargo deps 2021-04-22 21:20:51 +02:00
1cf7cec8bd feat: add "jobs" cli argument 2021-04-22 21:20:51 +02:00
5cf98b3c17 feat: improve error messages 2021-04-21 00:57:23 +02:00
54e174eb0a build: include "README.md" & "CHANGELOG.md" to crate package 2021-04-21 00:26:28 +02:00
803860cce5 doc: add changelog 2021-04-21 00:26:21 +02:00
f2bfddd76e refactor: move actual transcoding into own fn 2021-04-17 01:53:25 +02:00
d073ef10b5 feat: add "copy" codec 2021-04-17 01:14:34 +02:00
399c4b8a2c build: update cargo deps 2021-04-17 00:21:47 +02:00
3188d074b7 build: update nix deps 2021-04-17 00:20:44 +02:00
5242aac566 bump version (v1.1.0) 2021-02-10 23:22:40 +01:00
49003c9983 add readme 2021-02-10 23:21:57 +01:00
530446bcd6 add flac dest format to example config 2021-02-10 23:18:24 +01:00
aa65b30873 update cargo deps 2021-02-10 22:22:06 +01:00
f1a71189c8 flake.lock: Update
Flake input changes:

* Updated 'nixpkgs': 'github:NixOS/nixpkgs/b881f100f65eacbbafdf65cff78c8d4104a1d04a' -> 'github:NixOS/nixpkgs/26f6af373ec1b867d751b56fb802f14010c8351b'
2021-02-10 22:15:31 +01:00
cc2ac9cb37 update nix deps 2021-02-02 13:27:15 +01:00
511f5d1237 update cargo deps 2021-02-02 13:26:33 +01:00
ff06358268 set resampling qulity to highest "10" 2021-02-02 13:22:14 +01:00
183e34c217 add flac encoding format 2021-01-15 21:18:27 +01:00
755f5dbd1d update nix deps 2020-12-31 01:07:44 +01:00
762064efd6 clean up 2020-12-29 21:31:33 +01:00
9959a26e48 only include needed src files in cargo pkg/publish 2020-12-28 18:46:47 +01:00
11 changed files with 1471 additions and 1172 deletions

View File

@@ -1,8 +1,12 @@
root = true
[*]
indent_style = space
indent_style = tab
indent_size = 4
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
[*.yaml]
indent_style = space
indent_size = 2

1
.rustfmt.toml Normal file
View File

@@ -0,0 +1 @@
hard_tabs = true

19
CHANGELOG.md Normal file
View File

@@ -0,0 +1,19 @@
# Changelog
## v1.2.2
* dependencies upgraded
## v1.2.1
* dependencies upgraded
## v1.2.0
* "copy" encoding format added
* "jobs" cli argument added, that lets you set the number of concurrent transcodes
## v1.1.0
* "flac" encoding format added
* resampling quality set to highest/"10"

494
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "audio-conv"
version = "1.0.0"
version = "1.2.2"
edition = "2018"
description = "Copies directory structure and converts audio files in it"
authors = ["Thomas Heck <t@b128.net>"]
@@ -13,12 +13,18 @@ categories = [
"multimedia::encoding",
]
keywords = ["audio", "conversion", "opus", "flac"]
include = [
"/src/**/*",
"/example.audio-conv.yaml",
"/README.md",
"/CHANGELOG.md",
]
[dependencies]
gstreamer-audio = { version = "0.16", features = ["v1_10"] }
gstreamer = { version = "0.16", features = ["v1_10"] }
gstreamer-base = { version = "0.16", features = ["v1_10"] }
glib = "0.10"
gstreamer-audio = { version = "0.17", features = ["v1_10"] }
gstreamer = { version = "0.17", features = ["v1_10"] }
gstreamer-base = { version = "0.17", features = ["v1_10"] }
glib = "0.14"
futures = "0.3"
num_cpus = "1"
walkdir = "2"
@@ -30,7 +36,7 @@ serde_yaml = "0.8"
regex = "1"
globset = "0.4"
derive_more = "0.99"
tui = { version = "0.13", default-features = false, features = ["crossterm"] }
tui = { version = "0.16", default-features = false, features = ["crossterm"] }
[dependencies.tokio]
version = "1"

34
README.md Normal file
View File

@@ -0,0 +1,34 @@
# audio-conv
Takes two paths, all audio files encountered in the first path are transcoded and stored in the
second path. The directory structure of the first path is also copied to the second path.
## Dependencies
Requires *gstreamer* version 1.10 or higher with the *base* plugin.
The supported source audio formats (or even other media that is able to contain audio) depend on
the installed *gstreamer* plugins.
## Installation via nix flakes
*audio-conv* can be easily installed via *nix flakes*:
```bash
$ nix profile install gitlab:chpio/audio-conv/release
```
## Generate example config
*audio-conv* is able to write an example config to your current directory:
```bash
$ audio-conv init
```
Now you need to edit the generated *audio-conv.yaml* file. And let it convert your audio files
by running it:
```bash
$ audio-conv
```

View File

@@ -19,3 +19,18 @@ matches:
# # one of: 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256 or 320
# bitrate: 256
# bitrate_type: vbr # or cbr
# for flac:
# to:
# codec: flac
# # effort spent for the compression. 0 (fastest compression) to 9 (highest compression)
# compression: 8
# copies the whole file without transcoding it or extracting audio from it. Using Copy on Write
# if supported by the filesystem.
# to:
# codec: copy
# extracts the audio without transcoding it
# to:
# codec: copyaudio

12
flake.lock generated
View File

@@ -2,11 +2,11 @@
"nodes": {
"flake-utils": {
"locked": {
"lastModified": 1605370193,
"narHash": "sha256-YyMTf3URDL/otKdKgtoMChu4vfVL3vCMkRqpGifhUn0=",
"lastModified": 1629481132,
"narHash": "sha256-JHgasjPR0/J1J3DRm4KxM4zTyAj4IOJY8vIl75v/kPI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "5021eac20303a61fafe17224c087f5519baed54d",
"rev": "997f7efcb746a9c140ce1f13c72263189225f482",
"type": "github"
},
"original": {
@@ -32,11 +32,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1608633860,
"narHash": "sha256-AGJfdJCR5jfIt8PqGiENXRqhthrS3Gxy8Wzb3Z2GsS4=",
"lastModified": 1629897889,
"narHash": "sha256-YoY/umk+NUtLFJgvTJkup6nLJb+sGEZ21hrupKTp7EI=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "da1b28ab8f361fbe14dc539cd69ce1bfd015fd68",
"rev": "6248814b6892af7dc0cf973b49690fd102088e02",
"type": "github"
},
"original": {

View File

@@ -12,6 +12,7 @@ pub struct Config {
pub from: PathBuf,
pub to: PathBuf,
pub matches: Vec<TranscodeMatch>,
pub jobs: Option<usize>,
}
#[derive(Debug)]
@@ -32,6 +33,12 @@ pub enum Transcode {
bitrate_type: BitrateType,
},
#[serde(rename = "flac")]
Flac {
#[serde(default = "default_flac_compression")]
compression: u8,
},
#[serde(rename = "mp3")]
Mp3 {
#[serde(default = "default_mp3_bitrate")]
@@ -40,13 +47,22 @@ pub enum Transcode {
#[serde(default = "bitrate_type_vbr")]
bitrate_type: BitrateType,
},
#[serde(rename = "copy")]
Copy,
#[serde(rename = "copyaudio")]
CopyAudio,
}
impl Transcode {
pub fn extension(&self) -> &'static str {
match self {
Transcode::Opus { .. } => "opus",
Transcode::Flac { .. } => "flac",
Transcode::Mp3 { .. } => "mp3",
Transcode::Copy => "",
Transcode::CopyAudio => "",
}
}
}
@@ -55,6 +71,10 @@ fn default_opus_bitrate() -> u16 {
160
}
fn default_flac_compression() -> u8 {
5
}
fn bitrate_type_vbr() -> BitrateType {
BitrateType::Vbr
}
@@ -112,7 +132,7 @@ pub fn config() -> Result<Config> {
.long("config")
.required(false)
.takes_value(true)
.help("path to an audio-conv config file, defaults to \"audio-conv.yaml\""),
.help("Path to an audio-conv config file, defaults to \"audio-conv.yaml\""),
)
.arg(
Arg::with_name("from")
@@ -120,7 +140,7 @@ pub fn config() -> Result<Config> {
.long("from")
.required(false)
.takes_value(true)
.help("from directory path"),
.help("\"from\" directory path"),
)
.arg(
Arg::with_name("to")
@@ -128,12 +148,20 @@ pub fn config() -> Result<Config> {
.long("to")
.required(false)
.takes_value(true)
.help("to directory path"),
.help("\"to\" directory path"),
)
.arg(
Arg::with_name("jobs")
.short("j")
.long("jobs")
.required(false)
.takes_value(true)
.help("Allow N jobs/transcodes at once. Defaults to number of logical cores"),
)
.subcommand(SubCommand::with_name("init").about("writes an example config"))
.get_matches();
let current_dir = std::env::current_dir().context("could not get current directory")?;
let current_dir = std::env::current_dir().context("Could not get current directory")?;
let config_path = arg_matches.value_of_os("config");
let force_load = config_path.is_some();
@@ -148,17 +176,17 @@ pub fn config() -> Result<Config> {
.create_new(true)
.open(&config_path)
.and_then(|mut f| f.write_all(std::include_bytes!("../example.audio-conv.yaml")))
.with_context(|| format!("unable to write config file to {}", config_path.display()))?;
.with_context(|| format!("Unable to write config file to {}", config_path.display()))?;
std::process::exit(0);
}
let config_dir = config_path
.parent()
.context("could not get parent directory of the config file")?;
.context("Could not get parent directory of the config file")?;
let config_file = load_config_file(&config_path)
.with_context(|| format!("failed loading config file \"{}\"", config_path.display()))?;
.with_context(|| format!("Failed loading config file {}", config_path.display()))?;
if force_load && config_file.is_none() {
return Err(Error::msg(format!(
@@ -170,7 +198,7 @@ pub fn config() -> Result<Config> {
let default_regex = RegexBuilder::new("\\.(flac|wav)$")
.case_insensitive(true)
.build()
.expect("failed compiling default match regex");
.expect("Failed compiling default match regex");
let transcode_matches = config_file
.as_ref()
@@ -183,8 +211,8 @@ pub fn config() -> Result<Config> {
let glob = GlobBuilder::new(glob)
.case_insensitive(true)
.build()
.context("failed building glob")?;
let regex = Regex::new(glob.regex()).context("failed compiling regex")?;
.context("Failed building glob")?;
let regex = Regex::new(glob.regex()).context("Failed compiling regex")?;
Ok(regex)
});
@@ -192,7 +220,7 @@ pub fn config() -> Result<Config> {
let regex = RegexBuilder::new(regex)
.case_insensitive(true)
.build()
.context("failed compiling regex")?;
.context("Failed compiling regex")?;
Ok(regex)
});
@@ -204,7 +232,7 @@ pub fn config() -> Result<Config> {
let regex = RegexBuilder::new(&ext)
.case_insensitive(true)
.build()
.context("failed compiling regex")?;
.context("Failed compiling regex")?;
Ok(regex)
});
@@ -247,7 +275,7 @@ pub fn config() -> Result<Config> {
})
.ok_or_else(|| Error::msg("\"from\" not configured"))?
.canonicalize()
.context("could not canonicalize \"from\" path")?
.context("Could not canonicalize \"from\" path")?
},
to: arg_matches
.value_of_os("to")
@@ -261,8 +289,26 @@ pub fn config() -> Result<Config> {
})
.ok_or_else(|| Error::msg("\"to\" not configured"))?
.canonicalize()
.context("could not canonicalize \"to\" path")?,
.context("Could not canonicalize \"to\" path")?,
matches: transcode_matches,
jobs: arg_matches
.value_of_os("jobs")
.map(|jobs_os_str| {
let jobs_str = jobs_os_str.to_str().with_context(|| {
// TODO: use `OsStr.display` when it lands
// https://github.com/rust-lang/rust/pull/80841
format!(
"Could not convert \"jobs\" argument to string due to invalid characters",
)
})?;
jobs_str.parse().with_context(|| {
format!(
"Could not parse \"jobs\" argument \"{}\" to a number",
&jobs_str
)
})
})
.transpose()?,
})
}
@@ -273,6 +319,6 @@ fn load_config_file(path: &Path) -> Result<Option<ConfigFile>> {
Err(err) => return Err(Error::new(err)),
};
let config: ConfigFile =
serde_yaml::from_reader(&mut file).context("could not parse config file")?;
serde_yaml::from_reader(&mut file).context("Could not parse config file")?;
Ok(Some(config))
}

View File

@@ -1,20 +1,24 @@
mod config;
mod ui;
use crate::config::Config;
use crate::config::{Config, Transcode};
use anyhow::{Context, Error, Result};
use futures::{future, pin_mut, prelude::*};
use glib::{subclass::prelude::*, GBoxed, GString};
use gstreamer::{gst_element_error, prelude::*, Element};
use futures::{pin_mut, prelude::*};
use glib::{GBoxed, GString};
use gstreamer::{
element_error, prelude::*, Caps, Element, Pad, PadBuilder, PadDirection, PadPresence,
PadTemplate, Structure,
};
use gstreamer_base::prelude::*;
use std::{
borrow::Cow,
collections::VecDeque,
error::Error as StdError,
ffi, fmt,
fmt::Write as FmtWrite,
path::{Path, PathBuf},
result::Result as StdResult,
sync::Arc,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{fs, io::AsyncWriteExt, task, time::interval};
@@ -52,12 +56,12 @@ struct GErrorMessage {
fn gmake<T: IsA<Element>>(factory_name: &str) -> Result<T> {
let res = gstreamer::ElementFactory::make(factory_name, None)
.with_context(|| format!("could not make gstreamer Element \"{}\"", factory_name))?
.with_context(|| format!("Could not make gstreamer Element \"{}\"", factory_name))?
.downcast()
.ok()
.with_context(|| {
format!(
"could not cast gstreamer Element \"{}\" into `{}`",
"Could not cast gstreamer Element \"{}\" into `{}`",
factory_name,
std::any::type_name::<T>()
)
@@ -68,7 +72,7 @@ fn gmake<T: IsA<Element>>(factory_name: &str) -> Result<T> {
#[derive(Debug, Clone)]
pub struct ConversionArgs {
rel_from_path: PathBuf,
transcode: config::Transcode,
transcode: Transcode,
}
fn get_conversion_args(config: &Config) -> impl Iterator<Item = Result<ConversionArgs>> + '_ {
@@ -97,33 +101,21 @@ fn get_conversion_args(config: &Config) -> impl Iterator<Item = Result<Conversio
let rel_path = e.path().strip_prefix(&config.from).with_context(|| {
format!(
"unable to get relative path for {} from {}",
"Unable to get relative path for {} from {}",
e.path().display(),
config.from.display()
)
})?;
let mut to = config.to.join(&rel_path);
to.set_extension(transcode.extension());
let is_newer = {
let from_mtime = e
.metadata()
.map_err(Error::new)
.and_then(|md| md.modified().map_err(Error::new))
.with_context(|| {
format!("unable to get mtime for from file {}", e.path().display())
})?;
let to_mtime = to.metadata().and_then(|md| md.modified());
match to_mtime {
Ok(to_mtime) => to_mtime < from_mtime,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => true,
Err(err) => {
return Err(err).with_context(|| {
format!("unable to get mtime for to file {}", to.display())
})
}
}
let is_newer = if let Transcode::CopyAudio = transcode {
// we are doing the "is newer check" in the transcoder, because we do not know
// the file extension at this moment, which is derived from the audio type in
// the source file
true
} else {
let from_path = config.to.join(&rel_path);
let to_path = from_path.with_extension(transcode.extension());
is_file_newer(&from_path, &to_path)?
};
if is_newer {
@@ -147,15 +139,15 @@ async fn main() -> Result<()> {
let main_handle = async move {
let ok = task::spawn_local(main_loop(ui_queue))
.await
.context("main task failed")??;
.context("Main task failed")??;
Result::<_>::Ok(ok)
};
let ui_handle = async move {
let ok = task::spawn_local(ui_fut)
.await
.context("ui task failed")?
.context("ui failed")?;
.context("Ui task failed")?
.context("Ui failed")?;
Result::<_>::Ok(ok)
};
@@ -168,20 +160,20 @@ async fn main() -> Result<()> {
async fn main_loop(ui_queue: ui::MsgQueue) -> Result<()> {
let (config, conv_args) = task::spawn_blocking(|| -> Result<_> {
gstreamer::init()?;
let config = config::config().context("could not get the config")?;
let config = config::config().context("Could not get the config")?;
let conv_args = get_conversion_args(&config)
.collect::<Result<Vec<_>>>()
.context("failed loading dir structure")?;
.context("Failed loading dir structure")?;
Ok((config, conv_args))
})
.await
.context("init task failed")??;
.context("Init task failed")??;
let log_path = Path::new(".")
.canonicalize()
.context("unable to canonicalize path to log file")?
.context("Unable to canonicalize path to log file")?
.join("audio-conv.log");
ui_queue.push(ui::Msg::Init {
@@ -189,9 +181,11 @@ async fn main_loop(ui_queue: ui::MsgQueue) -> Result<()> {
log_path: log_path.clone(),
});
let concurrent_jobs = config.jobs.unwrap_or_else(|| num_cpus::get());
stream::iter(conv_args.into_iter().enumerate())
.map(Ok)
.try_for_each_concurrent(num_cpus::get(), |(i, args)| {
.try_for_each_concurrent(concurrent_jobs, |(i, args)| {
let config = &config;
let ui_queue = &ui_queue;
let log_path = &log_path;
@@ -206,7 +200,7 @@ async fn main_loop(ui_queue: ui::MsgQueue) -> Result<()> {
Ok(()) => ui_queue.push(ui::Msg::TaskEnd { id: i }),
Err(err) => {
let err = err.context(format!(
"failed transcoding \"{}\"",
"Transcoding failed for {}",
args.rel_from_path.display()
));
@@ -264,16 +258,70 @@ async fn transcode(
) -> Result<()> {
let from_path = config.from.join(&args.rel_from_path);
let mut to_path = config.to.join(&args.rel_from_path);
to_path.set_extension(args.transcode.extension());
let file_src: Element = gmake("filesrc")?;
file_src.set_property("location", &path_to_gstring(&from_path))?;
fs::create_dir_all(
to_path
.parent()
.with_context(|| format!("Could not get parent dir for {}", to_path.display()))?,
)
.await?;
// encode into a tmp file first, then rename to the actual file name; that way we're writing
// "whole" files to the intended file path, ignoring partial files in the mtime check
let to_path_tmp = to_path.with_extension("tmp");
let decodebin: Element = gmake("decodebin")?;
rm_file_on_err(&to_path_tmp, async {
let new_extension = match args.transcode {
Transcode::Copy => {
fs::copy(&from_path, &to_path_tmp).await.with_context(|| {
format!(
"Could not copy file from {} to {}",
from_path.display(),
to_path_tmp.display()
)
})?;
None
}
_ => {
to_path.set_extension(args.transcode.extension());
transcode_gstreamer(
&from_path,
&to_path_tmp,
args.transcode.clone(),
task_id,
queue,
)
.await?
}
};
if let Some(new_extension) = new_extension {
to_path.set_extension(new_extension);
}
fs::rename(&to_path_tmp, &to_path).await.with_context(|| {
format!(
"Could not rename temporary file {} to {}",
to_path_tmp.display(),
to_path.display()
)
})
})
.await
}
async fn transcode_gstreamer(
from_path: &Path,
to_path: &Path,
transcode: Transcode,
task_id: usize,
queue: &ui::MsgQueue,
) -> Result<Option<&'static str>> {
let file_src: Element = gmake("filesrc")?;
file_src.set_property("location", &path_to_gstring(&from_path))?;
let decodebin: Element = gmake("parsebin")?;
let src_elems: &[&Element] = &[&file_src, &decodebin];
@@ -285,10 +333,11 @@ async fn transcode(
// downgrade pipeline RC to a weak RC to break the reference cycle
let pipeline_weak = pipeline.downgrade();
let transcode_args = args.transcode.clone();
let to_path_tmp_clone = to_path_tmp.clone();
let new_extension = Arc::new(Mutex::new(None));
let new_extension_clone = new_extension.clone();
let from_path_clone = from_path.to_owned();
let to_path_clone = to_path.to_owned();
decodebin.connect_pad_added(move |decodebin, src_pad| {
let insert_sink = || -> Result<()> {
let pipeline = match pipeline_weak.upgrade() {
@@ -299,38 +348,32 @@ async fn transcode(
}
};
let is_audio = src_pad.get_current_caps().and_then(|caps| {
caps.get_structure(0).map(|s| {
let name = s.get_name();
name.starts_with("audio/")
let is_audio_mime = src_pad.current_caps().and_then(|caps| {
println!("{:?}", caps);
caps.structure(0).as_ref().map(|s| {
let name = s.name();
(name.starts_with("audio/"), name)
})
});
match is_audio {
let audio_mime = match is_audio_mime {
None => {
return Err(Error::msg(format!(
"Failed to get media type from pad {}",
src_pad.get_name()
src_pad.name()
)));
}
Some(false) => {
Some((false, ..)) => {
// not audio pad... ignoring
return Ok(());
}
Some(true) => {}
}
Some((true, mime)) => mime,
};
let resample: Element = gmake("audioresample")?;
// quality from 0 to 10
resample.set_property("quality", &7)?;
let mut dest_elems = VecDeque::new();
let mut dest_elems = vec![
resample,
// `audioconvert` converts audio format, bitdepth, ...
gmake("audioconvert")?,
];
match &transcode_args {
config::Transcode::Opus {
let is_transcoding = match &transcode {
Transcode::Opus {
bitrate,
bitrate_type,
} => {
@@ -339,7 +382,7 @@ async fn transcode(
"bitrate",
&i32::from(*bitrate)
.checked_mul(1_000)
.context("bitrate overflowed")?,
.context("Bitrate overflowed")?,
)?;
encoder.set_property_from_str(
"bitrate-type",
@@ -349,10 +392,19 @@ async fn transcode(
},
);
dest_elems.push(encoder);
dest_elems.push(gmake("oggmux")?);
dest_elems.push_back(encoder);
dest_elems.push_back(gmake("oggmux")?);
true
}
config::Transcode::Mp3 {
Transcode::Flac { compression } => {
let encoder: Element = gmake("flacenc")?;
encoder.set_property_from_str("quality", &compression.to_string());
dest_elems.push_back(encoder);
true
}
Transcode::Mp3 {
bitrate,
bitrate_type,
} => {
@@ -368,15 +420,85 @@ async fn transcode(
},
)?;
dest_elems.push(encoder);
dest_elems.push(gmake("id3v2mux")?);
dest_elems.push_back(encoder);
dest_elems.push_back(gmake("id3v2mux")?);
true
}
Transcode::Copy => {
// already handled outside this fn
unreachable!();
}
Transcode::CopyAudio => {
let (extension, mux) = match audio_mime {
"audio/ogg" | "audio/opus" | "audio/x-opus" => {
let mux: Element = gmake("oggmux")?;
// let caps = Caps::new_simple("audio/x-opus", &[]);
let template = PadTemplate::new(
"audio_%u",
PadDirection::Sink,
PadPresence::Request,
// &Caps::builder_full_with_any_features().structure(Structure::new("opus", "")).build()
&src_pad.current_caps().unwrap(),
)?;
// println!("{:?}", caps);
mux.add_pad(&Pad::from_template(&template, Some("audio_%u")))?;
("opus", Some(mux))
}
"audio/mpeg" => ("mp3", None),
"audio/flac" => ("flac", Some(gmake("oggmux")?)),
_ => {
return Err(Error::msg(format!(
"Unsupprted audio mime type \"{}\"",
audio_mime
)))
}
};
let is_newer = is_file_newer(
&from_path_clone,
&from_path_clone.with_extension(extension),
)?;
if !is_newer {
return Ok(());
}
if let Some(mux) = mux {
dest_elems.push_back(mux);
}
new_extension_clone
.lock()
.expect("Could not lock extension mutex")
.replace(extension);
false
}
};
if is_transcoding {
let resample: Element = gmake("audioresample")?;
// quality from 0 to 10
resample.set_property("quality", &10)?;
let elems = [gmake("decodebin")?, gmake("audioconvert")?, resample];
// reversed order because we are pushing to the front
for elem in IntoIterator::into_iter(elems).into_iter().rev() {
dest_elems.push_front(elem);
}
}
let file_dest: gstreamer_base::BaseSink = gmake("filesink")?;
file_dest.set_property("location", &path_to_gstring(&to_path_tmp_clone))?;
file_dest.set_property("location", &path_to_gstring(&to_path_clone))?;
file_dest.set_sync(false);
dest_elems.push(file_dest.upcast());
dest_elems.push_back(file_dest.upcast());
let dest_elem_refs: Vec<_> = dest_elems.iter().collect();
pipeline.add_many(&dest_elem_refs)?;
@@ -389,8 +511,9 @@ async fn transcode(
let sink_pad = dest_elems
.get(0)
.unwrap()
.get_static_pad("sink")
.expect("1. dest element has no sinkpad");
.static_pad("sink")
.or_else(|| dest_elems.get(0).unwrap().static_pad("audio_0"))
.context("1. dest element has no sinkpad")?;
src_pad.link(&sink_pad)?;
Ok(())
@@ -401,7 +524,7 @@ async fn transcode(
.field("error", &GBoxErrorWrapper::new(err))
.build();
gst_element_error!(
element_error!(
decodebin,
gstreamer::LibraryError::Failed,
("Failed to insert sink"),
@@ -410,16 +533,8 @@ async fn transcode(
}
});
let bus = pipeline.get_bus().context("pipe get bus")?;
let bus = pipeline.bus().context("Could not get bus for pipeline")?;
fs::create_dir_all(
to_path
.parent()
.with_context(|| format!("could not get parent dir for {}", to_path.display()))?,
)
.await?;
rm_file_on_err(&to_path_tmp, async {
pipeline
.set_state(gstreamer::State::Playing)
.context("Unable to set the pipeline to the `Playing` state")?;
@@ -439,37 +554,43 @@ async fn transcode(
Ok(false)
}
MessageView::Error(err) => {
pipeline.set_state(gstreamer::State::Null).context(
"Unable to set the pipeline to the `Null` state, after error",
)?;
let pipe_stop_res = pipeline.set_state(gstreamer::State::Null);
let err = err
.get_details()
let err: Error = err
.details()
.and_then(|details| {
if details.get_name() != "error-details" {
if details.name() != "error-details" {
return None;
}
let err = details
.get::<&GBoxErrorWrapper>("error")
.unwrap()
.map(|err| err.clone().into())
.expect("error-details message without actual error");
.clone()
.into();
Some(err)
})
.unwrap_or_else(|| {
GErrorMessage {
src: msg
.get_src()
.map(|s| String::from(s.get_path_string()))
.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.get_error().to_string(),
debug: err.get_debug(),
source: err.get_error(),
error: err.error().to_string(),
debug: err.debug(),
source: err.error(),
}
.into()
});
if let Err(pipe_err) = pipe_stop_res {
let err = err.context(pipe_err).context(
"Unable to set the pipeline to the `Null` state, after error",
);
Err(err)
} else {
Err(err)
}
}
_ => Ok(true),
}
@@ -482,8 +603,7 @@ async fn transcode(
}
})
.try_for_each(|_| futures::future::ready(Ok(())))
.await
.context("failed converting")?;
.await?;
Result::<_>::Ok(())
};
@@ -498,7 +618,7 @@ async fn transcode(
let dur = decodebin
.query_duration::<ClockTime>()
.and_then(|time| time.nanoseconds());
.map(|time| time.nseconds());
let ratio = dur.and_then(|dur| {
if dur == 0 {
@@ -507,11 +627,11 @@ async fn transcode(
let pos = decodebin
.query_position::<ClockTime>()
.and_then(|time| time.nanoseconds());
.map(|time| time.nseconds());
pos.map(|pos| {
let ratio = pos as f64 / dur as f64;
ratio.max(0.0).min(1.0)
ratio.clamp(0.0, 1.0)
})
});
@@ -533,25 +653,47 @@ async fn transcode(
.set_state(gstreamer::State::Null)
.context("Unable to set the pipeline to the `Null` state")?;
fs::rename(&to_path_tmp, &to_path).await?;
Ok(())
})
.await
let mut new_extension = new_extension
.lock()
.expect("Could not lock extension mutex");
Ok(new_extension.take())
}
async fn rm_file_on_err<F, T>(path: &Path, f: F) -> F::Output
fn is_file_newer(from_path: &Path, to_path: &Path) -> Result<bool> {
let from_mtime = from_path
.metadata()
.map_err(Error::new)
.and_then(|md| md.modified().map_err(Error::new))
.with_context(|| {
format!(
"Unable to get mtime for \"from\" file {}",
from_path.display()
)
})?;
let to_mtime = to_path.metadata().and_then(|md| md.modified());
match to_mtime {
Ok(to_mtime) => Ok(to_mtime < from_mtime),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(true),
Err(err) => {
return Err(err).with_context(|| {
format!("Unable to get mtime for \"to\" file {}", to_path.display())
})
}
}
}
async fn rm_file_on_err<F, T>(path: &Path, f: F) -> Result<T>
where
F: Future<Output = Result<T>>,
{
match f.await {
Err(err) => match fs::remove_file(path).await {
Ok(..) => Err(err),
Ok(()) => Err(err),
Err(fs_err) if fs_err.kind() == std::io::ErrorKind::NotFound => Err(err),
Err(fs_err) => {
let err = err
.context(fs_err)
.context(format!("removing {} failed", path.display()));
.context(format!("Removing file {} failed", path.display()));
Err(err)
}
},

View File

@@ -88,7 +88,7 @@ impl State {
Msg::TaskEnd { id } => {
self.running_tasks
.remove(&id)
.context("unable to remove finished task; could't find task")?;
.context("Unable to remove finished task; could't find task")?;
self.ended_tasks += 1;
}
Msg::TaskProgress { id, ratio } => {
@@ -102,7 +102,7 @@ impl State {
// TODO
self.running_tasks
.remove(&id)
.context("unable to remove errored task; could't find task")?;
.context("Unable to remove errored task; could't find task")?;
self.ended_tasks += 1;
self.has_errored = true;
}
@@ -136,7 +136,7 @@ impl State {
running_tasks.sort_by_key(|task| task.id);
if !self.has_rendered {
self.terminal.clear().context("cleaning ui failed")?;
self.terminal.clear().context("Clearing ui failed")?;
self.has_rendered = true;
}
@@ -222,7 +222,7 @@ impl State {
chunks[1],
);
})
.context("rendering ui failed")?;
.context("Rendering ui failed")?;
Ok(())
}
@@ -267,8 +267,8 @@ pub fn init() -> (MsgQueue, impl Future<Output = Result<()>>) {
}
})
.await
.context("ui update task failed")?
.context("ui update failed")?;
.context("Ui update task failed")?
.context("Ui update failed")?;
match render_res {
Some(s) => wrapped = Some(s),