feat(users/edef/weave): use tracing_indicatif for progress

Progress bars :3

Change-Id: I770d0f8381521b6efc8b38c0db4d59c771887fee
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12673
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Author: edef
Date: 2024-10-19 19:23:56 +00:00
Parent: b3f0e25fbc
Commit: 84a82f6f41
6 changed files with 1071 additions and 114 deletions

View file

@ -36,17 +36,27 @@ use polars::{
lazy::dsl::{col, SpecialEq},
prelude::*,
};
use tracing::info_span;
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL};
use weave::{as_fixed_binary, hash64, leak, load_ph_array, INDEX_NULL};
#[tracing::instrument]
fn main() -> Result<()> {
let _tracing = tvix_tracing::TracingBuilder::default()
.enable_progressbar()
.build()?;
let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?);
// TODO(edef): re-parallelise this
// We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works.
// TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory
eprint!("… build index\r");
let ph_map: &'static HashTable<(u64, u32)> = {
let span = info_span!("ph_map", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("build index");
span.pb_start();
let mut ph_map = HashTable::with_capacity(ph_array.len());
for (offset, item) in ph_array.iter().enumerate() {
@ -57,7 +67,6 @@ fn main() -> Result<()> {
&*Box::leak(Box::new(ph_map))
};
eprintln!("{DONE}");
let ph_to_idx = |key: &[u8; 20]| -> u32 {
let hash = hash64(key);
@ -69,35 +78,41 @@ fn main() -> Result<()> {
.unwrap_or(INDEX_NULL)
};
eprint!("… swizzle references\r");
LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())?
.with_column(
col("references")
.map(
move |series: Series| -> PolarsResult<Option<Series>> {
Ok(Some(
series
.list()?
.apply_to_inner(&|series: Series| -> PolarsResult<Series> {
let series = series.binary()?;
let mut out: Vec<u32> = Vec::with_capacity(series.len());
out.extend(as_fixed_binary(series).flatten().map(ph_to_idx));
Ok(Series::from_vec("reference_idxs", out))
})?
.into_series(),
))
},
SpecialEq::from_type(DataType::List(DataType::UInt32.into())),
)
.alias("reference_idxs"),
)
.select([col("reference_idxs")])
.with_streaming(true)
.sink_parquet(
"narinfo-references.parquet".into(),
ParquetWriteOptions::default(),
)?;
eprintln!("{DONE}");
{
let span = info_span!("swizzle_refs", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("swizzle references");
span.pb_start();
LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())?
.with_column(
col("references")
.map(
move |series: Series| -> PolarsResult<Option<Series>> {
Ok(Some(
series
.list()?
.apply_to_inner(&|series: Series| -> PolarsResult<Series> {
let series = series.binary()?;
let mut out: Vec<u32> = Vec::with_capacity(series.len());
out.extend(
as_fixed_binary(series).flatten().map(ph_to_idx),
);
Ok(Series::from_vec("reference_idxs", out))
})?
.into_series(),
))
},
SpecialEq::from_type(DataType::List(DataType::UInt32.into())),
)
.alias("reference_idxs"),
)
.select([col("reference_idxs")])
.with_streaming(true)
.sink_parquet(
"narinfo-references.parquet".into(),
ParquetWriteOptions::default(),
)?;
};
Ok(())
}

View file

@ -8,6 +8,7 @@ use std::{
slice,
sync::Arc,
};
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
use polars::{
datatypes::BinaryChunked,
@ -20,7 +21,6 @@ pub type FixedBytes<const N: usize> =
ArcRef<'static, polars::export::arrow::buffer::Bytes<u8>, [[u8; N]]>;
pub const INDEX_NULL: u32 = !0;
pub const DONE: &str = "\u{2714}";
/// A terrific hash function, turning 20 bytes of cryptographic hash
/// into 8 bytes of cryptographic hash.
@ -42,8 +42,13 @@ pub fn leak<O, T: ?Sized>(r: OwningRef<Arc<O>, T>) -> &T {
/// Read a dense `store_path_hash` array from `narinfo.parquet`,
/// returning it as an owned [FixedBytes].
#[tracing::instrument(fields(indicatif.pb_show = tracing::field::Empty))]
pub fn load_ph_array() -> Result<FixedBytes<20>> {
eprint!("… load store_path_hash\r");
let span = tracing::Span::current();
span.pb_set_message("load store_path_hash");
span.pb_start();
// TODO(edef): this could use a further pushdown, since polars is more hindrance than help here
// We know this has to fit in memory (we can't mmap it without further encoding constraints),
// and we want a single `Vec<[u8; 20]>` of the data.
@ -57,7 +62,6 @@ pub fn load_ph_array() -> Result<FixedBytes<20>> {
);
u32::try_from(ph_array.len()).expect("dataset exceeds 2^32");
eprintln!("{DONE}");
Ok(ph_array)
}

View file

@ -15,6 +15,8 @@ use std::{
ops::Index,
sync::atomic::{AtomicU32, Ordering},
};
use tracing::{info_span, warn};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use polars::{
datatypes::StaticArray,
@ -23,36 +25,48 @@ use polars::{
prelude::*,
};
use weave::{as_fixed_binary, hash64, DONE, INDEX_NULL};
use weave::{as_fixed_binary, hash64, INDEX_NULL};
#[tracing::instrument]
fn main() -> Result<()> {
eprint!("… parse roots\r");
let roots: PathSet32 = as_fixed_binary::<20>(
LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())?
.explode([col("store_path_hash")])
.select([col("store_path_hash")])
.collect()?
.column("store_path_hash")?
.binary()?,
)
.flatten()
.collect();
eprintln!("{DONE}");
let _tracing = tvix_tracing::TracingBuilder::default()
.enable_progressbar()
.build()?;
let roots: PathSet32 = {
let span = info_span!("parse_roots", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("parse roots");
span.pb_start();
as_fixed_binary::<20>(
LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())?
.explode([col("store_path_hash")])
.select([col("store_path_hash")])
.collect()?
.column("store_path_hash")?
.binary()?,
)
.flatten()
.collect()
};
{
let ph_array = weave::load_ph_array()?;
let span = info_span!("resolve_roots", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("resolve roots");
span.pb_start();
eprint!("… resolve roots\r");
ph_array.par_iter().enumerate().for_each(|(idx, h)| {
if let Some(idx_slot) = roots.find(h) {
assert_eq!(
idx_slot.swap(idx as u32, Ordering::Relaxed),
INDEX_NULL,
"duplicate entry"
);
}
});
eprintln!("{DONE}");
weave::load_ph_array()?
.into_par_iter()
.enumerate()
.for_each(|(idx, h)| {
if let Some(idx_slot) = roots.find(h) {
assert_eq!(
idx_slot.swap(idx as u32, Ordering::Relaxed),
INDEX_NULL,
"duplicate entry"
);
}
});
}
let mut todo = FxHashSet::default();
@ -67,17 +81,28 @@ fn main() -> Result<()> {
}
todo.insert(idx);
}
println!("skipping {unknown_roots} unknown roots");
if unknown_roots != 0 {
warn!("skipping {unknown_roots} unknown roots");
}
}
eprint!("… load reference_idxs\r");
let ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
.finish()?
.column("reference_idxs")?
.list()?
.clone();
let ri_array;
let ri_array = {
let span = info_span!(
"load_reference_idxs",
indicatif.pb_show = tracing::field::Empty
)
.entered();
span.pb_set_message("load reference_idxs");
span.pb_start();
ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
.finish()?
.column("reference_idxs")?
.list()?
.clone();
ChunkedList::new(ri_array.downcast_iter().map(|chunk| {
(
chunk.offsets(),
@ -91,49 +116,64 @@ fn main() -> Result<()> {
)
}))
};
eprintln!("{DONE}");
let mut seen = todo.clone();
while !todo.is_empty() {
println!("todo: {} seen: {}", todo.len(), seen.len());
{
let span = info_span!("mark", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("marking");
span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
todo = todo
.par_iter()
.flat_map(|&parent| {
if parent == INDEX_NULL {
return FxHashSet::default();
}
while !todo.is_empty() {
span.pb_set_length(seen.len() as u64);
span.pb_set_position(seen.len().saturating_sub(todo.len()) as u64);
ri_array[parent as usize]
.iter()
.cloned()
.filter(|child| !seen.contains(child))
.collect::<FxHashSet<u32>>()
})
.collect();
todo = todo
.par_iter()
.flat_map(|&parent| {
if parent == INDEX_NULL {
return FxHashSet::default();
}
for &index in &todo {
seen.insert(index);
ri_array[parent as usize]
.iter()
.cloned()
.filter(|child| !seen.contains(child))
.collect::<FxHashSet<u32>>()
})
.collect();
for &index in &todo {
seen.insert(index);
}
}
span.pb_set_length(seen.len() as u64);
span.pb_set_position(seen.len() as u64);
if seen.remove(&INDEX_NULL) {
warn!("WARNING: missing edges");
}
}
println!("done: {} paths", seen.len());
let seen = {
let span = info_span!("gather_live", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("gathering live set");
if seen.remove(&INDEX_NULL) {
println!("WARNING: missing edges");
let mut seen: Vec<u32> = seen.into_iter().collect();
seen.par_sort();
seen
};
{
let span = info_span!("write_output", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("writing output");
span.pb_start();
ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
"live_idx" => seen,
}?)?;
}
eprint!("… gathering live set\r");
let mut seen: Vec<u32> = seen.into_iter().collect();
seen.par_sort();
eprintln!("{DONE}");
eprint!("… writing output\r");
ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
"live_idx" => seen,
}?)?;
eprintln!("{DONE}");
Ok(())
}