58 Commits

Author SHA1 Message Date
5242aac566 bump version (v1.1.0) 2021-02-10 23:22:40 +01:00
49003c9983 add readme 2021-02-10 23:21:57 +01:00
530446bcd6 add flac dest format to example config 2021-02-10 23:18:24 +01:00
aa65b30873 update cargo deps 2021-02-10 22:22:06 +01:00
f1a71189c8 flake.lock: Update
Flake input changes:

* Updated 'nixpkgs': 'github:NixOS/nixpkgs/b881f100f65eacbbafdf65cff78c8d4104a1d04a' -> 'github:NixOS/nixpkgs/26f6af373ec1b867d751b56fb802f14010c8351b'
2021-02-10 22:15:31 +01:00
cc2ac9cb37 update nix deps 2021-02-02 13:27:15 +01:00
511f5d1237 update cargo deps 2021-02-02 13:26:33 +01:00
ff06358268 set resampling qulity to highest "10" 2021-02-02 13:22:14 +01:00
183e34c217 add flac encoding format 2021-01-15 21:18:27 +01:00
755f5dbd1d update nix deps 2020-12-31 01:07:44 +01:00
762064efd6 clean up 2020-12-29 21:31:33 +01:00
9959a26e48 only include needed src files in cargo pkg/publish 2020-12-28 18:46:47 +01:00
870f7fc1ea bump version (v1.0.0) 2020-12-28 15:53:56 +01:00
9dd803cbdf update cargo deps 2020-12-28 15:47:53 +01:00
7139573fcf add cargo metadata 2020-12-28 15:42:50 +01:00
35043094f9 fix typo 2020-12-28 15:32:18 +01:00
23e4382e1b update rust deps 2020-12-27 16:26:08 +01:00
ad02308008 add init subcommand
it writes out the example config into a file
2020-12-27 13:54:46 +01:00
5417edc24c remove unused fn 2020-12-27 13:11:10 +01:00
ed375eb6fd add missing error reporting 2020-12-27 12:38:06 +01:00
8b96f17cdf return Result from get_convertion_args instead of panicking on error 2020-12-27 01:46:17 +01:00
5099454991 remove unneeded cloning 2020-12-23 15:35:38 +01:00
901b0d8a4d allow omission of extensions in config file 2020-12-23 12:24:56 +01:00
3e4a9f1cbe add support for extensions matching 2020-12-22 18:37:15 +01:00
4af2e1504c add support for regex & glob pattern for same match entry 2020-12-22 18:37:15 +01:00
85e6a0cbeb add support for transcoding to mp3 2020-12-22 18:37:15 +01:00
61c275352d (nix): update deps 2020-12-22 12:19:48 +01:00
b673526098 limit task progress ratio range
to prevent ui crashes when the progress data requested from gstreamer is somehow broken
2020-12-19 16:29:37 +01:00
29a4ca3220 use tokios async fs api instead of blocking fs 2020-12-19 15:55:42 +01:00
acb0c5a0e9 fix panic in ui if there are no tasks on start 2020-12-19 13:45:32 +01:00
1349f9e01f fix failing init if log file is missing 2020-12-19 13:15:32 +01:00
b0c48d405e add ui 2020-12-19 13:02:20 +01:00
99a75fa4e1 (nix): use unstable nixpkgs for dev-environment 2020-12-02 10:15:45 +01:00
133196f3cd use nix flake & remove legacy shell.nix 2020-11-30 17:59:52 +01:00
1f57a5489d add Cargo.lock 2020-11-30 17:51:53 +01:00
0d8c02cafc fix typo 2020-11-29 19:33:49 +01:00
1bb1cf3a83 add "wav" as default matcher extension 2020-11-29 19:30:38 +01:00
e8d0324f30 use gstreamer Element instead of subclass where possible 2020-11-29 19:20:47 +01:00
b5190ed581 use gstreamer "decodebin" to auto-decode audio files 2020-11-29 19:13:52 +01:00
595fbbb3ed add support for glob matchers 2020-11-28 18:19:54 +01:00
458b7b9aa1 add input file matching through configs
and allow codec options for each matching
2020-11-28 16:53:00 +01:00
dac57fa4d1 fix relative path handling in config 2020-11-08 17:05:17 +01:00
cef350fca2 improve configurability 2020-11-08 16:21:27 +01:00
f1d3805e6b move blocking directory reading to external thread 2020-11-08 00:24:27 +01:00
938742ddf5 add editorconfig 2020-11-07 19:55:36 +01:00
45744e30d8 upgrade deps 2020-11-07 17:17:39 +01:00
94a09513c0 add comment 2020-05-26 12:42:40 +02:00
a5100020f6 use rust from nixpkgs 2020-05-26 12:06:04 +02:00
f7f6625585 remove tmp file on error 2020-03-14 14:03:31 +01:00
d84145b91f return error on missing input/output paths 2020-03-13 16:34:33 +01:00
1bdcebccb5 use *futures* executor instead of glib 2020-03-13 16:32:43 +01:00
fbd0b8b976 use *anyhow* for error handling 2020-03-13 14:57:47 +01:00
9fa135dc7b fix converting source files with different bitdepth than 16bit 2020-03-12 05:35:51 +01:00
7c7dc63ba9 add missing dep 2020-03-07 12:20:53 +01:00
0223f24fa5 do not ignore pipeline errors 2020-03-07 10:16:29 +01:00
2a2810a3aa clean up 2020-02-22 18:29:43 +01:00
f449efe273 write whole files 2020-02-22 17:23:22 +01:00
8f1196aefe gstreamer rewrite 2020-02-22 17:15:13 +01:00
12 changed files with 2406 additions and 256 deletions

8
.editorconfig Normal file
View File

@@ -0,0 +1,8 @@
root = true
[*]
indent_style = space
indent_size = 4
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

1
.gitignore vendored
View File

