//! Weave resolves a list of roots from `releases.parquet` against `narinfo.parquet`,
|
|
//! and then uses the reference graph from the accompanying `narinfo-references.parquet`
|
|
//! produced by `swizzle` to collect the closure of the roots.
|
|
//!
|
|
//! They are written to `live_idxs.parquet`, which only has one column, representing
|
|
//! the row numbers in `narinfo.parquet` corresponding to live paths.
|
|
|
|
use anyhow::Result;
|
|
use hashbrown::{hash_table, HashTable};
|
|
use rayon::prelude::*;
|
|
use rustc_hash::FxHashSet;
|
|
use std::{
|
|
collections::BTreeMap,
|
|
fs::File,
|
|
ops::Index,
|
|
sync::atomic::{AtomicU32, Ordering},
|
|
};
|
|
use tracing::{info_span, warn};
|
|
use tracing_indicatif::span_ext::IndicatifSpanExt;
|
|
|
|
use polars::{
|
|
datatypes::StaticArray,
|
|
export::arrow::{array::UInt32Array, offset::OffsetsBuffer},
|
|
lazy::dsl::col,
|
|
prelude::*,
|
|
};
|
|
|
|
use weave::{as_fixed_binary, hash64, INDEX_NULL};
|
|
|
|
#[tracing::instrument]
fn main() -> Result<()> {
    // Keep the tracing guard alive for the whole run so progress bars render.
    let _tracing = snix_tracing::TracingBuilder::default()
        .enable_progressbar()
        .build()?;

    // Phase 1: read the root store-path hashes (20-byte fixed binaries) out of
    // `releases.parquet` into a PathSet32. Each entry's index slot starts out
    // as INDEX_NULL, meaning "not yet resolved to a narinfo row".
    let roots: PathSet32 = {
        let span = info_span!("parse_roots", indicatif.pb_show = tracing::field::Empty).entered();
        span.pb_set_message("parse roots");
        span.pb_start();

        as_fixed_binary::<20>(
            LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())?
                .explode([col("store_path_hash")])
                .select([col("store_path_hash")])
                .collect()?
                .column("store_path_hash")?
                .binary()?,
        )
        .flatten()
        .collect()
    };

    // Phase 2: scan the full path-hash array (presumably the hash column of
    // `narinfo.parquet` — see the module docs; confirm in weave::load_ph_array)
    // in parallel, and record each root's row number in its atomic slot.
    // Relaxed ordering is fine: each slot is touched independently, and the
    // assert on the swapped-out value guarantees a root is resolved at most
    // once ("duplicate entry" would mean the same hash appears twice).
    {
        let span = info_span!("resolve_roots", indicatif.pb_show = tracing::field::Empty).entered();
        span.pb_set_message("resolve roots");
        span.pb_start();

        weave::load_ph_array()?
            .into_par_iter()
            .enumerate()
            .for_each(|(idx, h)| {
                if let Some(idx_slot) = roots.find(h) {
                    assert_eq!(
                        idx_slot.swap(idx as u32, Ordering::Relaxed),
                        INDEX_NULL,
                        "duplicate entry"
                    );
                }
            });
    }

    // Seed the BFS frontier with every resolved root index. Roots that were
    // never found in the hash array keep INDEX_NULL and are skipped (with a
    // warning) rather than aborting the whole closure computation.
    let mut todo = FxHashSet::default();
    todo.reserve(roots.len());
    {
        let mut unknown_roots = 0usize;
        for (_, idx) in roots.table {
            let idx = idx.into_inner();
            if idx == INDEX_NULL {
                unknown_roots += 1;
                continue;
            }
            todo.insert(idx);
        }

        if unknown_roots != 0 {
            warn!("skipping {unknown_roots} unknown roots");
        }
    }

    // Phase 3: load the per-row reference lists. The deferred `let ri_array;`
    // is deliberate: the ChunkedList borrows the chunk buffers owned by the
    // outer `ri_array`, so the owner must be declared in the enclosing scope
    // to outlive the (shadowing) view built inside the block.
    let ri_array;
    let ri_array = {
        let span = info_span!(
            "load_reference_idxs",
            indicatif.pb_show = tracing::field::Empty
        )
        .entered();
        span.pb_set_message("load reference_idxs");
        span.pb_start();

        ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
            .finish()?
            .column("reference_idxs")?
            .list()?
            .clone();

        // Build a chunk-offset index over the raw Arrow buffers so each row's
        // reference list can be sliced out without going through polars.
        ChunkedList::new(ri_array.downcast_iter().map(|chunk| {
            (
                chunk.offsets(),
                chunk
                    .values()
                    .as_any()
                    .downcast_ref::<UInt32Array>()
                    .unwrap()
                    .as_slice()
                    .unwrap(),
            )
        }))
    };

    // Phase 4: parallel BFS over the reference graph. `seen` is the closure
    // accumulated so far; `todo` is the current frontier, replaced each round
    // by the set of not-yet-seen children.
    let mut seen = todo.clone();
    {
        let span = info_span!("mark", indicatif.pb_show = tracing::field::Empty).entered();
        span.pb_set_message("marking");
        span.pb_set_style(&snix_tracing::PB_PROGRESS_STYLE);

        while !todo.is_empty() {
            span.pb_set_length(seen.len() as u64);
            span.pb_set_position(seen.len().saturating_sub(todo.len()) as u64);

            todo = todo
                .par_iter()
                .flat_map(|&parent| {
                    // INDEX_NULL can enter the frontier via dangling edges;
                    // it has no reference list, so expand it to nothing.
                    if parent == INDEX_NULL {
                        return FxHashSet::default();
                    }

                    // NOTE: `seen` is only read during this parallel pass;
                    // new indices are merged in sequentially below.
                    ri_array[parent as usize]
                        .iter()
                        .cloned()
                        .filter(|child| !seen.contains(child))
                        .collect::<FxHashSet<u32>>()
                })
                .collect();

            for &index in &todo {
                seen.insert(index);
            }
        }

        span.pb_set_length(seen.len() as u64);
        span.pb_set_position(seen.len() as u64);

        // INDEX_NULL in `seen` means some path referenced an edge target that
        // was never resolved — report it, but still emit the rest.
        if seen.remove(&INDEX_NULL) {
            warn!("WARNING: missing edges");
        }
    }

    // Phase 5: materialize the live set as a sorted Vec of row indices.
    let seen = {
        let span = info_span!("gather_live", indicatif.pb_show = tracing::field::Empty).entered();
        span.pb_set_message("gathering live set");

        let mut seen: Vec<u32> = seen.into_iter().collect();
        seen.par_sort();
        seen
    };

    // Phase 6: write the single-column `live_idxs.parquet` output.
    {
        let span = info_span!("write_output", indicatif.pb_show = tracing::field::Empty).entered();
        span.pb_set_message("writing output");
        span.pb_start();

        ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
            "live_idx" => seen,
        }?)?;
    }

    Ok(())
}
|
|
|
|
/// A set of 20-byte store path hashes supporting concurrent index resolution.
///
/// Each entry pairs the raw hash with an atomic slot holding the row number
/// of the corresponding path (INDEX_NULL until resolved), so lookups from a
/// parallel iterator can publish indices through `&self`.
struct PathSet32 {
    table: HashTable<([u8; 20], AtomicU32)>,
}
|
|
|
|
impl PathSet32 {
|
|
fn with_capacity(capacity: usize) -> Self {
|
|
Self {
|
|
table: HashTable::with_capacity(capacity),
|
|
}
|
|
}
|
|
|
|
fn insert(&mut self, value: &[u8; 20]) -> bool {
|
|
let hash = hash64(value);
|
|
|
|
match self
|
|
.table
|
|
.entry(hash, |(x, _)| x == value, |(x, _)| hash64(x))
|
|
{
|
|
hash_table::Entry::Occupied(_) => false,
|
|
hash_table::Entry::Vacant(entry) => {
|
|
entry.insert((*value, AtomicU32::new(INDEX_NULL)));
|
|
true
|
|
}
|
|
}
|
|
}
|
|
|
|
fn find(&self, value: &[u8; 20]) -> Option<&AtomicU32> {
|
|
let hash = hash64(value);
|
|
self.table
|
|
.find(hash, |(x, _)| x == value)
|
|
.as_ref()
|
|
.map(|(_, x)| x)
|
|
}
|
|
|
|
fn len(&self) -> usize {
|
|
self.table.len()
|
|
}
|
|
}
|
|
|
|
impl<'a> FromIterator<&'a [u8; 20]> for PathSet32 {
|
|
fn from_iter<T: IntoIterator<Item = &'a [u8; 20]>>(iter: T) -> Self {
|
|
let iter = iter.into_iter();
|
|
let mut this = Self::with_capacity(iter.size_hint().0);
|
|
|
|
for item in iter {
|
|
this.insert(item);
|
|
}
|
|
|
|
this.table.shrink_to_fit(|(x, _)| hash64(x));
|
|
this
|
|
}
|
|
}
|
|
|
|
/// A random-access view over a chunked Arrow list column.
///
/// Maps each chunk's starting *global* list index to that chunk's offsets
/// buffer and flat value slice, so `Index<usize>` can locate the owning
/// chunk with a BTreeMap range query and slice out one list entry.
struct ChunkedList<'a, T> {
    by_offset: BTreeMap<usize, (&'a OffsetsBuffer<i64>, &'a [T])>,
}
|
|
|
|
impl<'a, T> ChunkedList<'a, T> {
|
|
fn new(chunks: impl IntoIterator<Item = (&'a OffsetsBuffer<i64>, &'a [T])>) -> Self {
|
|
let mut next_offset = 0usize;
|
|
ChunkedList {
|
|
by_offset: chunks
|
|
.into_iter()
|
|
.map(|(offsets, values)| {
|
|
let offset = next_offset;
|
|
next_offset = next_offset.checked_add(offsets.len_proxy()).unwrap();
|
|
|
|
(offset, (offsets, values))
|
|
})
|
|
.collect(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'a, T> Index<usize> for ChunkedList<'a, T> {
|
|
type Output = [T];
|
|
|
|
fn index(&self, index: usize) -> &Self::Output {
|
|
let (&base, &(offsets, values)) = self.by_offset.range(..=index).next_back().unwrap();
|
|
let (start, end) = offsets.start_end(index - base);
|
|
&values[start..end]
|
|
}
|
|
}
|