This commit is contained in:
2020-12-19 13:02:20 +01:00
parent 99a75fa4e1
commit b0c48d405e
6 changed files with 1085 additions and 478 deletions

View File

@@ -1,8 +1,9 @@
mod config;
mod ui;
use crate::config::Config;
use anyhow::{Context, Error, Result};
use futures::{channel::mpsc, prelude::*};
use futures::{future, pin_mut, prelude::*};
use glib::{subclass::prelude::*, GBoxed, GString};
use gstreamer::{gst_element_error, prelude::*, Element};
use gstreamer_base::prelude::*;
@@ -10,10 +11,14 @@ use std::{
borrow::Cow,
error::Error as StdError,
ffi, fmt,
fmt::Write as FmtWrite,
io::Write as IoWrite,
path::{Path, PathBuf},
result::Result as StdResult,
sync::Arc,
time::Duration,
};
use tokio::{task, time::interval};
#[derive(Clone, Debug, GBoxed)]
#[gboxed(type_name = "GBoxErrorWrapper")]
@@ -62,13 +67,12 @@ fn gmake<T: IsA<Element>>(factory_name: &str) -> Result<T> {
}
#[derive(Debug, Clone)]
struct ConvertionArgs {
from: PathBuf,
to: PathBuf,
pub struct ConvertionArgs {
rel_from_path: PathBuf,
transcode: config::Transcode,
}
fn get_path_pairs(config: Config) -> impl Iterator<Item = ConvertionArgs> {
fn get_convertion_args(config: &Config) -> impl Iterator<Item = ConvertionArgs> + '_ {
walkdir::WalkDir::new(&config.from)
.into_iter()
.filter_map(|e| e.ok())
@@ -88,7 +92,9 @@ fn get_path_pairs(config: Config) -> impl Iterator<Item = ConvertionArgs> {
return None;
};
let mut to = config.to.join(e.path().strip_prefix(&config.from).unwrap());
let rel_path = e.path().strip_prefix(&config.from).unwrap();
let mut to = config.to.join(&rel_path);
to.set_extension(transcode.extension());
let is_newer = {
@@ -107,54 +113,133 @@ fn get_path_pairs(config: Config) -> impl Iterator<Item = ConvertionArgs> {
}
Some(ConvertionArgs {
from: e.path().to_path_buf(),
to,
rel_from_path: rel_path.to_path_buf(),
transcode,
})
})
}
fn main() -> Result<()> {
gstreamer::init()?;
let config = config::config().context("could not get the config")?;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
task::LocalSet::new()
.run_until(async move {
let (ui_queue, ui_fut) = ui::init();
let (pair_tx, pair_rx) = mpsc::channel(16);
let main_handle = async move {
let ok = task::spawn_local(main_loop(ui_queue))
.await
.context("main task failed")??;
Result::<_>::Ok(ok)
};
// move blocking directory reading to an external thread
let pair_producer = std::thread::spawn(|| {
let produce_pairs = futures::stream::iter(get_path_pairs(config))
.map(Ok)
.forward(pair_tx)
.map(|res| res.context("sending path pairs failed"));
futures::executor::block_on(produce_pairs)
let ui_handle = async move {
let ok = task::spawn_local(ui_fut)
.await
.context("ui task failed")?
.context("ui failed")?;
Result::<_>::Ok(ok)
};
future::try_join(main_handle, ui_handle).await?;
Ok(())
})
.await
}
async fn main_loop(ui_queue: ui::MsgQueue) -> Result<()> {
let (config, conv_args) = task::spawn_blocking(|| -> Result<_> {
gstreamer::init()?;
let config = config::config().context("could not get the config")?;
let conv_args = get_convertion_args(&config).collect::<Vec<_>>();
Ok((config, conv_args))
})
.await
.context("init task failed")??;
let log_path = Path::new("./audio-conv.log")
.canonicalize()
.context("unable to canonicalize path to log file")?;
ui_queue.push(ui::Msg::Init {
task_len: conv_args.len(),
log_path: log_path.clone(),
});
let transcoder = pair_rx.for_each_concurrent(num_cpus::get(), |args| async move {
if let Err(err) = transcode(&args).await {
println!(
"err {} => {}:\n{:?}",
args.from.display(),
args.to.display(),
err
);
}
});
futures::executor::block_on(transcoder);
stream::iter(conv_args.into_iter().enumerate())
.map(Ok)
.try_for_each_concurrent(num_cpus::get(), |(i, args)| {
let config = config.clone();
let msg_queue = ui_queue.clone();
let log_path = &log_path;
pair_producer
.join()
.expect("directory reading thread panicked")?;
async move {
msg_queue.push(ui::Msg::TaskStart {
id: i,
args: args.clone(),
});
match transcode(&config, &args, i, &msg_queue).await {
Ok(()) => msg_queue.push(ui::Msg::TaskEnd { id: i }),
Err(err) => {
let err = err.context(format!(
"failed transcoding \"{}\"",
args.rel_from_path.display()
));
let mut log_file = match std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
{
Ok(log_file) => log_file,
Err(fs_err) => {
let err = err.context(fs_err).context("Unable to open log file");
return Err(err);
}
};
let mut err_str = String::new();
write!(&mut err_str, "{:?}\n", err).context("TODO")?;
log_file.write_all(err_str.as_ref()).map_err(|fs_err| {
err.context(format!(
"Unable to write transcoding error to log file (fs error: {})",
fs_err
))
})?;
msg_queue.push(ui::Msg::TaskError { id: i });
}
}
Result::<_>::Ok(())
}
})
.await?;
ui_queue.push(ui::Msg::Exit);
Ok(())
}
async fn transcode(args: &ConvertionArgs) -> Result<()> {
async fn transcode(
config: &Config,
args: &ConvertionArgs,
task_id: usize,
queue: &ui::MsgQueue,
) -> Result<()> {
let from_path = config.from.join(&args.rel_from_path);
let mut to_path = config.to.join(&args.rel_from_path);
to_path.set_extension(args.transcode.extension());
let file_src: Element = gmake("filesrc")?;
file_src.set_property("location", &path_to_gstring(&args.from))?;
file_src.set_property("location", &path_to_gstring(&from_path))?;
// encode into a tmp file first, then rename to actuall file name, that way we're writing
// "whole" files to the intended file path, ignoring partial files in the mtime check
let tmp_dest = args.to.with_extension("tmp");
let to_path_tmp = to_path.with_extension("tmp");
let decodebin: Element = gmake("decodebin")?;
@@ -170,7 +255,7 @@ async fn transcode(args: &ConvertionArgs) -> Result<()> {
let transcode_args = args.transcode.clone();
let tmp_dest_clone = tmp_dest.clone();
let to_path_tmp_clone = to_path_tmp.clone();
decodebin.connect_pad_added(move |decodebin, src_pad| {
let insert_sink = || -> Result<()> {
@@ -238,7 +323,7 @@ async fn transcode(args: &ConvertionArgs) -> Result<()> {
};
let file_dest: gstreamer_base::BaseSink = gmake("filesink")?;
file_dest.set_property("location", &path_to_gstring(&tmp_dest_clone))?;
file_dest.set_property("location", &path_to_gstring(&to_path_tmp_clone))?;
file_dest.set_sync(false);
dest_elems.push(file_dest.upcast());
@@ -277,78 +362,119 @@ async fn transcode(args: &ConvertionArgs) -> Result<()> {
let bus = pipeline.get_bus().context("pipe get bus")?;
std::fs::create_dir_all(
args.to
to_path
.parent()
.with_context(|| format!("could not get parent dir for {}", args.to.display()))?,
.with_context(|| format!("could not get parent dir for {}", to_path.display()))?,
)?;
rm_file_on_err(&tmp_dest, async {
rm_file_on_err(&to_path_tmp, async {
pipeline
.set_state(gstreamer::State::Playing)
.context("Unable to set the pipeline to the `Playing` state")?;
bus.stream()
.map::<Result<bool>, _>(|msg| {
use gstreamer::MessageView;
let stream_processor = async {
bus.stream()
.map::<Result<bool>, _>(|msg| {
use gstreamer::MessageView;
match msg.view() {
MessageView::Eos(..) => {
// we need to actively stop pulling the stream, that's because stream will
// never end despite yielding an `Eos` message
Ok(false)
match msg.view() {
// MessageView::Progress() => {
// }
MessageView::Eos(..) => {
// we need to actively stop pulling the stream, that's because stream will
// never end despite yielding an `Eos` message
Ok(false)
}
MessageView::Error(err) => {
pipeline.set_state(gstreamer::State::Null).context(
"Unable to set the pipeline to the `Null` state, after error",
)?;
let err = err
.get_details()
.and_then(|details| {
if details.get_name() != "error-details" {
return None;
}
let err = details
.get::<&GBoxErrorWrapper>("error")
.unwrap()
.map(|err| err.clone().into())
.expect("error-details message without actual error");
Some(err)
})
.unwrap_or_else(|| {
GErrorMessage {
src: msg
.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.get_error().to_string(),
debug: err.get_debug(),
source: err.get_error(),
}
.into()
});
Err(err)
}
_ => Ok(true),
}
MessageView::Error(err) => {
pipeline.set_state(gstreamer::State::Null).context(
"Unable to set the pipeline to the `Null` state, after error",
)?;
let err = err
.get_details()
.and_then(|details| {
if details.get_name() != "error-details" {
return None;
}
let err = details
.get::<&GBoxErrorWrapper>("error")
.unwrap()
.map(|err| err.clone().into())
.expect("error-details message without actual error");
Some(err)
})
.unwrap_or_else(|| {
GErrorMessage {
src: msg
.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
error: err.get_error().to_string(),
debug: err.get_debug(),
source: err.get_error(),
}
.into()
});
Err(err)
})
.take_while(|e| {
if let Ok(false) = e {
futures::future::ready(false)
} else {
futures::future::ready(true)
}
_ => Ok(true),
})
.try_for_each(|_| futures::future::ready(Ok(())))
.await
.context("failed converting")?;
Result::<_>::Ok(())
};
pin_mut!(stream_processor);
let mut progress_interval = interval(Duration::from_millis(ui::UPDATE_INTERVAL_MILLIS / 2));
let progress_processor = async {
use gstreamer::ClockTime;
loop {
progress_interval.tick().await;
let dur = decodebin
.query_duration::<ClockTime>()
.and_then(|time| time.nanoseconds());
let ratio = dur.and_then(|dur| {
let pos = decodebin
.query_position::<ClockTime>()
.and_then(|time| time.nanoseconds());
pos.map(|pos| pos as f64 / dur as f64)
});
if let Some(ratio) = ratio {
queue.push(ui::Msg::TaskProgress { id: task_id, ratio });
}
})
.take_while(|e| {
if let Ok(false) = e {
futures::future::ready(false)
} else {
futures::future::ready(true)
}
})
.try_for_each(|_| futures::future::ready(Ok(())))
}
#[allow(unreachable_code)]
Result::<_>::Ok(())
};
pin_mut!(progress_processor);
future::try_select(stream_processor, progress_processor)
.await
.context("failed converting")?;
.map_err(|err| err.factor_first().0)?;
pipeline
.set_state(gstreamer::State::Null)
.context("Unable to set the pipeline to the `Null` state")?;
std::fs::rename(&tmp_dest, &args.to)?;
std::fs::rename(&to_path_tmp, &to_path)?;
Ok(())
})
@@ -362,10 +488,13 @@ where
match f.await {
Err(err) => match std::fs::remove_file(path) {
Ok(..) => Err(err),
Err(rm_err) if rm_err.kind() == std::io::ErrorKind::NotFound => Err(err),
Err(rm_err) => Err(rm_err)
.context(format!("removing {}", path.display()))
.context(err),
Err(fs_err) if fs_err.kind() == std::io::ErrorKind::NotFound => Err(err),
Err(fs_err) => {
let err = err
.context(fs_err)
.context(format!("removing {} failed", path.display()));
Err(err)
}
},
res @ Ok(..) => res,
}
@@ -381,16 +510,15 @@ fn path_to_bytes(path: &Path) -> Cow<'_, [u8]> {
#[cfg(windows)]
{
let mut buf = Vec::<u8>::new();
// NOT TESTED
// FIXME: test and post answer to https://stackoverflow.com/questions/38948669
use std::os::windows::ffi::OsStrExt;
buf.extend(
path.as_os_str()
.encode_wide()
.map(|char| char.to_ne_bytes())
.flatten(),
);
let buf: Vec<u8> = path
.as_os_str()
.encode_wide()
.map(|char| char.to_ne_bytes())
.flatten()
.collect();
Cow::Owned(buf)
}
}