133 lines
4.2 KiB
Rust
133 lines
4.2 KiB
Rust
use anyhow::Result;
|
|
use owning_ref::{ArcRef, OwningRef};
|
|
use rayon::prelude::*;
|
|
use std::{
|
|
fs::File,
|
|
mem,
|
|
ops::{Deref, Range},
|
|
slice,
|
|
sync::Arc,
|
|
};
|
|
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
|
|
|
|
use polars::{
|
|
datatypes::BinaryChunked,
|
|
export::arrow::array::BinaryArray,
|
|
prelude::{ParquetReader, SerReader},
|
|
};
|
|
|
|
/// A shared `[[u8; N]]` backed by a Polars [Buffer].
|
|
pub type FixedBytes<const N: usize> =
|
|
ArcRef<'static, polars::export::arrow::buffer::Bytes<u8>, [[u8; N]]>;
|
|
|
|
/// Sentinel `u32` with every bit set (equal to `u32::MAX`).
// NOTE(review): presumably marks an absent index; confirm against the users of this constant.
pub const INDEX_NULL: u32 = u32::MAX;
|
|
|
|
/// Truncate a 20-byte digest to a 64-bit key.
///
/// The input is assumed to already be a cryptographic hash, so its first eight
/// bytes are simply reinterpreted as a `u64` in native byte order. The numeric
/// value therefore differs between little- and big-endian targets.
pub fn hash64(h: &[u8; 20]) -> u64 {
    // Only the leading eight bytes participate; the remaining twelve are ignored.
    let [b0, b1, b2, b3, b4, b5, b6, b7, ..] = *h;
    u64::from_ne_bytes([b0, b1, b2, b3, b4, b5, b6, b7])
}
|
|
|
|
pub fn leak<O, T: ?Sized>(r: OwningRef<Arc<O>, T>) -> &T {
|
|
// SAFETY: Either `ptr` points into the `Arc`, which lives until `r` is dropped,
|
|
// or it points at something else entirely which lives at least as long.
|
|
unsafe {
|
|
let ptr: *const T = r.deref();
|
|
mem::forget(r);
|
|
&*ptr
|
|
}
|
|
}
|
|
|
|
/// Read a dense `store_path_hash` array from `narinfo.parquet`,
|
|
/// returning it as an owned [FixedBytes].
|
|
#[tracing::instrument(fields(indicatif.pb_show = tracing::field::Empty))]
|
|
pub fn load_ph_array() -> Result<FixedBytes<20>> {
|
|
let span = tracing::Span::current();
|
|
|
|
span.pb_set_message("load store_path_hash");
|
|
span.pb_start();
|
|
|
|
// TODO(edef): this could use a further pushdown, since polars is more hindrance than help here
|
|
// We know this has to fit in memory (we can't mmap it without further encoding constraints),
|
|
// and we want a single `Vec<[u8; 20]>` of the data.
|
|
let ph_array = into_fixed_binary_rechunk::<20>(
|
|
ParquetReader::new(File::open("narinfo.parquet").unwrap())
|
|
.with_columns(Some(vec!["store_path_hash".into()]))
|
|
.set_rechunk(true)
|
|
.finish()?
|
|
.column("store_path_hash")?
|
|
.binary()?,
|
|
);
|
|
|
|
u32::try_from(ph_array.len()).expect("dataset exceeds 2^32");
|
|
|
|
Ok(ph_array)
|
|
}
|
|
|
|
/// Iterator over `&[[u8; N]]` from a dense [BinaryChunked].
|
|
pub fn as_fixed_binary<const N: usize>(
|
|
chunked: &BinaryChunked,
|
|
) -> impl DoubleEndedIterator<Item = &[[u8; N]]> {
|
|
chunked.downcast_iter().map(|array| {
|
|
let range = assert_fixed_dense::<N>(array);
|
|
exact_chunks(&array.values()[range]).unwrap()
|
|
})
|
|
}
|
|
|
|
/// Convert a dense [BinaryChunked] into a single chunk as [FixedBytes],
|
|
/// without taking a reference to the offsets array and validity bitmap.
|
|
fn into_fixed_binary_rechunk<const N: usize>(chunked: &BinaryChunked) -> FixedBytes<N> {
|
|
let chunked = chunked.rechunk();
|
|
let mut iter = chunked.downcast_iter();
|
|
let array = iter.next().unwrap();
|
|
assert!(iter.next().is_none());
|
|
|
|
let (buf, off, len) = {
|
|
let range = assert_fixed_dense::<N>(array);
|
|
array.values().clone().sliced(range.start, range.len())
|
|
}
|
|
.into_inner();
|
|
|
|
ArcRef::new(buf).map(|bytes| exact_chunks(&bytes[off..off + len]).unwrap())
|
|
}
|
|
|
|
/// Ensures that the supplied Arrow array consists of densely packed bytestrings of length `N`.
|
|
/// In other words, ensure that it is free of nulls, and that the offsets have a fixed stride of `N`.
|
|
#[must_use = "only the range returned is guaranteed to be conformant"]
|
|
fn assert_fixed_dense<const N: usize>(array: &BinaryArray<i64>) -> Range<usize> {
|
|
let null_count = array.validity().map_or(0, |bits| bits.unset_bits());
|
|
if null_count > 0 {
|
|
panic!("null values present");
|
|
}
|
|
|
|
let offsets = array.offsets();
|
|
let length_check = offsets
|
|
.as_slice()
|
|
.par_windows(2)
|
|
.all(|w| (w[1] - w[0]) == N as i64);
|
|
|
|
if !length_check {
|
|
panic!("lengths are inconsistent");
|
|
}
|
|
|
|
(*offsets.first() as usize)..(*offsets.last() as usize)
|
|
}
|
|
|
|
/// Reinterpret `buf` as a slice of `K`-byte arrays without copying.
///
/// Returns `None` when `buf.len()` is not a multiple of `K`.
fn exact_chunks<const K: usize>(buf: &[u8]) -> Option<&[[u8; K]]> {
    if buf.len() % K != 0 {
        return None;
    }

    let count = buf.len() / K;
    let first = buf.as_ptr().cast::<[u8; K]>();

    // SAFETY: `first` is derived from `buf`, so it is valid for reads of `buf.len()` bytes
    // for as long as the returned borrow lives. We ensured above that `buf.len()` is a
    // multiple of K, so `count` arrays cover exactly those bytes, and `[u8; K]` has the
    // alignment of `u8`, so there are no alignment requirements.
    Some(unsafe { slice::from_raw_parts(first, count) })
}
|