refactor(tvix/glue): move toFile to import builtins
This does import the contents into the store, so it should belong in there. While moving, I also noticed the code copying to the BlobService can be shared with the one used when importing a blob via import helper. It was a bit hidden - due to the contents being available as a bytes, we used a Cursor and wrote it with tokio::io::copy. However, Cursor implements both AsyncRead and Read, so we can factor out the copying code into a copy_to_blobservice helper function and use it in both places. The output name being wrong error kind arguably is still a bit misplaced here, but that's left for later. Change-Id: Iec3c422c12270ee111f864d2b78c0861f78edfa4 Reviewed-on: https://cl.tvl.fyi/c/depot/+/13254 Reviewed-by: Domen Kožar <domen@cachix.org> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
parent
ba54ba47ee
commit
4535824166
2 changed files with 171 additions and 156 deletions
|
|
@ -168,21 +168,17 @@ fn handle_fixed_output(
|
|||
#[builtins(state = "Rc<TvixStoreIO>")]
|
||||
pub(crate) mod derivation_builtins {
|
||||
use std::collections::BTreeMap;
|
||||
use std::io::Cursor;
|
||||
|
||||
use bstr::ByteSlice;
|
||||
|
||||
use nix_compat::store_path::hash_placeholder;
|
||||
use tvix_eval::generators::Gen;
|
||||
use tvix_eval::{NixContext, NixContextElement, NixString};
|
||||
|
||||
use crate::builtins::utils::{select_string, strong_importing_coerce_to_string};
|
||||
use crate::fetchurl::fetchurl_derivation_to_fetch;
|
||||
|
||||
use super::*;
|
||||
use bstr::ByteSlice;
|
||||
use md5::Digest;
|
||||
use nix_compat::nixhash::CAHash;
|
||||
use nix_compat::store_path::{build_ca_path, hash_placeholder};
|
||||
use sha2::Sha256;
|
||||
use tvix_castore::Node;
|
||||
use tvix_eval::generators::Gen;
|
||||
use tvix_eval::{NixContext, NixContextElement, NixString};
|
||||
use tvix_store::pathinfoservice::PathInfo;
|
||||
|
||||
#[builtin("placeholder")]
|
||||
async fn builtin_placeholder(co: GenCo, input: Value) -> Result<Value, ErrorKind> {
|
||||
|
|
@ -525,95 +521,4 @@ pub(crate) mod derivation_builtins {
|
|||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
#[builtin("toFile")]
|
||||
async fn builtin_to_file(
|
||||
state: Rc<TvixStoreIO>,
|
||||
co: GenCo,
|
||||
name: Value,
|
||||
content: Value,
|
||||
) -> Result<Value, ErrorKind> {
|
||||
if name.is_catchable() {
|
||||
return Ok(name);
|
||||
}
|
||||
|
||||
if content.is_catchable() {
|
||||
return Ok(content);
|
||||
}
|
||||
|
||||
let name = name
|
||||
.to_str()
|
||||
.context("evaluating the `name` parameter of builtins.toFile")?;
|
||||
let content = content
|
||||
.to_contextful_str()
|
||||
.context("evaluating the `content` parameter of builtins.toFile")?;
|
||||
|
||||
if content.iter_ctx_derivation().count() > 0
|
||||
|| content.iter_ctx_single_outputs().count() > 0
|
||||
{
|
||||
return Err(ErrorKind::UnexpectedContext);
|
||||
}
|
||||
|
||||
let store_path = state.tokio_handle.block_on(async {
|
||||
// upload contents to the blobservice and create a root node
|
||||
let mut blob_writer = state.blob_service.open_write().await;
|
||||
|
||||
let mut r = Cursor::new(&content);
|
||||
|
||||
let blob_size = tokio::io::copy(&mut r, &mut blob_writer).await?;
|
||||
let blob_digest = blob_writer.close().await?;
|
||||
let ca_hash = CAHash::Text(Sha256::digest(&content).into());
|
||||
|
||||
let root_node = Node::File {
|
||||
digest: blob_digest,
|
||||
size: blob_size,
|
||||
executable: false,
|
||||
};
|
||||
|
||||
// calculate the nar hash
|
||||
let (nar_size, nar_sha256) = state
|
||||
.nar_calculation_service
|
||||
.calculate_nar(&root_node)
|
||||
.await
|
||||
.map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
|
||||
|
||||
// persist via pathinfo service.
|
||||
state
|
||||
.path_info_service
|
||||
.put(PathInfo {
|
||||
store_path: build_ca_path(
|
||||
name.to_str()?,
|
||||
&ca_hash,
|
||||
content.iter_ctx_plain(),
|
||||
false,
|
||||
)
|
||||
.map_err(|_e| {
|
||||
nix_compat::derivation::DerivationError::InvalidOutputName(
|
||||
name.to_str_lossy().into_owned(),
|
||||
)
|
||||
})
|
||||
.map_err(DerivationError::InvalidDerivation)?,
|
||||
node: root_node,
|
||||
// assemble references from plain context.
|
||||
references: content
|
||||
.iter_ctx_plain()
|
||||
.map(|elem| StorePath::from_absolute_path(elem.as_bytes()))
|
||||
.collect::<Result<_, _>>()
|
||||
.map_err(|e| ErrorKind::TvixError(Rc::new(e)))?,
|
||||
nar_size,
|
||||
nar_sha256,
|
||||
signatures: vec![],
|
||||
deriver: None,
|
||||
ca: Some(ca_hash),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| ErrorKind::TvixError(Rc::new(e)))
|
||||
.map(|path_info| path_info.store_path)
|
||||
})?;
|
||||
|
||||
let abs_path = store_path.to_absolute_path();
|
||||
let context: NixContext = NixContextElement::Plain(abs_path.clone()).into();
|
||||
|
||||
Ok(Value::from(NixString::new_context_from(context, abs_path)))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -112,16 +112,63 @@ mod import_builtins {
|
|||
use crate::tvix_store_io::TvixStoreIO;
|
||||
use bstr::ByteSlice;
|
||||
use nix_compat::nixhash::{CAHash, NixHash};
|
||||
use nix_compat::store_path::{build_ca_path, StorePathRef};
|
||||
use nix_compat::store_path::{build_ca_path, StorePath, StorePathRef};
|
||||
use sha2::Digest;
|
||||
use std::rc::Rc;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tvix_castore::blobservice::BlobService;
|
||||
use tvix_eval::builtins::coerce_value_to_path;
|
||||
use tvix_eval::generators::Gen;
|
||||
use tvix_eval::{generators::GenCo, ErrorKind, Value};
|
||||
use tvix_eval::{FileType, NixContextElement, NixString};
|
||||
use tvix_eval::{AddContext, FileType, NixContext, NixContextElement, NixString};
|
||||
use tvix_store::path_info::PathInfo;
|
||||
|
||||
/// Helper function dealing with uploading something from a std::io::Read to
|
||||
/// the passed [BlobService], returning the B3Digest and size.
|
||||
/// This function is sync (and uses the tokio handle to block).
|
||||
/// A sync closure getting a copy of all bytes read can be passed in,
|
||||
/// allowing to do other hashing where needed.
|
||||
fn copy_to_blobservice<F>(
|
||||
tokio_handle: tokio::runtime::Handle,
|
||||
blob_service: impl BlobService,
|
||||
mut r: impl std::io::Read,
|
||||
mut inspect_f: F,
|
||||
) -> std::io::Result<(tvix_castore::B3Digest, u64)>
|
||||
where
|
||||
F: FnMut(&[u8]),
|
||||
{
|
||||
let mut blob_size = 0;
|
||||
|
||||
let mut blob_writer = tokio_handle.block_on(async { blob_service.open_write().await });
|
||||
|
||||
// read piece by piece and write to blob_writer.
|
||||
// This is a bit manual due to EvalIO being sync, while the blob writer being async.
|
||||
{
|
||||
let mut buf = [0u8; 4096];
|
||||
|
||||
loop {
|
||||
// read bytes into buffer, break out if EOF
|
||||
let len = r.read(&mut buf)?;
|
||||
if len == 0 {
|
||||
break;
|
||||
}
|
||||
blob_size += len as u64;
|
||||
|
||||
let data = &buf[0..len];
|
||||
|
||||
// write to blobwriter
|
||||
tokio_handle.block_on(async { blob_writer.write_all(data).await })?;
|
||||
|
||||
// Call inspect_f
|
||||
inspect_f(data);
|
||||
}
|
||||
|
||||
let blob_digest = tokio_handle.block_on(async { blob_writer.close().await })?;
|
||||
|
||||
Ok((blob_digest, blob_size))
|
||||
}
|
||||
}
|
||||
|
||||
// This is a helper used by both builtins.path and builtins.filterSource.
|
||||
async fn import_helper(
|
||||
state: Rc<TvixStoreIO>,
|
||||
|
|
@ -151,72 +198,43 @@ mod import_builtins {
|
|||
// as that affects the output path calculation.
|
||||
FileType::Regular => {
|
||||
let mut file = state.open(&path)?;
|
||||
let mut h = (!recursive_ingestion).then(sha2::Sha256::new);
|
||||
|
||||
let mut flat_sha256 = (!recursive_ingestion).then(sha2::Sha256::new);
|
||||
let mut blob_size = 0;
|
||||
|
||||
let mut blob_writer = state
|
||||
.tokio_handle
|
||||
.block_on(async { state.blob_service.open_write().await });
|
||||
|
||||
// read piece by piece and write to blob_writer.
|
||||
// This is a bit manual due to EvalIO being sync, while everything else async.
|
||||
{
|
||||
let mut buf = [0u8; 4096];
|
||||
|
||||
loop {
|
||||
// read bytes into buffer, break out if EOF
|
||||
let len = file.read(&mut buf)?;
|
||||
if len == 0 {
|
||||
break;
|
||||
}
|
||||
blob_size += len as u64;
|
||||
|
||||
let data = &buf[0..len];
|
||||
|
||||
// add to blobwriter
|
||||
state
|
||||
.tokio_handle
|
||||
.block_on(async { blob_writer.write_all(data).await })?;
|
||||
|
||||
let (blob_digest, blob_size) = copy_to_blobservice(
|
||||
state.tokio_handle.clone(),
|
||||
&state.blob_service,
|
||||
&mut file,
|
||||
|data| {
|
||||
// update blob_sha256 if needed.
|
||||
if let Some(h) = flat_sha256.as_mut() {
|
||||
if let Some(h) = h.as_mut() {
|
||||
h.update(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
)?;
|
||||
|
||||
// close the blob writer, construct the root node and the blob_sha256 (later used for output path calculation)
|
||||
(
|
||||
Node::File {
|
||||
digest: state
|
||||
.tokio_handle
|
||||
.block_on(async { blob_writer.close().await })?,
|
||||
digest: blob_digest,
|
||||
size: blob_size,
|
||||
executable: false,
|
||||
},
|
||||
{
|
||||
// If non-recursive ingestion is requested…
|
||||
if let Some(flat_sha256) = flat_sha256 {
|
||||
let actual_sha256 = flat_sha256.finalize().into();
|
||||
h.map(|h| {
|
||||
// If non-recursive ingestion was requested, we return that one.
|
||||
let actual_sha256 = h.finalize().into();
|
||||
|
||||
// compare the recorded flat hash with an upfront one if provided.
|
||||
if let Some(expected_sha256) = expected_sha256 {
|
||||
if actual_sha256 != expected_sha256 {
|
||||
return Err(ImportError::HashMismatch(
|
||||
path,
|
||||
NixHash::Sha256(expected_sha256),
|
||||
NixHash::Sha256(actual_sha256),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
// If an expected hash was provided upfront, compare and bail out.
|
||||
if let Some(expected_sha256) = expected_sha256 {
|
||||
if actual_sha256 != expected_sha256 {
|
||||
return Err(ImportError::HashMismatch(
|
||||
path.clone(),
|
||||
NixHash::Sha256(expected_sha256),
|
||||
NixHash::Sha256(actual_sha256),
|
||||
));
|
||||
}
|
||||
|
||||
Some(CAHash::Flat(NixHash::Sha256(actual_sha256)))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
Ok(CAHash::Flat(NixHash::Sha256(actual_sha256)))
|
||||
})
|
||||
.transpose()?,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -420,6 +438,98 @@ mod import_builtins {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[builtin("toFile")]
|
||||
async fn builtin_to_file(
|
||||
state: Rc<TvixStoreIO>,
|
||||
co: GenCo,
|
||||
name: Value,
|
||||
content: Value,
|
||||
) -> Result<Value, ErrorKind> {
|
||||
if name.is_catchable() {
|
||||
return Ok(name);
|
||||
}
|
||||
|
||||
if content.is_catchable() {
|
||||
return Ok(content);
|
||||
}
|
||||
|
||||
let name = name
|
||||
.to_str()
|
||||
.context("evaluating the `name` parameter of builtins.toFile")?;
|
||||
let content = content
|
||||
.to_contextful_str()
|
||||
.context("evaluating the `content` parameter of builtins.toFile")?;
|
||||
|
||||
if content.iter_ctx_derivation().count() > 0
|
||||
|| content.iter_ctx_single_outputs().count() > 0
|
||||
{
|
||||
return Err(ErrorKind::UnexpectedContext);
|
||||
}
|
||||
|
||||
// upload contents to the blobservice and create a root node
|
||||
let mut h = sha2::Sha256::new();
|
||||
let (blob_digest, blob_size) = copy_to_blobservice(
|
||||
state.tokio_handle.clone(),
|
||||
&state.blob_service,
|
||||
std::io::Cursor::new(&content),
|
||||
|data| h.update(data),
|
||||
)?;
|
||||
|
||||
let root_node = Node::File {
|
||||
digest: blob_digest,
|
||||
size: blob_size,
|
||||
executable: false,
|
||||
};
|
||||
|
||||
// calculate the nar hash
|
||||
let (nar_size, nar_sha256) = state
|
||||
.nar_calculation_service
|
||||
.calculate_nar(&root_node)
|
||||
.await
|
||||
.map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
|
||||
|
||||
let ca_hash = CAHash::Text(h.finalize().into());
|
||||
|
||||
// persist via pathinfo service.
|
||||
let store_path = state
|
||||
.tokio_handle
|
||||
.block_on(
|
||||
state.path_info_service.put(PathInfo {
|
||||
store_path: build_ca_path(
|
||||
name.to_str()?,
|
||||
&ca_hash,
|
||||
content.iter_ctx_plain(),
|
||||
false,
|
||||
)
|
||||
.map_err(|_e| {
|
||||
nix_compat::derivation::DerivationError::InvalidOutputName(
|
||||
name.to_str_lossy().into_owned(),
|
||||
)
|
||||
})
|
||||
.map_err(crate::builtins::DerivationError::InvalidDerivation)?,
|
||||
node: root_node,
|
||||
// assemble references from plain context.
|
||||
references: content
|
||||
.iter_ctx_plain()
|
||||
.map(|elem| StorePath::from_absolute_path(elem.as_bytes()))
|
||||
.collect::<Result<_, _>>()
|
||||
.map_err(|e| ErrorKind::TvixError(Rc::new(e)))?,
|
||||
nar_size,
|
||||
nar_sha256,
|
||||
signatures: vec![],
|
||||
deriver: None,
|
||||
ca: Some(ca_hash),
|
||||
}),
|
||||
)
|
||||
.map_err(|e| ErrorKind::TvixError(Rc::new(e)))
|
||||
.map(|path_info| path_info.store_path)?;
|
||||
|
||||
let abs_path = store_path.to_absolute_path();
|
||||
let context: NixContext = NixContextElement::Plain(abs_path.clone()).into();
|
||||
|
||||
Ok(Value::from(NixString::new_context_from(context, abs_path)))
|
||||
}
|
||||
}
|
||||
|
||||
pub use import_builtins::builtins as import_builtins;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue