chore(users/edef): move to contrib

Change-Id: I1a6972fab8ada26917f29607fc401e376d634070
Florian Klink 2025-03-17 12:41:31 +00:00
parent a7916624dc
commit 403d8fc897
55 changed files with 15 additions and 17 deletions

@@ -0,0 +1,155 @@
//! This tool lossily converts a Sled database produced by crunch-v2 into a Parquet file for analysis.
//! The resulting `crunch.parquet` has columns `file_hash`, `nar_hash`, and `chunk`.
//! The first two are SHA-256 hashes of the compressed file and the NAR it decompresses to.
//! `chunk` is a struct array corresponding to [crunch_v2::proto::Chunk] messages.
//! They are concatenated without any additional structure, so nothing but the chunk list is preserved.
use anyhow::Result;
use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
use std::fs::File;
use std::path::PathBuf;
use crunch_v2::proto::{self, path::Node};
use prost::Message;
use polars::{
chunked_array::builder::AnonymousOwnedListBuilder,
prelude::{
df, BinaryChunkedBuilder, ChunkedBuilder, DataFrame, DataType, Field, ListBuilderTrait,
NamedFrom, ParquetWriter, PrimitiveChunkedBuilder, Series, UInt32Type,
},
series::IntoSeries,
};
#[derive(Parser)]
struct Args {
/// Path to the sled database that's read from.
#[clap(default_value = "crunch.db")]
infile: PathBuf,
/// Path to the resulting parquet file that's written.
#[clap(default_value = "crunch.parquet")]
outfile: PathBuf,
}
fn main() -> Result<()> {
let args = Args::parse();
let w = ParquetWriter::new(File::create(args.outfile)?);
let db: sled::Db = sled::open(&args.infile).unwrap();
let files_tree: sled::Tree = db.open_tree("files").unwrap();
let progress =
ProgressBar::new(files_tree.len() as u64).with_style(ProgressStyle::with_template(
"{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
)?);
let mut frame = FrameBuilder::new();
for entry in &files_tree {
let (file_hash, pb) = entry?;
frame.push(
file_hash[..].try_into().unwrap(),
proto::Path::decode(&pb[..])?,
);
progress.inc(1);
}
w.finish(&mut frame.finish())?;
Ok(())
}
struct FrameBuilder {
file_hash: BinaryChunkedBuilder,
nar_hash: BinaryChunkedBuilder,
chunk: AnonymousOwnedListBuilder,
}
impl FrameBuilder {
fn new() -> Self {
Self {
file_hash: BinaryChunkedBuilder::new("file_hash", 0, 0),
nar_hash: BinaryChunkedBuilder::new("nar_hash", 0, 0),
chunk: AnonymousOwnedListBuilder::new(
"chunk",
0,
Some(DataType::Struct(vec![
Field::new("hash", DataType::Binary),
Field::new("size", DataType::UInt32),
Field::new("size_compressed", DataType::UInt32),
])),
),
}
}
fn push(&mut self, file_hash: [u8; 32], pb: proto::Path) {
self.file_hash.append_value(&file_hash[..]);
self.nar_hash.append_value(pb.nar_hash);
self.chunk
.append_series(&ChunkFrameBuilder::new(pb.node.unwrap()))
.unwrap();
}
fn finish(mut self) -> DataFrame {
df! {
"file_hash" => self.file_hash.finish().into_series(),
"nar_hash" => self.nar_hash.finish().into_series(),
"chunk" => self.chunk.finish().into_series()
}
.unwrap()
}
}
struct ChunkFrameBuilder {
hash: BinaryChunkedBuilder,
size: PrimitiveChunkedBuilder<UInt32Type>,
size_compressed: PrimitiveChunkedBuilder<UInt32Type>,
}
impl ChunkFrameBuilder {
fn new(node: proto::path::Node) -> Series {
let mut this = Self {
hash: BinaryChunkedBuilder::new("hash", 0, 0),
size: PrimitiveChunkedBuilder::new("size", 0),
size_compressed: PrimitiveChunkedBuilder::new("size_compressed", 0),
};
this.push(node);
this.finish()
}
fn push(&mut self, node: Node) {
match node {
Node::Directory(node) => {
for node in node.files {
self.push(Node::File(node));
}
for node in node.directories {
self.push(Node::Directory(node));
}
}
Node::File(node) => {
for chunk in node.chunks {
self.hash.append_value(&chunk.hash);
self.size.append_value(chunk.size);
self.size_compressed.append_value(chunk.size_compressed);
}
}
Node::Symlink(_) => {}
}
}
fn finish(self) -> Series {
df! {
"hash" => self.hash.finish().into_series(),
"size" => self.size.finish().into_series(),
"size_compressed" => self.size_compressed.finish().into_series()
}
.unwrap()
.into_struct("chunk")
.into_series()
}
}
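
As a sanity check on the converter's output, the nested `chunk` column can be flattened back out with polars. A minimal sketch against the same polars version used above (exact API names shift between polars releases; the aggregate at the end is purely illustrative):

use polars::prelude::*;
use std::fs::File;

fn main() -> anyhow::Result<()> {
    // Read the converter's output and turn the list-of-structs `chunk` column
    // into one row per chunk with plain `hash`/`size`/`size_compressed` columns.
    let df = ParquetReader::new(File::open("crunch.parquet")?).finish()?;
    let chunks = df.explode(["chunk"])?.unnest(["chunk"])?;

    // Illustrative aggregate: total raw vs. zstd-compressed bytes across all chunks.
    let raw: u64 = chunks.column("size")?.u32()?.into_no_null_iter().map(u64::from).sum();
    let zstd: u64 = chunks.column("size_compressed")?.u32()?.into_no_null_iter().map(u64::from).sum();
    println!("{} chunks, {} bytes raw, {} bytes zstd", chunks.height(), raw, zstd);
    Ok(())
}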

@@ -0,0 +1,3 @@
pub mod proto {
include!(concat!(env!("OUT_DIR"), "/snix.flatstore.v1.rs"));
}
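
The include path follows prost-build's convention of writing one generated file per protobuf package into OUT_DIR. A minimal build.rs sketch that would produce it, assuming the schema lives at protos/flatstore.proto and declares `package snix.flatstore.v1;`:

// build.rs
fn main() -> std::io::Result<()> {
    // Regenerate when the schema changes.
    println!("cargo:rerun-if-changed=protos/flatstore.proto");
    // prost-build names the output after the proto package, yielding
    // `snix.flatstore.v1.rs` in OUT_DIR, which lib.rs includes above.
    prost_build::compile_protos(&["protos/flatstore.proto"], &["protos/"])
}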

@@ -0,0 +1,309 @@
//! This is a tool for ingesting subsets of cache.nixos.org into its own flattened castore format.
//! Currently, produced chunks are not preserved, and this purely serves as a way of measuring
//! compression/deduplication ratios for various chunking and compression parameters.
//!
//! NARs to be ingested are read from `ingest.parquet`, and filtered by an SQL expression provided as a program argument.
//! The `file_hash` column should contain SHA-256 hashes of the compressed data, corresponding to the `FileHash` narinfo field.
//! The `compression` column should contain either `"bzip2"` or `"xz"`, corresponding to the `Compression` narinfo field.
//! Additional columns are ignored, but can be used by the SQL filter expression.
//!
//! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash.
use crunch_v2::proto;
mod remote;
use anyhow::Result;
use clap::Parser;
use futures::{stream, StreamExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use std::{
io::{self, BufRead, Read, Write},
path::PathBuf,
ptr,
};
use polars::{
prelude::{col, LazyFrame, ScanArgsParquet},
sql::sql_expr,
};
use fastcdc::v2020::{ChunkData, StreamCDC};
use nix_compat::nar::reader as nar;
use digest::Digest;
use prost::Message;
use sha2::Sha256;
#[derive(Parser)]
struct Args {
/// Path to an existing parquet file.
/// The `file_hash` column should contain SHA-256 hashes of the compressed
/// data, corresponding to the `FileHash` narinfo field.
/// The `compression` column should contain either `"bzip2"` or `"xz"`,
/// corresponding to the `Compression` narinfo field.
/// Additional columns are ignored, but can be used by the SQL filter expression.
#[clap(long, default_value = "ingest.parquet")]
infile: PathBuf,
    /// SQL filter expression selecting which rows of the parquet file to ingest.
filter: String,
/// Average chunk size for FastCDC, in KiB.
    /// The minimum chunk size is half of this value, the maximum double.
#[clap(long, default_value_t = 256)]
avg_chunk_size: u32,
    /// Path to the sled database that results are written to (flatstore
/// protobufs, addressed by file hash).
#[clap(long, default_value = "crunch.db")]
outfile: PathBuf,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
let filter = sql_expr(args.filter)?;
let avg_chunk_size = args.avg_chunk_size * 1024;
let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())?
.filter(filter)
.select([col("file_hash"), col("compression")])
.drop_nulls(None)
.collect()?;
let progress = ProgressBar::new(df.height() as u64).with_style(ProgressStyle::with_template(
"{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
)?);
let file_hash = df
.column("file_hash")?
.binary()?
.into_iter()
.map(|h| -> [u8; 32] { h.unwrap().try_into().unwrap() });
let compression = df
.column("compression")?
.utf8()?
.into_iter()
.map(|c| c.unwrap());
let db: sled::Db = sled::open(args.outfile).unwrap();
let files_tree = db.open_tree("files").unwrap();
let res = stream::iter(file_hash.zip(compression))
.map(Ok)
.try_for_each_concurrent(Some(16), |(file_hash, compression)| {
let progress = progress.clone();
let files_tree = files_tree.clone();
async move {
if files_tree.contains_key(&file_hash)? {
progress.inc(1);
return Ok(());
}
let reader = remote::nar(file_hash, compression).await?;
tokio::task::spawn_blocking(move || {
let mut reader = Sha256Reader::from(reader);
let path =
ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| {
proto::Path {
nar_hash: reader.finalize().as_slice().into(),
node: Some(node),
}
})?;
files_tree.insert(file_hash, path.encode_to_vec())?;
progress.inc(1);
Ok::<_, anyhow::Error>(())
})
.await?
}
})
.await;
let flush = files_tree.flush_async().await;
res?;
flush?;
Ok(())
}
fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> {
match node {
nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode {
name,
target,
})),
nar::Node::Directory(mut reader) => {
let mut directories = vec![];
let mut files = vec![];
let mut symlinks = vec![];
while let Some(node) = reader.next()? {
match ingest(node.node, node.name.to_owned(), avg_chunk_size)? {
proto::path::Node::Directory(node) => {
directories.push(node);
}
proto::path::Node::File(node) => {
files.push(node);
}
proto::path::Node::Symlink(node) => {
symlinks.push(node);
}
}
}
Ok(proto::path::Node::Directory(proto::DirectoryNode {
name,
directories,
files,
symlinks,
}))
}
nar::Node::File { executable, reader } => {
let mut reader = B3Reader::from(reader);
let mut chunks = vec![];
for chunk in StreamCDC::new(
&mut reader,
avg_chunk_size / 2,
avg_chunk_size,
avg_chunk_size * 2,
) {
let ChunkData {
length: size, data, ..
} = chunk?;
let hash = blake3::hash(&data);
let size_compressed = zstd_size(&data, 9);
chunks.push(proto::Chunk {
hash: hash.as_bytes().as_slice().into(),
size: size.try_into().unwrap(),
size_compressed: size_compressed.try_into().unwrap(),
});
}
Ok(proto::path::Node::File(proto::FileNode {
name,
hash: reader.finalize().as_bytes().as_slice().into(),
chunks,
executable,
}))
}
}
}
struct Sha256Reader<R> {
inner: R,
hasher: Sha256,
buf: *const [u8],
}
const ZERO_BUF: *const [u8] = ptr::slice_from_raw_parts(1 as *const u8, 0);
unsafe impl<R: Send> Send for Sha256Reader<R> {}
impl<R> From<R> for Sha256Reader<R> {
fn from(value: R) -> Self {
Self {
inner: value,
hasher: Sha256::new(),
buf: ZERO_BUF,
}
}
}
impl<R: Read> Read for Sha256Reader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.buf = ZERO_BUF;
let n = self.inner.read(buf)?;
self.hasher.update(&buf[..n]);
Ok(n)
}
}
impl<R: BufRead> BufRead for Sha256Reader<R> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.buf = ZERO_BUF;
let buf = self.inner.fill_buf()?;
self.buf = buf as *const [u8];
Ok(buf)
}
fn consume(&mut self, amt: usize) {
// UNSAFETY: This assumes that `R::consume` doesn't invalidate the buffer.
// That's not a sound assumption in general, though it is likely to hold.
// TODO(edef): refactor this codebase to write a fresh NAR for verification purposes
// we already buffer full chunks, so there's no pressing need to reuse the input buffers
unsafe {
let (head, buf) = (*self.buf).split_at(amt);
self.buf = buf as *const [u8];
self.hasher.update(head);
self.inner.consume(amt);
}
}
}
impl<R> Sha256Reader<R> {
fn finalize(self) -> [u8; 32] {
self.hasher.finalize().into()
}
}
struct B3Reader<R> {
inner: R,
hasher: blake3::Hasher,
}
impl<R> From<R> for B3Reader<R> {
fn from(value: R) -> Self {
Self {
inner: value,
hasher: blake3::Hasher::new(),
}
}
}
impl<R: Read> Read for B3Reader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n = self.inner.read(buf)?;
self.hasher.update(&buf[..n]);
Ok(n)
}
}
impl<R> B3Reader<R> {
fn finalize(self) -> blake3::Hash {
self.hasher.finalize()
}
}
fn zstd_size(data: &[u8], level: i32) -> u64 {
let mut w = zstd::Encoder::new(CountingWriter::default(), level).unwrap();
w.write_all(&data).unwrap();
let CountingWriter(size) = w.finish().unwrap();
size
}
#[derive(Default)]
struct CountingWriter(u64);
impl Write for CountingWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0 += buf.len() as u64;
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
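
The measurement at the core of `ingest` is: split each decompressed file into content-defined chunks with FastCDC (minimum avg/2, maximum 2×avg), hash every chunk with BLAKE3, and record how large it is under zstd level 9. A standalone sketch of that loop over any reader, using the same crates (the helper name and return shape are illustrative):

use fastcdc::v2020::{ChunkData, StreamCDC};
use std::io::Read;

/// Per chunk: (BLAKE3 hash, raw size, zstd level-9 compressed size).
fn measure_chunks(reader: impl Read, avg_chunk_size: u32) -> anyhow::Result<Vec<([u8; 32], u64, u64)>> {
    let mut out = Vec::new();
    for chunk in StreamCDC::new(reader, avg_chunk_size / 2, avg_chunk_size, avg_chunk_size * 2) {
        let ChunkData { data, length, .. } = chunk?;
        // zstd::encode_all buffers the compressed bytes; `zstd_size` above instead
        // streams into a CountingWriter so only the count is kept.
        let compressed = zstd::encode_all(&data[..], 9)?.len() as u64;
        out.push((*blake3::hash(&data).as_bytes(), length as u64, compressed));
    }
    Ok(out)
}

A typical run then just needs an SQL filter over the index columns, e.g. something like `crunch-v2 --avg-chunk-size 256 "compression = 'xz'"` (assuming the binary is named after the crate).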

@@ -0,0 +1,211 @@
use std::{
cmp,
io::{self, BufRead, BufReader, Read},
pin::Pin,
task::{self, Poll},
};
use anyhow::{bail, Result};
use bytes::{Buf, Bytes};
use futures::{future::BoxFuture, Future, FutureExt, Stream, StreamExt};
use lazy_static::lazy_static;
use tokio::runtime::Handle;
use nix_compat::nixbase32;
use rusoto_core::{ByteStream, Region};
use rusoto_s3::{GetObjectOutput, GetObjectRequest, S3Client, S3};
use bzip2::read::BzDecoder;
use xz2::read::XzDecoder;
lazy_static! {
static ref S3_CLIENT: S3Client = S3Client::new(Region::UsEast1);
}
const BUCKET: &str = "nix-cache";
pub async fn nar(
file_hash: [u8; 32],
compression: &str,
) -> Result<Box<BufReader<dyn Read + Send>>> {
let (extension, decompress): (&'static str, fn(_) -> Box<_>) = match compression {
"bzip2" => ("bz2", decompress_bz2),
"xz" => ("xz", decompress_xz),
_ => bail!("unknown compression: {compression}"),
};
Ok(decompress(
FileStream::new(FileKey {
file_hash,
extension,
})
.await?
.into(),
))
}
fn decompress_xz(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> {
Box::new(BufReader::new(XzDecoder::new(reader)))
}
fn decompress_bz2(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> {
Box::new(BufReader::new(BzDecoder::new(reader)))
}
struct FileStreamReader {
inner: FileStream,
buffer: Bytes,
}
impl From<FileStream> for FileStreamReader {
fn from(value: FileStream) -> Self {
FileStreamReader {
inner: value,
buffer: Bytes::new(),
}
}
}
impl Read for FileStreamReader {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
let src = self.fill_buf()?;
let n = cmp::min(src.len(), dst.len());
dst[..n].copy_from_slice(&src[..n]);
self.consume(n);
Ok(n)
}
}
impl BufRead for FileStreamReader {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
if !self.buffer.is_empty() {
return Ok(&self.buffer);
}
self.buffer = Handle::current()
.block_on(self.inner.next())
.transpose()?
.unwrap_or_default();
Ok(&self.buffer)
}
fn consume(&mut self, cnt: usize) {
self.buffer.advance(cnt);
}
}
struct FileKey {
file_hash: [u8; 32],
extension: &'static str,
}
impl FileKey {
fn get(
&self,
offset: u64,
e_tag: Option<&str>,
) -> impl Future<Output = io::Result<GetObjectOutput>> + Send + 'static {
let input = GetObjectRequest {
bucket: BUCKET.to_string(),
key: format!(
"nar/{}.nar.{}",
nixbase32::encode(&self.file_hash),
self.extension
),
if_match: e_tag.map(str::to_owned),
range: Some(format!("bytes {}-", offset + 1)),
..Default::default()
};
async {
S3_CLIENT
.get_object(input)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
}
}
struct FileStream {
key: FileKey,
e_tag: String,
offset: u64,
length: u64,
inner: FileStreamState,
}
enum FileStreamState {
Response(BoxFuture<'static, io::Result<GetObjectOutput>>),
Body(ByteStream),
Eof,
}
impl FileStream {
pub async fn new(key: FileKey) -> io::Result<Self> {
let resp = key.get(0, None).await?;
Ok(FileStream {
key,
e_tag: resp.e_tag.unwrap(),
offset: 0,
length: resp.content_length.unwrap().try_into().unwrap(),
inner: FileStreamState::Body(resp.body.unwrap()),
})
}
}
macro_rules! poll {
($expr:expr) => {
match $expr {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(value) => value,
}
};
}
impl Stream for FileStream {
type Item = io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let chunk = loop {
match &mut this.inner {
FileStreamState::Response(resp) => match poll!(resp.poll_unpin(cx)) {
Err(err) => {
this.inner = FileStreamState::Eof;
return Poll::Ready(Some(Err(err)));
}
Ok(resp) => {
this.inner = FileStreamState::Body(resp.body.unwrap());
}
},
FileStreamState::Body(body) => match poll!(body.poll_next_unpin(cx)) {
None | Some(Err(_)) => {
this.inner = FileStreamState::Response(
this.key.get(this.offset, Some(&this.e_tag)).boxed(),
);
}
Some(Ok(chunk)) => {
break chunk;
}
},
FileStreamState::Eof => {
return Poll::Ready(None);
}
}
};
this.offset += chunk.len() as u64;
if this.offset >= this.length {
this.inner = FileStreamState::Eof;
}
Poll::Ready(Some(Ok(chunk)))
}
}
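
`nar()` hands back a blocking reader that pulls bytes from the async S3 response via `Handle::current().block_on`, and the `Stream` impl transparently resumes with an ETag-pinned ranged GET whenever the body ends or errors before `content_length` bytes have arrived. Because of the `block_on`, the reader has to be consumed off the runtime's worker threads, which is why main.rs drains it inside `tokio::task::spawn_blocking`. A minimal usage sketch under the same constraint (the helper name is illustrative):

use std::io::Read;

async fn fetch_nar(file_hash: [u8; 32]) -> anyhow::Result<Vec<u8>> {
    // Open the object on the async runtime...
    let mut reader = remote::nar(file_hash, "xz").await?;
    // ...but read it on the blocking pool, since every buffer refill
    // calls Handle::block_on internally.
    tokio::task::spawn_blocking(move || {
        let mut nar = Vec::new();
        reader.read_to_end(&mut nar)?;
        Ok::<_, anyhow::Error>(nar)
    })
    .await?
}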