feat(tvix/tools/crunch-v2): add CLI args

Use clap derive to make the input and output files configurable, as well
as the chunk size parameters.

Change-Id: I02b29126f3bd2c13ba2c6e7e0aa4ff048ff803ed
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10691
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: edef <edef@edef.eu>
This commit is contained in:
Florian Klink 2024-01-25 14:47:07 +02:00 committed by clbot
parent 4f22203a3a
commit b38be028d9
6 changed files with 885 additions and 34 deletions

View file

@ -5,13 +5,12 @@
//! They are concatenated without any additional structure, so nothing but the chunk list is preserved.
use anyhow::Result;
use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
use std::fs::File;
use std::path::PathBuf;
use crunch_v2::{
proto::{self, path::Node},
FILES,
};
use crunch_v2::proto::{self, path::Node};
use prost::Message;
use polars::{
@ -23,15 +22,32 @@ use polars::{
series::IntoSeries,
};
fn main() -> Result<()> {
let w = ParquetWriter::new(File::create("crunch.parquet")?);
#[derive(Parser)]
struct Args {
/// Path to the sled database that's read from.
#[clap(default_value = "crunch.db")]
infile: PathBuf,
let progress = ProgressBar::new(FILES.len() as u64).with_style(ProgressStyle::with_template(
"{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
)?);
/// Path to the resulting parquet file that's written.
#[clap(default_value = "crunch.parquet")]
outfile: PathBuf,
}
fn main() -> Result<()> {
let args = Args::parse();
let w = ParquetWriter::new(File::create(args.outfile)?);
let db: sled::Db = sled::open(&args.infile).unwrap();
let files_tree: sled::Tree = db.open_tree("files").unwrap();
let progress =
ProgressBar::new(files_tree.len() as u64).with_style(ProgressStyle::with_template(
"{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
)?);
let mut frame = FrameBuilder::new();
for entry in &*FILES {
for entry in &files_tree {
let (file_hash, pb) = entry?;
frame.push(
file_hash[..].try_into().unwrap(),

View file

@ -1,10 +1,3 @@
use lazy_static::lazy_static;
pub mod proto {
include!(concat!(env!("OUT_DIR"), "/tvix.flatstore.v1.rs"));
}
lazy_static! {
static ref DB: sled::Db = sled::open("crunch.db").unwrap();
pub static ref FILES: sled::Tree = DB.open_tree("files").unwrap();
}

View file

@ -9,16 +9,17 @@
//!
//! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash.
use crunch_v2::{proto, FILES};
use crunch_v2::proto;
mod remote;
use anyhow::Result;
use clap::Parser;
use futures::{stream, StreamExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use std::{
env,
io::{self, BufRead, Read, Write},
path::PathBuf,
ptr,
};
@ -34,13 +35,39 @@ use digest::Digest;
use prost::Message;
use sha2::Sha256;
#[derive(Parser)]
struct Args {
/// Path to an existing parquet file.
/// The `file_hash` column should contain SHA-256 hashes of the compressed
/// data, corresponding to the `FileHash` narinfo field.
/// The `compression` column should contain either `"bzip2"` or `"xz"`,
/// corresponding to the `Compression` narinfo field.
/// Additional columns are ignored, but can be used by the SQL filter expression.
#[clap(long, default_value = "ingest.parquet")]
infile: PathBuf,
/// Filter expression to filter elements in the parquet file for.
filter: String,
/// Average chunk size for FastCDC, in KiB.
/// min value is half, max value double of that number.
#[clap(long, default_value_t = 256)]
avg_chunk_size: u32,
/// Path to the sled database where results are written to (flatstore
/// protobufs, addressed by file hash).
#[clap(long, default_value = "crunch.db")]
outfile: PathBuf,
}
#[tokio::main]
async fn main() -> Result<()> {
let mut args = env::args();
args.next().unwrap();
let args = Args::parse();
let filter = sql_expr(args.next().unwrap())?;
let df = LazyFrame::scan_parquet("ingest.parquet", ScanArgsParquet::default())?
let filter = sql_expr(args.filter)?;
let avg_chunk_size = args.avg_chunk_size * 1024;
let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())?
.filter(filter)
.select([col("file_hash"), col("compression")])
.drop_nulls(None)
@ -62,12 +89,16 @@ async fn main() -> Result<()> {
.into_iter()
.map(|c| c.unwrap());
let db: sled::Db = sled::open(args.outfile).unwrap();
let files_tree = db.open_tree("files").unwrap();
let res = stream::iter(file_hash.zip(compression))
.map(Ok)
.try_for_each_concurrent(Some(16), |(file_hash, compression)| {
let progress = progress.clone();
let files_tree = files_tree.clone();
async move {
if FILES.contains_key(&file_hash)? {
if files_tree.contains_key(&file_hash)? {
progress.inc(1);
return Ok(());
}
@ -77,12 +108,15 @@ async fn main() -> Result<()> {
tokio::task::spawn_blocking(move || {
let mut reader = Sha256Reader::from(reader);
let path = ingest(nar::open(&mut reader)?, vec![]).map(|node| proto::Path {
nar_hash: reader.finalize().as_slice().into(),
node: Some(node),
})?;
let path =
ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| {
proto::Path {
nar_hash: reader.finalize().as_slice().into(),
node: Some(node),
}
})?;
FILES.insert(file_hash, path.encode_to_vec())?;
files_tree.insert(file_hash, path.encode_to_vec())?;
progress.inc(1);
Ok::<_, anyhow::Error>(())
@ -92,7 +126,7 @@ async fn main() -> Result<()> {
})
.await;
let flush = crunch_v2::FILES.flush_async().await;
let flush = files_tree.flush_async().await;
res?;
flush?;
@ -100,7 +134,7 @@ async fn main() -> Result<()> {
Ok(())
}
fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> {
match node {
nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode {
name,
@ -113,7 +147,7 @@ fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
let mut symlinks = vec![];
while let Some(node) = reader.next()? {
match ingest(node.node, node.name)? {
match ingest(node.node, node.name, avg_chunk_size)? {
proto::path::Node::Directory(node) => {
directories.push(node);
}
@ -138,7 +172,12 @@ fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
let mut reader = B3Reader::from(reader);
let mut chunks = vec![];
for chunk in StreamCDC::new(&mut reader, 1 << 17, 1 << 18, 1 << 19) {
for chunk in StreamCDC::new(
&mut reader,
avg_chunk_size / 2,
avg_chunk_size,
avg_chunk_size * 2,
) {
let ChunkData {
length: size, data, ..
} = chunk?;