feat(tvix/store): Add CAHash validation

Validation is done inside ingest_nar_and_hash and
is used by Fetch::NAR and the nar-bridge.

Change-Id: I7e2be4cc13d2447035f1e5a444f44b62339988bf
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12836
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Vova Kryachko 2024-11-19 17:56:16 -05:00 committed by Vladimir Kryachko
parent ae76eaa761
commit 8ef9ba82a8
9 changed files with 300 additions and 62 deletions

View file

@ -0,0 +1,101 @@
use std::{
io::Result,
pin::Pin,
task::{ready, Context, Poll},
};
use md5::{digest::DynDigest, Digest};
use nix_compat::nixhash::{HashAlgo, NixHash};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, ReadBuf};
pin_project! {
/// AsyncRead implementation with a type-erased hasher.
///
/// After it's read the bytes from the underlying reader, it can
/// produce the NixHash value corresponding to the Digest that it's been
/// constructed with.
///
/// Because we are type-erasing the underlying Digest, it uses dynamic dispatch
/// and boxing. While it may seem like it could be slow, in practice it's used
/// in IO-bound workloads so the slowdown should be negligible.
///
/// On the other hand it greatly improves ergonomics of using different hashing
/// algorithms and retrieving the corresponding NixHash values.
pub struct HashingReader<R> {
#[pin]
reader: R,
digest: Box<dyn ToHash>,
}
}
/// Utility trait that simplifies digesting different hashes.
///
/// The main benefit is that each corresponding impl produces its corresponding
/// NixHash value as opposed to a lower level byte slice.
trait ToHash: DynDigest + Send {
fn consume(self: Box<Self>) -> NixHash;
}
impl ToHash for sha1::Sha1 {
fn consume(self: Box<Self>) -> NixHash {
NixHash::Sha1(self.finalize().to_vec().try_into().expect("Tvix bug"))
}
}
impl ToHash for sha2::Sha256 {
fn consume(self: Box<Self>) -> NixHash {
NixHash::Sha256(self.finalize().to_vec().try_into().expect("Tvix bug"))
}
}
impl ToHash for sha2::Sha512 {
fn consume(self: Box<Self>) -> NixHash {
NixHash::Sha512(Box::new(
self.finalize().to_vec().try_into().expect("Tvix bug"),
))
}
}
impl ToHash for md5::Md5 {
fn consume(self: Box<Self>) -> NixHash {
NixHash::Md5(self.finalize().to_vec().try_into().expect("Tvix bug"))
}
}
impl<R> HashingReader<R> {
/// Given a NixHash, creates a HashingReader that uses the same hashing algorithm.
pub fn new_with_algo(algo: HashAlgo, reader: R) -> Self {
match algo {
HashAlgo::Md5 => HashingReader::new::<md5::Md5>(reader),
HashAlgo::Sha1 => HashingReader::new::<sha1::Sha1>(reader),
HashAlgo::Sha256 => HashingReader::new::<sha2::Sha256>(reader),
HashAlgo::Sha512 => HashingReader::new::<sha2::Sha512>(reader),
}
}
fn new<D: ToHash + Digest + 'static>(reader: R) -> Self {
HashingReader {
reader,
digest: Box::new(D::new()),
}
}
/// Returns the [`NixHash`] of the data that's been read from this reader.
pub fn consume(self) -> NixHash {
self.digest.consume()
}
}
impl<R: AsyncRead> AsyncRead for HashingReader<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
let me = self.project();
let filled_length = buf.filled().len();
ready!(me.reader.poll_read(cx, buf))?;
me.digest.update(&buf.filled()[filled_length..]);
Poll::Ready(Ok(()))
}
}

View file

