refactor(users/edef/weave/swizzle): use polars streaming
This vastly reduces the memory requirements, so we can run in ~40G RAM.

Change-Id: I4952a780df294bd852a8b4682ba2fd59b9bae675
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12667
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
parent bdc2891053
commit 313899c291

5 changed files with 70 additions and 112 deletions
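For context, the memory win comes from replacing an eager read-transform-write pipeline with polars' lazy streaming engine, which processes the Parquet file chunk by chunk and sinks results straight to disk instead of collecting a full DataFrame. A minimal sketch of that pattern, using only API calls that appear in the diff below ("input.parquet" and "output.parquet" are placeholders):

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        // Lazily scan the input; nothing is materialised yet.
        LazyFrame::scan_parquet("input.parquet", ScanArgsParquet::default())?
            .select([col("references")])
            // Execute the plan in streaming fashion, chunk by chunk,
            // instead of collecting a full DataFrame in memory.
            .with_streaming(true)
            // Write results incrementally as they are produced.
            .sink_parquet("output.parquet".into(), ParquetWriteOptions::default())?;
        Ok(())
    }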
users/edef/weave/swizzle/src/main.rs

@@ -32,21 +32,21 @@
 use anyhow::Result;
 use hashbrown::HashTable;
-use polars::prelude::*;
 use rayon::prelude::*;
 use std::fs::File;
-use tokio::runtime::Runtime;
+use polars::{
+    lazy::dsl::{col, SpecialEq},
+    prelude::*,
+};
 
-use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL};
+use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL};
 
 fn main() -> Result<()> {
-    let ph_array = load_ph_array()?;
+    let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?);
 
     // TODO(edef): re-parallelise this
     // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works.
     // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory
     eprint!("… build index\r");
-    let ph_map: HashTable<(u64, u32)> = {
+    let ph_map: &'static HashTable<(u64, u32)> = {
         let mut ph_map = HashTable::with_capacity(ph_array.len());
 
         for (offset, item) in ph_array.iter().enumerate() {
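The index built here (the loop continues in the next hunk) is hashbrown's low-level HashTable: it stores only (hash, offset) pairs while the 20-byte keys stay in the dense ph_array, and the caller supplies the hash and an equality check on both insert and lookup. A toy version of the same pattern; build_index and its hash64 parameter are illustrative, not from the commit:

    use hashbrown::HashTable;

    /// Index `keys` by a caller-supplied hash, keeping only (hash, offset)
    /// pairs in the table; the key bytes themselves remain in the slice.
    fn build_index(
        keys: &[[u8; 20]],
        hash64: impl Fn(&[u8; 20]) -> u64,
    ) -> HashTable<(u64, u32)> {
        let mut table = HashTable::with_capacity(keys.len());
        for (offset, key) in keys.iter().enumerate() {
            let h = hash64(key);
            // `insert_unique` takes the precomputed hash, the value, and a
            // re-hasher used if the table needs to grow.
            table.insert_unique(h, (h, offset as u32), |&(h, _)| h);
        }
        table
    }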
@@ -55,59 +55,48 @@ fn main() -> Result<()> {
             ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash);
         }
 
-        ph_map
+        &*Box::leak(Box::new(ph_map))
     };
     eprintln!("{DONE}");
 
-    eprint!("… swizzle references\r");
-    let mut pq = ParquetReader::new(File::open("narinfo.parquet")?)
-        .with_columns(Some(vec!["references".into()]))
-        .batched(1 << 16)?;
-
-    let mut reference_idxs =
-        Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into()));
-
-    let mut bounce = vec![];
-    let runtime = Runtime::new()?;
-    while let Some(batches) = runtime.block_on(pq.next_batches(48))? {
-        batches
-            .into_par_iter()
-            .map(|df| -> ListChunked {
-                df.column("references")
-                    .unwrap()
-                    .list()
-                    .unwrap()
-                    .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
-                        let series = series.binary()?;
-                        let mut out: Vec<u32> = Vec::with_capacity(series.len());
-
-                        out.extend(as_fixed_binary::<20>(series).flat_map(|xs| xs).map(|key| {
-                            let hash = hash64(&key);
-                            ph_map
-                                .find(hash, |&(candidate_hash, candidate_index)| {
-                                    candidate_hash == hash
-                                        && &ph_array[candidate_index as usize] == key
-                                })
-                                .map(|&(_, index)| index)
-                                .unwrap_or(INDEX_NULL)
-                        }));
-
-                        Ok(Series::from_vec("reference_idxs", out))
-                    })
-                    .unwrap()
-            })
-            .collect_into_vec(&mut bounce);
-
-        for batch in bounce.drain(..) {
-            reference_idxs.append(&batch.into_series())?;
-        }
-    }
-    eprintln!("{DONE}");
-
-    eprint!("… writing output\r");
-    ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! {
-        "reference_idxs" => reference_idxs,
-    }?)?;
+    let ph_to_idx = |key: &[u8; 20]| -> u32 {
+        let hash = hash64(key);
+        ph_map
+            .find(hash, |&(candidate_hash, candidate_index)| {
+                candidate_hash == hash && &ph_array[candidate_index as usize] == key
+            })
+            .map(|&(_, index)| index)
+            .unwrap_or(INDEX_NULL)
+    };
+
+    eprint!("… swizzle references\r");
+    LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())?
+        .with_column(
+            col("references")
+                .map(
+                    move |series: Series| -> PolarsResult<Option<Series>> {
+                        Ok(Some(
+                            series
+                                .list()?
+                                .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
+                                    let series = series.binary()?;
+                                    let mut out: Vec<u32> = Vec::with_capacity(series.len());
+                                    out.extend(as_fixed_binary(series).flatten().map(ph_to_idx));
+                                    Ok(Series::from_vec("reference_idxs", out))
+                                })?
+                                .into_series(),
+                        ))
+                    },
+                    SpecialEq::from_type(DataType::List(DataType::UInt32.into())),
+                )
+                .alias("reference_idxs"),
+        )
+        .select([col("reference_idxs")])
+        .with_streaming(true)
+        .sink_parquet(
+            "narinfo-references.parquet".into(),
+            ParquetWriteOptions::default(),
+        )?;
     eprintln!("{DONE}");
 
     Ok(())
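One detail worth noting in the new pipeline: Expr::map takes the UDF plus an output-type descriptor, and SpecialEq::from_type(...) is how the commit declares that the closure yields List(UInt32), letting the lazy planner know the output schema without executing the closure. A stripped-down sketch of the same expression shape, with an identity closure standing in for the real hash lookup:

    use polars::{
        lazy::dsl::{col, SpecialEq},
        prelude::*,
    };

    fn swizzle_expr() -> Expr {
        col("references")
            .map(
                // Identity stand-in for the per-batch swizzle closure.
                |series: Series| -> PolarsResult<Option<Series>> { Ok(Some(series)) },
                // Declares the output dtype up front for the planner.
                SpecialEq::from_type(DataType::List(DataType::UInt32.into())),
            )
            .alias("reference_idxs")
    }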
users/edef/weave/src/lib.rs

@@ -1,7 +1,13 @@
 use anyhow::Result;
-use owning_ref::ArcRef;
+use owning_ref::{ArcRef, OwningRef};
 use rayon::prelude::*;
-use std::{fs::File, ops::Range, slice};
+use std::{
+    fs::File,
+    mem,
+    ops::{Deref, Range},
+    slice,
+    sync::Arc,
+};
 
 use polars::{
     datatypes::BinaryChunked,
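The new imports support the leak helper added in the next hunk: an OwningRef bundles an owner (here an Arc) together with a borrow into it, so the borrow can be passed around without a separate lifetime. A toy illustration of that shape, unrelated to the commit's data:

    use owning_ref::OwningRef;
    use std::sync::Arc;

    fn demo() {
        // The OwningRef keeps the Arc alive alongside a borrow into it.
        let tail: OwningRef<Arc<Vec<u8>>, [u8]> =
            OwningRef::new(Arc::new(vec![1u8, 2, 3])).map(|v| &v[1..]);
        assert_eq!(&*tail, &[2, 3]);
    }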
@@ -24,6 +30,16 @@ pub fn hash64(h: &[u8; 20]) -> u64 {
     u64::from_ne_bytes(buf)
 }
 
+pub fn leak<O, T: ?Sized>(r: OwningRef<Arc<O>, T>) -> &'static T {
+    // SAFETY: Either `ptr` points into the `Arc`, which lives until `r` is dropped,
+    // or it points at something else entirely which lives at least as long.
+    unsafe {
+        let ptr: *const T = r.deref();
+        mem::forget(r);
+        &*ptr
+    }
+}
+
 /// Read a dense `store_path_hash` array from `narinfo.parquet`,
 /// returning it as an owned [FixedBytes].
 pub fn load_ph_array() -> Result<FixedBytes<20>> {
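Usage note: leak never drops the owner (mem::forget), so the Arc's allocation lives for the rest of the process and the returned borrow is valid for 'static. That is what lets swizzle hand ph_array and ph_map to closures the streaming engine may run on worker threads, as in the line from the diff above:

    let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?);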