feat(tvix/store/bin): add progress bar infrastructure

This adds the tracing-indicatif crate, and configures it as a layer in
our tracing_subscriber pipeline to emit progress for every span that's
configured so.

It also moves from using std::io::stderr to write logs to using their
writer, to avoid clobbering output.

Progress bar styles are defined in a lazy_static, moving this into a
general tracing is left for later.

This adds some usage of this to the `imports` and `copy` commands.

The output can still be improved a bit - we should  probably split each
task up into a smaller (instrumented) helper functions, so we can create
a progress bar for each task.

Change-Id: I59a1915aa4e0caa89c911632dec59c4cbeba1b89
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11747
Reviewed-by: flokli <flokli@flokli.de>
Reviewed-by: Simon Hauser <simon.hauser@helsinki-systems.de>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2024-04-17 19:44:07 +03:00 committed by flokli
parent 9b77ce9f8f
commit 20513e7a52
8 changed files with 452 additions and 19 deletions

View file

@ -4,6 +4,7 @@ use clap::Subcommand;
use futures::future::try_join_all;
use futures::StreamExt;
use futures::TryStreamExt;
use indicatif::ProgressStyle;
use nix_compat::path_info::ExportedPathInfo;
use serde::Deserialize;
use serde::Serialize;
@ -14,10 +15,13 @@ use tokio_listener::SystemOptions;
use tokio_listener::UserOptions;
use tonic::transport::Server;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::Level;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tracing::Span;
use tracing_indicatif::filter::IndicatifFilter;
use tracing_indicatif::{span_ext::IndicatifSpanExt, IndicatifLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
use tvix_castore::import::fs::ingest_path;
use tvix_store::nar::NarCalculationService;
use tvix_store::proto::NarInfo;
@ -31,6 +35,20 @@ use tvix_store::pathinfoservice::PathInfoService;
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
use tvix_store::proto::GRPCPathInfoServiceWrapper;
use lazy_static::lazy_static;
// FUTUREWORK: move this to tracing crate
lazy_static! {
pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template(
"{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}] {pos:>7}/{len:7}"
)
.expect("invalid progress template");
pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template(
"{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}] {pos:>7}/{len:7}"
)
.expect("invalid progress template");
}
#[cfg(any(feature = "fuse", feature = "virtiofs"))]
use tvix_store::pathinfoservice::make_fs;
@ -212,24 +230,32 @@ fn default_threads() -> usize {
}
#[tokio::main]
#[instrument(fields(indicatif.pb_show=1))]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
// configure log settings
let level = cli.log_level.unwrap_or(Level::INFO);
let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
// Set up the tracing subscriber.
let subscriber = tracing_subscriber::registry().with(
tracing_subscriber::fmt::Layer::new()
.with_writer(std::io::stderr)
.compact()
.with_filter(
EnvFilter::builder()
.with_default_directive(level.into())
.from_env()
.expect("invalid RUST_LOG"),
),
);
let subscriber = tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::Layer::new()
.with_writer(indicatif_layer.get_stderr_writer())
.compact()
.with_filter(
EnvFilter::builder()
.with_default_directive(level.into())
.from_env()
.expect("invalid RUST_LOG"),
),
)
.with(indicatif_layer.with_filter(
// only show progress for spans with indicatif.pb_show field being set
IndicatifFilter::new(false),
));
// Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
// then init the registry.
@ -355,9 +381,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let nar_calculation_service: Arc<dyn NarCalculationService> =
nar_calculation_service.into();
let root_span = {
let s = Span::current();
s.pb_set_style(&PB_PROGRESS_STYLE);
s.pb_set_message("Importing paths");
s.pb_set_length(paths.len() as u64);
s.pb_start();
s
};
let tasks = paths
.into_iter()
.map(|path| {
let paths_span = root_span.clone();
tokio::task::spawn({
let blob_service = blob_service.clone();
let directory_service = directory_service.clone();
@ -380,6 +416,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("{}", output_path.to_absolute_path());
}
}
paths_span.pb_inc(1);
}
})
})
@ -416,15 +453,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Arc the PathInfoService, as we clone it .
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let lookups_span = info_span!(
"lookup pathinfos",
"indicatif.pb_show" = tracing::field::Empty
);
lookups_span.pb_set_length(reference_graph.closure.len() as u64);
lookups_span.pb_set_style(&PB_PROGRESS_STYLE);
lookups_span.pb_start();
// From our reference graph, lookup all pathinfos that might exist.
let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
.map(|elem| {
let path_info_service = path_info_service.clone();
async move {
path_info_service
let resp = path_info_service
.get(*elem.path.digest())
.await
.map(|resp| (elem, resp))
.map(|resp| (elem, resp));
Span::current().pb_inc(1);
resp
}
})
.buffer_unordered(50)