@@ -1,2 +1 @@
/target/
/Cargo.lock

1051
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +1,42 @@
[package]
name = "audio_conv"
version = "0.0.0"
authors = ["Thomas Heck <t@b128.net>"]
name = "audio-conv"
version = "1.1.0"
edition = "2018"
description = "Copies directory structure and converts audio files in it"
authors = ["Thomas Heck <t@b128.net>"]
repository = "https://gitlab.com/chpio/audio-conv"
license = "MIT OR Apache-2.0"
categories = [
"command-line-utilities",
"multimedia",
"multimedia::audio",
"multimedia::encoding",
]
keywords = ["audio", "conversion", "opus", "flac"]
include = [
"/src/**/*",
"/example.audio-conv.yaml",
]
[dependencies]
stainless_ffmpeg = "0.1"
rayon = "1.0.2"
gstreamer-audio = { version = "0.16", features = ["v1_10"] }
gstreamer = { version = "0.16", features = ["v1_10"] }
gstreamer-base = { version = "0.16", features = ["v1_10"] }
glib = "0.10"
futures = "0.3"
num_cpus = "1"
walkdir = "2"
libc = "0.2"
anyhow = "1"
clap = "2"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8"
regex = "1"
globset = "0.4"
derive_more = "0.99"
tui = { version = "0.14", default-features = false, features = ["crossterm"] }
[dependencies.tokio]
version = "1"
default-features = false
features = ["sync", "rt", "macros", "time", "fs", "io-util"]

34
README.md Normal file
View File

@@ -0,0 +1,34 @@
# audio-conv
Takes two paths, all audio files encountered in the first path are transcoded and stored in the
second path. The directory structure from the first path gets also copied to the second path.
## Dependencies
Requires *gstreamer* version 1.10 or higher with the *base* plugin.
The supported source audio formats (or even other media that is able to contain audio) depend on
the installed *gstreamer* plugins.
## Installation via nix flakes
*audio-conv* can be easily installed via *nix flakes*:
```bash
$ nix profile install gitlab:chpio/audio-conv/release
```
## Generate example config
*audio-conv* is able to write an example config to your current directory:
```bash
$ audio-conv init
```
Now you need to edit the generated *audio-conv.yaml* file. And let it convert your audio files
by running it:
```bash
$ audio-conv
```

27
example.audio-conv.yaml Normal file
View File

@@ -0,0 +1,27 @@
from: ./music
to: ./converted_test
matches:
- extensions:
- flac
- wav
# and/or `glob: "**/*.flac"`
# and/or `regex: "\.flac$"`
# you can also leave it empty for the default extensions
to:
codec: opus
bitrate: 160
bitrate_type: vbr # or cbr
# for mp3:
# to:
# codec: mp3
# # one of: 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256 or 320
# bitrate: 256
# bitrate_type: vbr # or cbr
# for flac:
# to:
# codec: flac
# # effort spend for the compression. 0 (fastes compression) to 9 (highest compression)
# compression: 8

58
flake.lock generated Normal file
View File

