From 4535824166c9e42304f19c34d56d2281b2b886ab Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 13 Mar 2025 09:30:44 +0100 Subject: [PATCH] refactor(tvix/glue): move toFile to import builtins MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This does import the contents into the store, so it should belong in there. While moving, I also noticed the code copying to the BlobService can be shared with the one used when importing a blob via the import helper. It was a bit hidden - due to the contents being available as bytes, we used a Cursor and wrote it with tokio::io::copy. However, Cursor implements both AsyncRead and Read, so we can factor out the copying code into a copy_to_blobservice helper function and use it in both places. The error kind returned for an invalid output name (DerivationError::InvalidOutputName) is arguably still a bit misplaced here, but that's left for later. Change-Id: Iec3c422c12270ee111f864d2b78c0861f78edfa4 Reviewed-on: https://cl.tvl.fyi/c/depot/+/13254 Reviewed-by: Domen Kožar Tested-by: BuildkiteCI Reviewed-by: raitobezarius --- tvix/glue/src/builtins/derivation.rs | 107 +------------ tvix/glue/src/builtins/import.rs | 220 ++++++++++++++++++++------- 2 files changed, 171 insertions(+), 156 deletions(-) diff --git a/tvix/glue/src/builtins/derivation.rs b/tvix/glue/src/builtins/derivation.rs index 3048fd839..8ee003dc2 100644 --- a/tvix/glue/src/builtins/derivation.rs +++ b/tvix/glue/src/builtins/derivation.rs @@ -168,21 +168,17 @@ fn handle_fixed_output( #[builtins(state = "Rc")] pub(crate) mod derivation_builtins { use std::collections::BTreeMap; - use std::io::Cursor; + + use bstr::ByteSlice; + + use nix_compat::store_path::hash_placeholder; + use tvix_eval::generators::Gen; + use tvix_eval::{NixContext, NixContextElement, NixString}; use crate::builtins::utils::{select_string, strong_importing_coerce_to_string}; use crate::fetchurl::fetchurl_derivation_to_fetch; use super::*; - use bstr::ByteSlice; - use md5::Digest; - use 
nix_compat::nixhash::CAHash; - use nix_compat::store_path::{build_ca_path, hash_placeholder}; - use sha2::Sha256; - use tvix_castore::Node; - use tvix_eval::generators::Gen; - use tvix_eval::{NixContext, NixContextElement, NixString}; - use tvix_store::pathinfoservice::PathInfo; #[builtin("placeholder")] async fn builtin_placeholder(co: GenCo, input: Value) -> Result { @@ -525,95 +521,4 @@ pub(crate) mod derivation_builtins { Ok(out) } - - #[builtin("toFile")] - async fn builtin_to_file( - state: Rc, - co: GenCo, - name: Value, - content: Value, - ) -> Result { - if name.is_catchable() { - return Ok(name); - } - - if content.is_catchable() { - return Ok(content); - } - - let name = name - .to_str() - .context("evaluating the `name` parameter of builtins.toFile")?; - let content = content - .to_contextful_str() - .context("evaluating the `content` parameter of builtins.toFile")?; - - if content.iter_ctx_derivation().count() > 0 - || content.iter_ctx_single_outputs().count() > 0 - { - return Err(ErrorKind::UnexpectedContext); - } - - let store_path = state.tokio_handle.block_on(async { - // upload contents to the blobservice and create a root node - let mut blob_writer = state.blob_service.open_write().await; - - let mut r = Cursor::new(&content); - - let blob_size = tokio::io::copy(&mut r, &mut blob_writer).await?; - let blob_digest = blob_writer.close().await?; - let ca_hash = CAHash::Text(Sha256::digest(&content).into()); - - let root_node = Node::File { - digest: blob_digest, - size: blob_size, - executable: false, - }; - - // calculate the nar hash - let (nar_size, nar_sha256) = state - .nar_calculation_service - .calculate_nar(&root_node) - .await - .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; - - // persist via pathinfo service. 
- state - .path_info_service - .put(PathInfo { - store_path: build_ca_path( - name.to_str()?, - &ca_hash, - content.iter_ctx_plain(), - false, - ) - .map_err(|_e| { - nix_compat::derivation::DerivationError::InvalidOutputName( - name.to_str_lossy().into_owned(), - ) - }) - .map_err(DerivationError::InvalidDerivation)?, - node: root_node, - // assemble references from plain context. - references: content - .iter_ctx_plain() - .map(|elem| StorePath::from_absolute_path(elem.as_bytes())) - .collect::>() - .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?, - nar_size, - nar_sha256, - signatures: vec![], - deriver: None, - ca: Some(ca_hash), - }) - .await - .map_err(|e| ErrorKind::TvixError(Rc::new(e))) - .map(|path_info| path_info.store_path) - })?; - - let abs_path = store_path.to_absolute_path(); - let context: NixContext = NixContextElement::Plain(abs_path.clone()).into(); - - Ok(Value::from(NixString::new_context_from(context, abs_path))) - } } diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs index 83b91165c..bdcfd06ab 100644 --- a/tvix/glue/src/builtins/import.rs +++ b/tvix/glue/src/builtins/import.rs @@ -112,16 +112,63 @@ mod import_builtins { use crate::tvix_store_io::TvixStoreIO; use bstr::ByteSlice; use nix_compat::nixhash::{CAHash, NixHash}; - use nix_compat::store_path::{build_ca_path, StorePathRef}; + use nix_compat::store_path::{build_ca_path, StorePath, StorePathRef}; use sha2::Digest; use std::rc::Rc; use tokio::io::AsyncWriteExt; + use tvix_castore::blobservice::BlobService; use tvix_eval::builtins::coerce_value_to_path; use tvix_eval::generators::Gen; use tvix_eval::{generators::GenCo, ErrorKind, Value}; - use tvix_eval::{FileType, NixContextElement, NixString}; + use tvix_eval::{AddContext, FileType, NixContext, NixContextElement, NixString}; use tvix_store::path_info::PathInfo; + /// Helper function dealing with uploading something from a std::io::Read to + /// the passed [BlobService], returning the B3Digest and size. 
+ /// This function is sync (and uses the tokio handle to block). + /// A sync closure getting a copy of all bytes read can be passed in, + /// allowing to do other hashing where needed. + fn copy_to_blobservice( + tokio_handle: tokio::runtime::Handle, + blob_service: impl BlobService, + mut r: impl std::io::Read, + mut inspect_f: F, + ) -> std::io::Result<(tvix_castore::B3Digest, u64)> + where + F: FnMut(&[u8]), + { + let mut blob_size = 0; + + let mut blob_writer = tokio_handle.block_on(async { blob_service.open_write().await }); + + // read piece by piece and write to blob_writer. + // This is a bit manual due to EvalIO being sync, while the blob writer being async. + { + let mut buf = [0u8; 4096]; + + loop { + // read bytes into buffer, break out if EOF + let len = r.read(&mut buf)?; + if len == 0 { + break; + } + blob_size += len as u64; + + let data = &buf[0..len]; + + // write to blobwriter + tokio_handle.block_on(async { blob_writer.write_all(data).await })?; + + // Call inspect_f + inspect_f(data); + } + + let blob_digest = tokio_handle.block_on(async { blob_writer.close().await })?; + + Ok((blob_digest, blob_size)) + } + } + // This is a helper used by both builtins.path and builtins.filterSource. async fn import_helper( state: Rc, @@ -151,72 +198,43 @@ mod import_builtins { // as that affects the output path calculation. FileType::Regular => { let mut file = state.open(&path)?; + let mut h = (!recursive_ingestion).then(sha2::Sha256::new); - let mut flat_sha256 = (!recursive_ingestion).then(sha2::Sha256::new); - let mut blob_size = 0; - - let mut blob_writer = state - .tokio_handle - .block_on(async { state.blob_service.open_write().await }); - - // read piece by piece and write to blob_writer. - // This is a bit manual due to EvalIO being sync, while everything else async. 
- { - let mut buf = [0u8; 4096]; - - loop { - // read bytes into buffer, break out if EOF - let len = file.read(&mut buf)?; - if len == 0 { - break; - } - blob_size += len as u64; - - let data = &buf[0..len]; - - // add to blobwriter - state - .tokio_handle - .block_on(async { blob_writer.write_all(data).await })?; - + let (blob_digest, blob_size) = copy_to_blobservice( + state.tokio_handle.clone(), + &state.blob_service, + &mut file, + |data| { // update blob_sha256 if needed. - if let Some(h) = flat_sha256.as_mut() { + if let Some(h) = h.as_mut() { h.update(data) } - } - } + }, + )?; - // close the blob writer, construct the root node and the blob_sha256 (later used for output path calculation) ( Node::File { - digest: state - .tokio_handle - .block_on(async { blob_writer.close().await })?, + digest: blob_digest, size: blob_size, executable: false, }, - { - // If non-recursive ingestion is requested… - if let Some(flat_sha256) = flat_sha256 { - let actual_sha256 = flat_sha256.finalize().into(); + h.map(|h| { + // If non-recursive ingestion was requested, we return that one. + let actual_sha256 = h.finalize().into(); - // compare the recorded flat hash with an upfront one if provided. - if let Some(expected_sha256) = expected_sha256 { - if actual_sha256 != expected_sha256 { - return Err(ImportError::HashMismatch( - path, - NixHash::Sha256(expected_sha256), - NixHash::Sha256(actual_sha256), - ) - .into()); - } + // If an expected hash was provided upfront, compare and bail out. 
+ if let Some(expected_sha256) = expected_sha256 { + if actual_sha256 != expected_sha256 { + return Err(ImportError::HashMismatch( + path.clone(), + NixHash::Sha256(expected_sha256), + NixHash::Sha256(actual_sha256), + )); } - - Some(CAHash::Flat(NixHash::Sha256(actual_sha256))) - } else { - None } - }, + Ok(CAHash::Flat(NixHash::Sha256(actual_sha256))) + }) + .transpose()?, ) } @@ -420,6 +438,98 @@ mod import_builtins { }) } } + + #[builtin("toFile")] + async fn builtin_to_file( + state: Rc, + co: GenCo, + name: Value, + content: Value, + ) -> Result { + if name.is_catchable() { + return Ok(name); + } + + if content.is_catchable() { + return Ok(content); + } + + let name = name + .to_str() + .context("evaluating the `name` parameter of builtins.toFile")?; + let content = content + .to_contextful_str() + .context("evaluating the `content` parameter of builtins.toFile")?; + + if content.iter_ctx_derivation().count() > 0 + || content.iter_ctx_single_outputs().count() > 0 + { + return Err(ErrorKind::UnexpectedContext); + } + + // upload contents to the blobservice and create a root node + let mut h = sha2::Sha256::new(); + let (blob_digest, blob_size) = copy_to_blobservice( + state.tokio_handle.clone(), + &state.blob_service, + std::io::Cursor::new(&content), + |data| h.update(data), + )?; + + let root_node = Node::File { + digest: blob_digest, + size: blob_size, + executable: false, + }; + + // calculate the nar hash + let (nar_size, nar_sha256) = state + .nar_calculation_service + .calculate_nar(&root_node) + .await + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; + + let ca_hash = CAHash::Text(h.finalize().into()); + + // persist via pathinfo service. 
+ let store_path = state + .tokio_handle + .block_on( + state.path_info_service.put(PathInfo { + store_path: build_ca_path( + name.to_str()?, + &ca_hash, + content.iter_ctx_plain(), + false, + ) + .map_err(|_e| { + nix_compat::derivation::DerivationError::InvalidOutputName( + name.to_str_lossy().into_owned(), + ) + }) + .map_err(crate::builtins::DerivationError::InvalidDerivation)?, + node: root_node, + // assemble references from plain context. + references: content + .iter_ctx_plain() + .map(|elem| StorePath::from_absolute_path(elem.as_bytes())) + .collect::>() + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?, + nar_size, + nar_sha256, + signatures: vec![], + deriver: None, + ca: Some(ca_hash), + }), + ) + .map_err(|e| ErrorKind::TvixError(Rc::new(e))) + .map(|path_info| path_info.store_path)?; + + let abs_path = store_path.to_absolute_path(); + let context: NixContext = NixContextElement::Plain(abs_path.clone()).into(); + + Ok(Value::from(NixString::new_context_from(context, abs_path))) + } } pub use import_builtins::builtins as import_builtins;