@ -1,4 +1,7 @@
use nix_compat::nar::reader::r#async as nar_reader;
use nix_compat::{
nar::reader::r#async as nar_reader,
nixhash::{CAHash, NixHash},
};
use sha2::Digest;
use tokio::{
io::{AsyncBufRead, AsyncRead},
@ -15,6 +18,24 @@ use tvix_castore::{
Node, PathBuf,
};
use super::hashing_reader::HashingReader;
/// Represents errors that can happen during nar ingestion.
#[derive(Debug, thiserror::Error)]
pub enum NarIngestionError {
#[error("{0}")]
IngestionError(#[from] IngestionError<Error>),
#[error("Hash mismatch, expected: {expected}, got: {actual}.")]
HashMismatch { expected: NixHash, actual: NixHash },
#[error("Expected the nar to contain a single file.")]
TypeMismatch,
#[error("Ingestion failed: {0}")]
Io(#[from] std::io::Error),
}
/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
/// interacting with a [BlobService] and [DirectoryService].
/// Returns the castore root node, as well as the sha256 and size of the NAR
@ -23,7 +44,8 @@ pub async fn ingest_nar_and_hash<R, BS, DS>(
blob_service: BS,
directory_service: DS,
r: &mut R,
) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
expected_cahash: &Option<CAHash>,
) -> Result<(Node, [u8; 32], u64), NarIngestionError>
where
R: AsyncRead + Unpin + Send,
BS: BlobService + Clone + 'static,
@ -33,20 +55,65 @@ where
let mut nar_size = 0;
// Assemble NarHash and NarSize as we read bytes.
let r = tokio_util::io::InspectReader::new(r, |b| {
let mut r = tokio_util::io::InspectReader::new(r, |b| {
nar_size += b.len() as u64;
use std::io::Write;
nar_hash.write_all(b).unwrap();
nar_hash.update(b);
});
// HACK: InspectReader doesn't implement AsyncBufRead.
// See if this can be propagated through and we can then require our input
// reader to be buffered too.
let mut r = tokio::io::BufReader::new(r);
match expected_cahash {
Some(CAHash::Nar(expected_hash)) => {
// We technically don't need the Sha256 hasher as we are already computing the nar hash with the reader above,
// but it makes the control flow more uniform and easier to understand.
let mut ca_reader = HashingReader::new_with_algo(expected_hash.algo(), &mut r);
let mut r = tokio::io::BufReader::new(&mut ca_reader);
let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
let actual_hash = ca_reader.consume();
let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
if actual_hash != *expected_hash {
return Err(NarIngestionError::HashMismatch {
expected: expected_hash.clone(),
actual: actual_hash,
});
}
Ok((root_node, nar_hash.finalize().into(), nar_size))
}
Some(CAHash::Flat(expected_hash)) => {
let mut r = tokio::io::BufReader::new(&mut r);
let root_node = ingest_nar(blob_service.clone(), directory_service, &mut r).await?;
match &root_node {
Node::File { digest, .. } => match blob_service.open_read(digest).await? {
Some(blob_reader) => {
let mut ca_reader =
HashingReader::new_with_algo(expected_hash.algo(), blob_reader);
tokio::io::copy(&mut ca_reader, &mut tokio::io::empty()).await?;
let actual_hash = ca_reader.consume();
Ok((root_node, nar_hash.finalize().into(), nar_size))
if actual_hash != *expected_hash {
return Err(NarIngestionError::HashMismatch {
expected: expected_hash.clone(),
actual: actual_hash,
});
}
Ok((root_node, nar_hash.finalize().into(), nar_size))
}
None => Err(NarIngestionError::Io(std::io::Error::other(
"Ingested data not found",
))),
},
_ => Err(NarIngestionError::TypeMismatch),
}
}
// We either got CAHash::Text, or no CAHash at all, so we just don't do any additional
// hash calculation/validation.
// FUTUREWORK: We should figure out what to do with CAHash::Text, according to nix-cpp
// they don't handle it either:
// https://github.com/NixOS/nix/blob/3e9cc78eb5e5c4f1e762e201856273809fd92e71/src/libstore/local-store.cc#L1099-L1133
_ => {
let mut r = tokio::io::BufReader::new(&mut r);
let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
Ok((root_node, nar_hash.finalize().into(), nar_size))
}
}
}
/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
@ -162,10 +229,12 @@ pub enum Error {
#[cfg(test)]
mod test {
use crate::nar::ingest_nar;
use crate::nar::{ingest_nar, ingest_nar_and_hash, NarIngestionError};
use std::io::Cursor;
use std::sync::Arc;
use hex_literal::hex;
use nix_compat::nixhash::{CAHash, NixHash};
use rstest::*;
use tokio_stream::StreamExt;
use tvix_castore::blobservice::BlobService;
@ -267,4 +336,77 @@ mod test {
assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
}
#[rstest]
#[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("fbd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
#[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("ff5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())]
#[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("fd076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone(), )]
#[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("f24eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
#[tokio::test]
async fn ingest_with_cahash_mismatch(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
#[case] ca_hash: Option<CAHash>,
#[case] nar_content: &Vec<u8>,
) {
let err = ingest_nar_and_hash(
blob_service.clone(),
directory_service.clone(),
&mut Cursor::new(nar_content),
&ca_hash,
)
.await
.expect_err("Ingestion should have failed");
assert!(
matches!(err, NarIngestionError::HashMismatch { .. }),
"CAHash should have mismatched"
);
}
#[rstest]
#[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
#[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("1f5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())]
#[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("ed076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone())]
#[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
#[tokio::test]
async fn ingest_with_cahash_correct(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
#[case] ca_hash: Option<CAHash>,
#[case] nar_content: &Vec<u8>,
) {
let _ = ingest_nar_and_hash(
blob_service.clone(),
directory_service,
&mut Cursor::new(nar_content),
&ca_hash,
)
.await
.expect("CAHash should have matched");
}
#[rstest]
#[case::nar_sha256(Some(CAHash::Flat(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
#[case::nar_symlink_sha1(Some(CAHash::Flat(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
#[tokio::test]
async fn ingest_with_flat_non_file(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
#[case] ca_hash: Option<CAHash>,
#[case] nar_content: &Vec<u8>,
) {
let err = ingest_nar_and_hash(
blob_service,
directory_service,
&mut Cursor::new(nar_content),
&ca_hash,
)
.await
.expect_err("Ingestion should have failed");
assert!(
matches!(err, NarIngestionError::TypeMismatch),
"Flat cahash should only be allowed for single file nars"
);
}
}

View file

@ -1,11 +1,11 @@
use tonic::async_trait;
use tvix_castore::B3Digest;
mod hashing_reader;
mod import;
mod renderer;
pub mod seekable;
pub use import::ingest_nar;
pub use import::ingest_nar_and_hash;
pub use import::{ingest_nar, ingest_nar_and_hash, NarIngestionError};
pub use renderer::calculate_size_and_sha256;
pub use renderer::write_nar;
pub use renderer::SimpleRenderer;

View file

@ -204,6 +204,7 @@ where
self.blob_service.clone(),
self.directory_service.clone(),
&mut r,
&narinfo.ca,
)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;