@@ -0,0 +1,58 @@
{
"nodes": {
"flake-utils": {
"locked": {
"lastModified": 1610051610,
"narHash": "sha256-U9rPz/usA1/Aohhk7Cmc2gBrEEKRzcW4nwPWMPwja4Y=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "3982c9903e93927c2164caa727cd3f6a0e6d14cc",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"import-cargo": {
"locked": {
"lastModified": 1594305518,
"narHash": "sha256-frtArgN42rSaEcEOYWg8sVPMUK+Zgch3c+wejcpX3DY=",
"owner": "edolstra",
"repo": "import-cargo",
"rev": "25d40be4a73d40a2572e0cc233b83253554f06c5",
"type": "github"
},
"original": {
"owner": "edolstra",
"repo": "import-cargo",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1612988144,
"narHash": "sha256-X1IO9gtzE0dRVpDqknjF39IVDnuKuZsRis38WnLfHLo=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "26f6af373ec1b867d751b56fb802f14010c8351b",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"import-cargo": "import-cargo",
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

63
flake.nix Normal file
View File

@@ -0,0 +1,63 @@
{
description = "Converts audio files";
inputs = {
nixpkgs.url = github:NixOS/nixpkgs;
flake-utils.url = "github:numtide/flake-utils";
import-cargo.url = github:edolstra/import-cargo;
};
outputs = { self, flake-utils, nixpkgs, import-cargo }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = import nixpkgs { inherit system; };
buildtimeDeps = with pkgs; [
cargo
rustc
pkg-config
];
runtimeDeps = with pkgs; [
gst_all_1.gstreamer
# needed for opus, resample, ...
gst_all_1.gst-plugins-base
# needed for flac
gst_all_1.gst-plugins-good
];
inherit (import-cargo.builders) importCargo;
in {
defaultPackage = pkgs.stdenv.mkDerivation {
name = "audio-conv";
src = self;
nativeBuildInputs = [
# setupHook which makes sure that a CARGO_HOME with vendored dependencies
# exists
(importCargo { lockFile = ./Cargo.lock; inherit pkgs; }).cargoHome
]
++ buildtimeDeps;
buildInputs = runtimeDeps;
buildPhase = ''
cargo build --release --offline
'';
installPhase = ''
install -Dm775 ./target/release/audio-conv $out/bin/audio-conv
'';
};
devShell = pkgs.stdenv.mkDerivation {
name = "audio-conv";
buildInputs = [ pkgs.rustfmt pkgs.rust-analyzer ]
++ buildtimeDeps
++ runtimeDeps;
};
}
);
}

View File

@@ -1,15 +0,0 @@
with import <nixpkgs> {};
stdenv.mkDerivation {
name = "audio_conv";
buildInputs = [
stdenv
pkg-config
ffmpeg_4
clang
cargo
rustc
rls
];
LIBCLANG_PATH="${llvmPackages.libclang}/lib";
}

289
src/config.rs Normal file
View File

@@ -0,0 +1,289 @@
use anyhow::{Context, Error, Result};
use globset::GlobBuilder;
use regex::bytes::{Regex, RegexBuilder};
use serde::Deserialize;
use std::{
io::Write,
path::{Path, PathBuf},
};
#[derive(Debug)]
pub struct Config {
pub from: PathBuf,
pub to: PathBuf,
pub matches: Vec<TranscodeMatch>,
}
#[derive(Debug)]
pub struct TranscodeMatch {
pub regexes: Vec<Regex>,
pub to: Transcode,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "codec")]
pub enum Transcode {
#[serde(rename = "opus")]
Opus {
#[serde(default = "default_opus_bitrate")]
bitrate: u16,
#[serde(default = "bitrate_type_vbr")]
bitrate_type: BitrateType,
},
#[serde(rename = "flac")]
Flac {
#[serde(default = "default_flac_compression")]
compression: u8,
},
#[serde(rename = "mp3")]
Mp3 {
#[serde(default = "default_mp3_bitrate")]
bitrate: u16,
#[serde(default = "bitrate_type_vbr")]
bitrate_type: BitrateType,
},
}
impl Transcode {
pub fn extension(&self) -> &'static str {
match self {
Transcode::Opus { .. } => "opus",
Transcode::Flac { .. } => "flac",
Transcode::Mp3 { .. } => "mp3",
}
}
}
fn default_opus_bitrate() -> u16 {
160
}
fn default_flac_compression() -> u8 {
5
}
fn bitrate_type_vbr() -> BitrateType {
BitrateType::Vbr
}
fn default_mp3_bitrate() -> u16 {
256
}
impl Default for Transcode {
fn default() -> Self {
Transcode::Opus {
bitrate: default_opus_bitrate(),
bitrate_type: bitrate_type_vbr(),
}
}
}
#[derive(Clone, Debug, Deserialize)]
pub enum BitrateType {
#[serde(rename = "cbr")]
Cbr,
#[serde(rename = "vbr")]
Vbr,
}
#[derive(Debug, Default, Deserialize)]
struct ConfigFile {
from: Option<PathBuf>,
to: Option<PathBuf>,
#[serde(default)]
matches: Vec<TranscodeMatchFile>,
}
#[derive(Debug, Deserialize)]
struct TranscodeMatchFile {
glob: Option<String>,
regex: Option<String>,
#[serde(default)]
extensions: Vec<String>,
to: Transcode,
}
pub fn config() -> Result<Config> {
use clap::{App, Arg, SubCommand};
let arg_matches = App::new("audio-conv")
.version(clap::crate_version!())
.about("Converts audio files")
.arg(
Arg::with_name("config")
.short("c")
.long("config")
.required(false)
.takes_value(true)
.help("path to an audio-conv config file, defaults to \"audio-conv.yaml\""),
)
.arg(
Arg::with_name("from")
.short("f")
.long("from")
.required(false)
.takes_value(true)
.help("from directory path"),
)
.arg(
Arg::with_name("to")
.short("t")
.long("to")
.required(false)
.takes_value(true)
.help("to directory path"),
)
.subcommand(SubCommand::with_name("init").about("writes an example config"))
.get_matches();
let current_dir = std::env::current_dir().context("could not get current directory")?;
let config_path = arg_matches.value_of_os("config");
let force_load = config_path.is_some();
let config_path = config_path
.map(AsRef::<Path>::as_ref)
.unwrap_or_else(|| AsRef::<Path>::as_ref("audio-conv.yaml"));
let config_path = current_dir.join(config_path);
if let Some("init") = arg_matches.subcommand_name() {
std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&config_path)
.and_then(|mut f| f.write_all(std::include_bytes!("../example.audio-conv.yaml")))
.with_context(|| format!("unable to write config file to {}", config_path.display()))?;
std::process::exit(0);
}
let config_dir = config_path
.parent()
.context("could not get parent directory of the config file")?;
let config_file = load_config_file(&config_path)
.with_context(|| format!("failed loading config file \"{}\"", config_path.display()))?;
if force_load && config_file.is_none() {
return Err(Error::msg(format!(
"could not find config file \"{}\"",
config_path.display()
)));
}
let default_regex = RegexBuilder::new("\\.(flac|wav)$")
.case_insensitive(true)
.build()
.expect("failed compiling default match regex");
let transcode_matches = config_file
.as_ref()
.map(|config_file| {
config_file
.matches
.iter()
.map(|m| {
let glob = m.glob.iter().map(|glob| {
let glob = GlobBuilder::new(glob)
.case_insensitive(true)
.build()
.context("failed building glob")?;
let regex = Regex::new(glob.regex()).context("failed compiling regex")?;
Ok(regex)
});
let regex = m.regex.iter().map(|regex| {
let regex = RegexBuilder::new(regex)
.case_insensitive(true)
.build()
.context("failed compiling regex")?;
Ok(regex)
});
let extensions = m.extensions.iter().map(|ext| {
let mut ext = regex::escape(ext);
ext.insert_str(0, &"\\.");
ext.push_str("$");
let regex = RegexBuilder::new(&ext)
.case_insensitive(true)
.build()
.context("failed compiling regex")?;
Ok(regex)
});
let mut regexes = glob
.chain(regex)
.chain(extensions)
.collect::<Result<Vec<_>>>()?;
if regexes.is_empty() {
regexes.push(default_regex.clone());
}
Ok(TranscodeMatch {
regexes,
to: m.to.clone(),
})
})
.collect::<Result<Vec<_>>>()
})
.transpose()?
.filter(|matches| !matches.is_empty())
.unwrap_or_else(|| {
vec![TranscodeMatch {
regexes: vec![default_regex],
to: Transcode::default(),
}]
});
Ok(Config {
from: {
arg_matches
.value_of_os("from")
.map(|p| current_dir.join(p))
.or_else(|| {
config_file
.as_ref()
.map(|c| c.from.as_ref())
.flatten()
.map(|p| config_dir.join(p))
})
.ok_or_else(|| Error::msg("\"from\" not configured"))?
.canonicalize()
.context("could not canonicalize \"from\" path")?
},
to: arg_matches
.value_of_os("to")
.map(|p| current_dir.join(p))
.or_else(|| {
config_file
.as_ref()
.map(|c| c.to.as_ref())
.flatten()
.map(|p| config_dir.join(p))
})
.ok_or_else(|| Error::msg("\"to\" not configured"))?
.canonicalize()
.context("could not canonicalize \"to\" path")?,
matches: transcode_matches,
})
}
fn load_config_file(path: &Path) -> Result<Option<ConfigFile>> {
let mut file = match std::fs::File::open(path) {
Ok(file) => file,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(Error::new(err)),
};
let config: ConfigFile =
serde_yaml::from_reader(&mut file).context("could not parse config file")?;
Ok(Some(config))
}

View File

@@ -1,277 +1,597 @@
use std::env;
use std::path::Path;
use std::{fs, io};
mod config;
mod ui;
use stainless_ffmpeg::{self as ffmpeg, codec, filter, format, frame, media};
use rayon::prelude::*;
use crate::config::Config;
use anyhow::{Context, Error, Result};
use futures::{pin_mut, prelude::*};
use glib::{subclass::prelude::*, GBoxed, GString};
use gstreamer::{gst_element_error, prelude::*, Element};
use gstreamer_base::prelude::*;
use std::{
borrow::Cow,
error::Error as StdError,
ffi, fmt,
fmt::Write as FmtWrite,
path::{Path, PathBuf},
result::Result as StdResult,
sync::Arc,
time::Duration,
};
use tokio::{fs, io::AsyncWriteExt, task, time::interval};
#[derive(Debug)]
enum Error {
Io(io::Error),
Ffmpeg(ffmpeg::Error),
String(String),
Str(&'static str),
}
#[derive(Clone, Debug, GBoxed)]
#[gboxed(type_name = "GBoxErrorWrapper")]
struct GBoxErrorWrapper(Arc<Error>);
impl From<ffmpeg::error::Error> for Error {
fn from(v: ffmpeg::Error) -> Error {
Error::Ffmpeg(v)
impl GBoxErrorWrapper {
fn new(err: Error) -> Self {
GBoxErrorWrapper(Arc::new(err))
}
}
impl From<io::Error> for Error {
fn from(v: io::Error) -> Error {
Error::Io(v)
impl StdError for GBoxErrorWrapper {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
self.0.source()
}
}
impl From<String> for Error {
fn from(v: String) -> Error {
Error::String(v)
impl fmt::Display for GBoxErrorWrapper {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
self.0.fmt(f)
}
}
impl From<&'static str> for Error {
fn from(v: &'static str) -> Error {
Error::Str(v)
}
#[derive(Debug, derive_more::Display, derive_more::Error)]
#[display(fmt = "Received error from {}: {} (debug: {:?})", src, error, debug)]
struct GErrorMessage {
src: String,
error: String,
debug: Option<String>,
source: glib::Error,
}
fn filter(
decoder: &codec::decoder::Audio,
encoder: &codec::encoder::Audio,
) -> Result<filter::Graph, Error> {
let mut filter = filter::Graph::new();
let args = format!(
"time_base={}:sample_rate={}:sample_fmt={}:channel_layout=0x{:x}",
decoder.time_base(),
decoder.rate(),
decoder.format().name(),
decoder.channel_layout().bits()
);
filter.add(&filter::find("abuffer").unwrap(), "in", &args)?;
filter.add(&filter::find("abuffersink").unwrap(), "out", "")?;
{
let mut out = filter.get("out").unwrap();
out.set_sample_format(encoder.format());
out.set_channel_layout(encoder.channel_layout());
out.set_sample_rate(encoder.rate());
}
filter.output("in", 0)?.input("out", 0)?.parse("anull")?;
filter.validate()?;
if let Some(codec) = encoder.codec() {
if !codec
.capabilities()
.contains(ffmpeg::codec::capabilities::VARIABLE_FRAME_SIZE)
{
filter
.get("out")
.unwrap()
.sink()
.set_frame_size(encoder.frame_size());
}
}
Ok(filter)
fn gmake<T: IsA<Element>>(factory_name: &str) -> Result<T> {
let res = gstreamer::ElementFactory::make(factory_name, None)
.with_context(|| format!("could not make gstreamer Element \"{}\"", factory_name))?
.downcast()
.ok()
.with_context(|| {
format!(
"could not cast gstreamer Element \"{}\" into `{}`",
factory_name,
std::any::type_name::<T>()
)
})?;
Ok(res)
}
struct Transcoder {
stream: usize,
filter: filter::Graph,
decoder: codec::decoder::Audio,
encoder: codec::encoder::Audio,
#[derive(Debug, Clone)]
pub struct ConversionArgs {
rel_from_path: PathBuf,
transcode: config::Transcode,
}
fn transcoder(
ictx: &mut format::context::Input,
octx: &mut format::context::Output,
) -> Result<Transcoder, Error> {
let input = ictx
.streams()
.best(media::Type::Audio)
.expect("could not find best audio stream");
let mut decoder = input.codec().decoder().audio()?;
fn get_conversion_args(config: &Config) -> impl Iterator<Item = Result<ConversionArgs>> + '_ {
walkdir::WalkDir::new(&config.from)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.map(move |e| -> Result<Option<ConversionArgs>> {
let from_bytes = path_to_bytes(e.path());
let codec = ffmpeg::encoder::find(octx.format().codec(&"", media::Type::Audio))
.expect("failed to find encoder")
.audio()?;
let global = octx
.format()
.flags()
.contains(ffmpeg::format::flag::GLOBAL_HEADER);
let transcode = config
.matches
.iter()
.filter(|m| {
m.regexes
.iter()
.any(|regex| regex.is_match(from_bytes.as_ref()))
})
.map(|m| m.to.clone())
.next();
let transcode = if let Some(transcode) = transcode {
transcode
} else {
return Ok(None);
};
decoder.set_parameters(input.parameters())?;
let rel_path = e.path().strip_prefix(&config.from).with_context(|| {
format!(
"unable to get relative path for {} from {}",
e.path().display(),
config.from.display()
)
})?;
let mut output = octx.add_stream(codec)?;
let mut encoder = output.codec().encoder().audio()?;
let mut to = config.to.join(&rel_path);
to.set_extension(transcode.extension());
let channel_layout = codec
.channel_layouts()
.map(|cls| cls.best(decoder.channel_layout().channels()))
.unwrap_or(ffmpeg::channel_layout::STEREO);
if global {
encoder.set_flags(ffmpeg::codec::flag::GLOBAL_HEADER);
}
encoder.set_rate(48_000); //decoder.rate() as i32);
encoder.set_channel_layout(channel_layout);
encoder.set_channels(channel_layout.channels());
encoder.set_format(
codec
.formats()
.expect("unknown supported formats")
.next()
.unwrap(),
);
encoder.set_time_base((1, 48_000));
output.set_time_base((1, 48_000));
let mut encode_dict = ffmpeg::Dictionary::new();
encode_dict.set("vbr", "on");
encoder.set_bit_rate(192_000);
let encoder = encoder.open_as_with(codec, encode_dict)?;
output.set_parameters(&encoder);
let filter = filter(&decoder, &encoder)?;
Ok(Transcoder {
stream: input.index(),
filter,
decoder,
encoder,
})
}
fn transcode(input: &Path, output: &Path) -> Result<(), Error> {
println!("{:?}", output);
let mut ictx = format::input(&input)?;
let original_extension = output
.extension()
.expect("file without extension")
.to_string_lossy();
let output_tmp = output.with_extension("tmp");
let mut octx = format::output_as(&output_tmp, &original_extension)?;
let mut transcoder = transcoder(&mut ictx, &mut octx)?;
octx.set_metadata(ictx.metadata().to_owned());
octx.write_header()?;
let in_time_base = transcoder.decoder.time_base();
let mut frame = frame::Audio::empty();
let mut encoded = ffmpeg::Packet::empty();
for (stream, mut packet) in ictx.packets() {
if stream.index() != transcoder.stream {
continue;
}
packet.rescale_ts(stream.time_base(), in_time_base);
if let Ok(true) = transcoder.decoder.decode(&packet, &mut frame) {
transcoder.filter.get("in").unwrap().source().add(&frame)?;
while let Ok(..) = transcoder
.filter
.get("out")
.unwrap()
.sink()
.frame(&mut frame)
{
if let Ok(true) = transcoder.encoder.encode(&frame, &mut encoded) {
encoded.set_stream(0);
encoded.write_interleaved(&mut octx)?;
}
unsafe {
ffmpeg::ffi::av_frame_unref(frame.as_mut_ptr());
let is_newer = {
let from_mtime = e
.metadata()
.map_err(Error::new)
.and_then(|md| md.modified().map_err(Error::new))
.with_context(|| {
format!("unable to get mtime for from file {}", e.path().display())
})?;
let to_mtime = to.metadata().and_then(|md| md.modified());
match to_mtime {
Ok(to_mtime) => to_mtime < from_mtime,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => true,
Err(err) => {
return Err(err).with_context(|| {
format!("unable to get mtime for to file {}", to.display())
})
}
}
};
if is_newer {
Ok(Some(ConversionArgs {
rel_from_path: rel_path.to_path_buf(),
transcode,
}))
} else {
Ok(None)
}
}
}
})
.filter_map(|e| e.transpose())
}
transcoder.filter.get("in").unwrap().source().flush()?;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
task::LocalSet::new()
.run_until(async move {
let (ui_queue, ui_fut) = ui::init();
while let Ok(..) = transcoder
.filter
.get("out")
.unwrap()
.sink()
.frame(&mut frame)
{
if let Ok(true) = transcoder.encoder.encode(&frame, &mut encoded) {
encoded.set_stream(0);
encoded.write_interleaved(&mut octx)?;
}
}
let main_handle = async move {
let ok = task::spawn_local(main_loop(ui_queue))
.await
.context("main task failed")??;
Result::<_>::Ok(ok)
};
if let Ok(true) = transcoder.encoder.flush(&mut encoded) {
encoded.set_stream(0);
encoded.write_interleaved(&mut octx)?;
}
let ui_handle = async move {
let ok = task::spawn_local(ui_fut)
.await
.context("ui task failed")?
.context("ui failed")?;
Result::<_>::Ok(ok)
};
octx.write_trailer()?;
future::try_join(main_handle, ui_handle).await?;
Ok(())
})
.await
}
fs::rename(output_tmp, output)?;
async fn main_loop(ui_queue: ui::MsgQueue) -> Result<()> {
let (config, conv_args) = task::spawn_blocking(|| -> Result<_> {
gstreamer::init()?;
let config = config::config().context("could not get the config")?;
let conv_args = get_conversion_args(&config)
.collect::<Result<Vec<_>>>()
.context("failed loading dir structure")?;
Ok((config, conv_args))
})
.await
.context("init task failed")??;
let log_path = Path::new(".")
.canonicalize()
.context("unable to canonicalize path to log file")?
.join("audio-conv.log");
ui_queue.push(ui::Msg::Init {
task_len: conv_args.len(),
log_path: log_path.clone(),
});
stream::iter(conv_args.into_iter().enumerate())
.map(Ok)
.try_for_each_concurrent(num_cpus::get(), |(i, args)| {
let config = &config;
let ui_queue = &ui_queue;
let log_path = &log_path;
async move {
ui_queue.push(ui::Msg::TaskStart {
id: i,
args: args.clone(),
});
match transcode(config, &args, i, ui_queue).await {
Ok(()) => ui_queue.push(ui::Msg::TaskEnd { id: i }),
Err(err) => {
let err = err.context(format!(
"failed transcoding \"{}\"",
args.rel_from_path.display()
));
let mut log_file = match fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
.await
{
Ok(log_file) => log_file,
Err(fs_err) => {
let err = err.context(fs_err).context("Unable to open log file");
return Err(err);
}
};
let mut err_str = String::new();
if let Err(write_err) = write!(&mut err_str, "{:?}\n", err) {
let err = err.context(format!(
"Unable to format transcoding error for logging (write error: {})",
write_err
));
return Err(err);
}
log_file
.write_all(err_str.as_ref())
.await
.map_err(|fs_err| {
err.context(format!(
"Unable to write transcoding error to log file (fs error: {})",
fs_err
))
})?;
ui_queue.push(ui::Msg::TaskError { id: i });
}
}
Result::<_>::Ok(())
}
})
.await?;
ui_queue.push(ui::Msg::Exit);
Ok(())
}
fn transcode_path(input: &Path, output: &Path) -> Result<(), Error> {
input.read_dir()?.par_bridge().try_for_each(|entry| {
let entry = entry?;
let file_type = entry.file_type()?;
let new_input = input.join(entry.file_name());
let mut new_output = output.join(entry.file_name());
if file_type.is_dir() {
transcode_path(new_input.as_ref(), new_output.as_ref())?;
} else if file_type.is_file() {
if entry.path().extension().unwrap() != "flac" {
// println!("not flac input: {:?}", entry.path());
return Ok(());
}
fs::create_dir_all(&output)?;
new_output.set_extension("opus");
let in_mtime = new_input.metadata()?.modified()?;
let out_mtime = new_output.metadata().and_then(|md| md.modified());
match out_mtime {
Ok(out_mtime) => {
if out_mtime < in_mtime {
transcode(new_input.as_ref(), new_output.as_ref())?;
}
async fn transcode(
config: &Config,
args: &ConversionArgs,
task_id: usize,
queue: &ui::MsgQueue,
) -> Result<()> {
let from_path = config.from.join(&args.rel_from_path);
let mut to_path = config.to.join(&args.rel_from_path);
to_path.set_extension(args.transcode.extension());
let file_src: Element = gmake("filesrc")?;
file_src.set_property("location", &path_to_gstring(&from_path))?;
// encode into a tmp file first, then rename to actuall file name, that way we're writing
// "whole" files to the intended file path, ignoring partial files in the mtime check
let to_path_tmp = to_path.with_extension("tmp");
let decodebin: Element = gmake("decodebin")?;
let src_elems: &[&Element] = &[&file_src, &decodebin];
let pipeline = gstreamer::Pipeline::new(None);
pipeline.add_many(src_elems)?;
Element::link_many(src_elems)?;
// downgrade pipeline RC to a weak RC to break the reference cycle
let pipeline_weak = pipeline.downgrade();
let transcode_args = args.transcode.clone();
let to_path_tmp_clone = to_path_tmp.clone();
decodebin.connect_pad_added(move |decodebin, src_pad| {
let insert_sink = || -> Result<()> {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => {
// pipeline already destroyed... ignoring
return Ok(());
}
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {
transcode(new_input.as_ref(), new_output.as_ref())?;
} else {
return Err(e.into());
}
};
let is_audio = src_pad.get_current_caps().and_then(|caps| {
caps.get_structure(0).map(|s| {
let name = s.get_name();
name.starts_with("audio/")
})
});
match is_audio {
None => {
return Err(Error::msg(format!(
"Failed to get media type from pad {}",
src_pad.get_name()
)));
}
Some(false) => {
// not audio pad... ignoring
return Ok(());
}
Some(true) => {}
}
} else {
Err(format!(
"Unsupported file type `{:?}` (maybe symlink?)",
new_input
))?;
let resample: Element = gmake("audioresample")?;
// quality from 0 to 10
resample.set_property("quality", &10)?;
let mut dest_elems = vec![
resample,
// `audioconvert` converts audio format, bitdepth, ...
gmake("audioconvert")?,
];
match &transcode_args {
config::Transcode::Opus {
bitrate,
bitrate_type,
} => {
let encoder: Element = gmake("opusenc")?;
encoder.set_property(
"bitrate",
&i32::from(*bitrate)
.checked_mul(1_000)
.context("bitrate overflowed")?,
)?;
encoder.set_property_from_str(
"bitrate-type",
match bitrate_type {
config::BitrateType::Vbr => "1",
config::BitrateType::Cbr => "0",
},
);
dest_elems.push(encoder);
dest_elems.push(gmake("oggmux")?);
}
config::Transcode::Flac { compression } => {
let encoder: Element = gmake("flacenc")?;
encoder.set_property_from_str("quality", &compression.to_string());
dest_elems.push(encoder);
}
config::Transcode::Mp3 {
bitrate,
bitrate_type,
} => {
let encoder: Element = gmake("lamemp3enc")?;
// target: "1" = "bitrate"
encoder.set_property_from_str("target", "1");
encoder.set_property("bitrate", &i32::from(*bitrate))?;
encoder.set_property(
"cbr",
match bitrate_type {
config::BitrateType::Vbr => &false,
config::BitrateType::Cbr => &true,
},
)?;
dest_elems.push(encoder);
dest_elems.push(gmake("id3v2mux")?);
}
};
let file_dest: gstreamer_base::BaseSink = gmake("filesink")?;
file_dest.set_property("location", &path_to_gstring(&to_path_tmp_clone))?;
file_dest.set_sync(false);
dest_elems.push(file_dest.upcast());
let dest_elem_refs: Vec<_> = dest_elems.iter().collect();
pipeline.add_many(&dest_elem_refs)?;
Element::link_many(&dest_elem_refs)?;
for e in &dest_elems {
e.sync_state_with_parent()?;
}
let sink_pad = dest_elems
.get(0)
.unwrap()
.get_static_pad("sink")
.expect("1. dest element has no sinkpad");
src_pad.link(&sink_pad)?;
Ok(())
};
if let Err(err) = insert_sink() {
let details = gstreamer::Structure::builder("error-details")
.field("error", &GBoxErrorWrapper::new(err))
.build();
gst_element_error!(
decodebin,
gstreamer::LibraryError::Failed,
("Failed to insert sink"),
details: details
);
}
});
let bus = pipeline.get_bus().context("pipe get bus")?;
fs::create_dir_all(
to_path
.parent()
.with_context(|| format!("could not get parent dir for {}", to_path.display()))?,
)
.await?;
rm_file_on_err(&to_path_tmp, async {
pipeline
.set_state(gstreamer::State::Playing)
.context("Unable to set the pipeline to the `Playing` state")?;
let stream_processor = async {
bus.stream()
.map::<Result<bool>, _>(|msg| {
use gstreamer::MessageView;
match msg.view() {
// MessageView::Progress() => {
// }
MessageView::Eos(..) => {
// we need to actively stop pulling the stream, that's because stream will
// never end despite yielding an `Eos` message
Ok(false)
}
MessageView::Error(err) => {
pipeline.set_state(gstreamer::State::Null).context(
"Unable to set the pipeline to the `Null` state, after error",
)?;
let err = err
.get_details()
.and_then(|details| {
if details.get_name() != "error-details" {
return None;
}
let err = details
.get::<&GBoxErrorWrapper>("error")
.unwrap()
.map(|err| err.clone().into())
.expect("error-details message without actual error");
Some(err)
})
.unwrap_or_else(|| {
GErrorMessage {
src: msg
.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.get_error().to_string(),
debug: err.get_debug(),
source: err.get_error(),
}
.into()
});
Err(err)
}
_ => Ok(true),
}
})
.take_while(|e| {
if let Ok(false) = e {
futures::future::ready(false)
} else {
futures::future::ready(true)
}
})
.try_for_each(|_| futures::future::ready(Ok(())))
.await
.context("failed converting")?;
Result::<_>::Ok(())
};
pin_mut!(stream_processor);
let mut progress_interval = interval(Duration::from_millis(ui::UPDATE_INTERVAL_MILLIS / 2));
let progress_processor = async {
use gstreamer::ClockTime;
loop {
progress_interval.tick().await;
let dur = decodebin
.query_duration::<ClockTime>()
.and_then(|time| time.nanoseconds());
let ratio = dur.and_then(|dur| {
if dur == 0 {
return None;
}
let pos = decodebin
.query_position::<ClockTime>()
.and_then(|time| time.nanoseconds());
pos.map(|pos| {
let ratio = pos as f64 / dur as f64;
ratio.max(0.0).min(1.0)
})
});
if let Some(ratio) = ratio {
queue.push(ui::Msg::TaskProgress { id: task_id, ratio });
}
}
#[allow(unreachable_code)]
Result::<_>::Ok(())
};
pin_mut!(progress_processor);
future::try_select(stream_processor, progress_processor)
.await
.map_err(|err| err.factor_first().0)?;
pipeline
.set_state(gstreamer::State::Null)
.context("Unable to set the pipeline to the `Null` state")?;
fs::rename(&to_path_tmp, &to_path).await?;
Ok(())
})
.await
}
fn main() -> Result<(), Error> {
ffmpeg::init()?;
let input = env::args().nth(1).expect("missing input");
let output = env::args().nth(2).expect("missing output");
transcode_path(input.as_ref(), output.as_ref())?;
Ok(())
async fn rm_file_on_err<F, T>(path: &Path, f: F) -> Result<T>
where
F: Future<Output = Result<T>>,
{
match f.await {
Err(err) => match fs::remove_file(path).await {
Ok(()) => Err(err),
Err(fs_err) if fs_err.kind() == std::io::ErrorKind::NotFound => Err(err),
Err(fs_err) => {
let err = err
.context(fs_err)
.context(format!("removing {} failed", path.display()));
Err(err)
}
},
res @ Ok(..) => res,
}
}
fn path_to_bytes(path: &Path) -> Cow<'_, [u8]> {
// https://stackoverflow.com/a/59224987/5572146
#[cfg(unix)]
{
use std::os::unix::ffi::OsStrExt;
Cow::Borrowed(path.as_os_str().as_bytes())
}
#[cfg(windows)]
{
// NOT TESTED
// FIXME: test and post answer to https://stackoverflow.com/questions/38948669
use std::os::windows::ffi::OsStrExt;
let buf: Vec<u8> = path
.as_os_str()
.encode_wide()
.map(|char| char.to_ne_bytes())
.flatten()
.collect();
Cow::Owned(buf)
}
}
fn path_to_gstring(path: &Path) -> GString {
let buf = path_to_bytes(path);
ffi::CString::new(buf)
.expect("Path contained null byte")
.into()
}

283
src/ui.rs Normal file
View File

@@ -0,0 +1,283 @@
use crate::ConversionArgs;
use anyhow::{Context, Result};
use futures::Future;
use std::{
borrow::Cow, cell::RefCell, collections::HashMap, io, mem, path::PathBuf, rc::Rc,
time::Duration,
};
use tokio::{task, time::interval};
use tui::{backend::CrosstermBackend, Terminal};
pub const UPDATE_INTERVAL_MILLIS: u64 = 100;
#[derive(Debug)]
pub enum Msg {
Init { task_len: usize, log_path: PathBuf },
Exit,
TaskStart { id: usize, args: ConversionArgs },
TaskEnd { id: usize },
TaskProgress { id: usize, ratio: f64 },
TaskError { id: usize },
}
#[derive(Debug, Clone)]
pub struct MsgQueue {
inner: Rc<RefCell<Vec<Msg>>>,
}
impl MsgQueue {
fn new() -> MsgQueue {
MsgQueue {
inner: Rc::new(RefCell::new(Vec::new())),
}
}
pub fn push(&self, msg: Msg) {
self.inner.borrow_mut().push(msg);
}
fn swap_inner(&self, other: &mut Vec<Msg>) {
let mut inner = self.inner.borrow_mut();
mem::swap(&mut *inner, other)
}
}
struct State {
terminal: Terminal<CrosstermBackend<io::Stdout>>,
log_path: Option<PathBuf>,
task_len: Option<usize>,
ended_tasks: usize,
running_tasks: HashMap<usize, Task>,
has_rendered: bool,
has_errored: bool,
}
impl State {
fn new() -> Result<State> {
let terminal = Terminal::new(CrosstermBackend::new(io::stdout()))
.context("Unable to create ui terminal")?;
Ok(State {
terminal,
log_path: None,
task_len: None,
ended_tasks: 0,
running_tasks: HashMap::new(),
has_rendered: false,
has_errored: false,
})
}
fn process_msg(&mut self, msg: Msg) -> Result<bool> {
match msg {
Msg::Init { task_len, log_path } => {
self.task_len = Some(task_len);
self.log_path = Some(log_path);
}
Msg::Exit => return Ok(false),
Msg::TaskStart { id, args } => {
self.running_tasks.insert(
id,
Task {
id,
ratio: None,
args,
},
);
}
Msg::TaskEnd { id } => {
self.running_tasks
.remove(&id)
.context("unable to remove finished task; could't find task")?;
self.ended_tasks += 1;
}
Msg::TaskProgress { id, ratio } => {
let mut task = self
.running_tasks
.get_mut(&id)
.context("Unable to update task progress; could't find task")?;
task.ratio = Some(ratio);
}
Msg::TaskError { id } => {
// TODO
self.running_tasks
.remove(&id)
.context("unable to remove errored task; could't find task")?;
self.ended_tasks += 1;
self.has_errored = true;
}
}
Ok(true)
}
fn render(&mut self) -> Result<()> {
use tui::{
layout::{Constraint, Direction, Layout, Rect},
style::{Color, Modifier, Style},
text::Text,
widgets::{Block, Borders, Gauge, Paragraph},
};
let task_len = if let Some(task_len) = self.task_len {
task_len
} else {
return Ok(());
};
if task_len == 0 {
return Ok(());
}
let tasks_ended = self.ended_tasks;
let mut running_tasks: Vec<_> = self.running_tasks.values().cloned().collect();
running_tasks.sort_by_key(|task| task.id);
if !self.has_rendered {
self.terminal.clear().context("cleaning ui failed")?;
self.has_rendered = true;
}
let error_text = match self.has_errored {
true => {
let text: Cow<'static, str> = self
.log_path
.as_ref()
.map(|lp| {
let text = format!("Error(s) occurred and were logged to {}", lp.display());
Cow::Owned(text)
})
.unwrap_or_else(|| Cow::Borrowed("Error(s) occurred"));
Some(text)
}
false => None,
};
self.terminal
.draw(|f| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.margin(1)
.constraints([Constraint::Percentage(90), Constraint::Percentage(10)].as_ref())
.split(f.size());
let mut task_rect = chunks[0];
if error_text.is_some() {
task_rect.height -= 3;
}
for (row, task) in running_tasks
.into_iter()
.take(task_rect.height as usize / 2)
.enumerate()
{
f.render_widget(
Gauge::default()
.label(task.args.rel_from_path.to_string_lossy().as_ref())
.gauge_style(
Style::default()
.fg(Color::White)
.bg(Color::Black)
.add_modifier(Modifier::ITALIC),
)
.ratio(task.ratio.unwrap_or(0.0)),
Rect::new(
task_rect.x,
task_rect.y + row as u16 * 2,
task_rect.width,
1,
),
);
}
if let Some(error_text) = error_text {
f.render_widget(
Paragraph::new(Text::raw(error_text)).style(
Style::default()
.fg(Color::Red)
.bg(Color::Black)
.add_modifier(Modifier::BOLD),
),
Rect::new(task_rect.x, task_rect.height + 1, task_rect.width, 2),
);
}
f.render_widget(
Gauge::default()
.block(
Block::default()
.borders(Borders::ALL)
.title("Overall Progress"),
)
.gauge_style(
Style::default()
.fg(Color::White)
.bg(Color::Black)
.add_modifier(Modifier::ITALIC),
)
.ratio(tasks_ended as f64 / task_len as f64),
chunks[1],
);
})
.context("rendering ui failed")?;
Ok(())
}
}
#[derive(Debug, Clone)]
struct Task {
id: usize,
ratio: Option<f64>,
args: ConversionArgs,
}
pub fn init() -> (MsgQueue, impl Future<Output = Result<()>>) {
let queue = MsgQueue::new();
let queue_clone = queue.clone();
let fut = async move {
let mut interval = interval(Duration::from_millis(UPDATE_INTERVAL_MILLIS));
let mut wrapped = Some((Vec::new(), State::new()?));
loop {
interval.tick().await;
let (mut current_queue, mut state) = wrapped.take().context("`wrapped` is None")?;
queue_clone.swap_inner(&mut current_queue);
let render_res = task::spawn_blocking(move || -> Result<_> {
let mut exit = false;
for msg in current_queue.drain(..) {
if !state.process_msg(msg)? {
exit = true;
}
}
state.render()?;
if exit {
Ok(None)
} else {
Ok(Some((current_queue, state)))
}
})
.await
.context("ui update task failed")?
.context("ui update failed")?;
match render_res {
Some(s) => wrapped = Some(s),
None => break,
}
}
Result::<_>::Ok(())
};
(queue, fut)
}