feat(tvix/eval): implement builtins.filterSource

We add a new set of builtins called `import_builtins`, which
will contain import-related builtins, such as `builtins.path` and
`builtins.filterSource`. Both can import paths into the store, with
various knobs to alter the result, e.g. filtering, renaming, expected
hashes.

We introduce `filtered_ingest`, which drives the filtered ingestion by
calling the Nix filter function through the generator machinery; the
resulting root node is then registered with the path info service inside the store.

`builtins.filterSource` is very simple; `builtins.path` is a more
complicated variant that requires the same logic, but with more sophisticated
options: name customization, file ingestion method and expected SHA-256.

Change-Id: I1083f37808b35f7b37818c8ffb9543d9682b2de2
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10654
Autosubmit: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Ryan Lahfa 2024-01-17 07:45:55 +01:00 committed by clbot
parent 20833656ae
commit 7388078630
9 changed files with 521 additions and 69 deletions

View file

@ -0,0 +1,148 @@
//! Implements builtins used to import paths in the store.
use futures::pin_mut;
use std::path::Path;
use tvix_eval::{
builtin_macros::builtins,
generators::{self, GenCo},
ErrorKind, Value,
};
use std::rc::Rc;
/// Walks `path` on the local filesystem, optionally consults the Nix-level
/// `filter` function for every entry below the root, and ingests whatever was
/// kept through [`TvixStoreIO::ingest_entries_sync`].
///
/// When a filter is given, it is called with two string arguments: the path
/// of the entry and its type (`"directory"`, `"regular"`, `"symlink"` or
/// `"unknown"`). Its forced result is interpreted as a boolean; entries for
/// which it is `false` are dropped, and a dropped directory is not descended
/// into. The first entry emitted by the walker (the root) is always kept and
/// is never passed to the filter.
///
/// Returns the node produced by the ingestion. Walk failures and ingestion
/// failures are reported as [`ErrorKind::IO`]; a filter result that cannot be
/// read as a boolean is propagated as the error returned by `as_bool`.
async fn filtered_ingest(
    state: Rc<TvixStoreIO>,
    co: GenCo,
    path: &Path,
    filter: Option<&Value>,
) -> Result<tvix_castore::proto::node::Node, ErrorKind> {
    // Symlinks are never followed, directories are reported before their
    // contents, and siblings are visited in file name order.
    let mut walker = walkdir::WalkDir::new(path)
        .follow_links(false)
        .follow_root_links(false)
        .contents_first(false)
        .sort_by_file_name()
        .into_iter();

    // The first entry emitted by the walker is the root. It is kept
    // unconditionally and becomes the only member of level 0.
    let root_entry = match walker.next() {
        Some(Ok(entry)) => entry,
        Some(Err(err)) => {
            return Err(ErrorKind::IO {
                path: Some(path.to_path_buf()),
                error: std::io::Error::from(err).into(),
            })
        }
        None => {
            return Err(ErrorKind::IO {
                path: Some(path.to_path_buf()),
                error: std::io::Error::new(std::io::ErrorKind::NotFound, "No root node emitted")
                    .into(),
            })
        }
    };

    // `levels[d]` collects the kept entries found at depth `d`.
    let mut levels: Vec<Vec<walkdir::DirEntry>> = vec![vec![root_entry]];

    // The loop condition is spelled out by hand (instead of a `for` loop)
    // because `skip_current_dir` needs mutable access to the walker while it
    // is being iterated.
    while let Some(walked) = walker.next() {
        // Propagate walk errors, attaching the offending path when the walker
        // knows it.
        let entry = match walked {
            Ok(entry) => entry,
            Err(err) => {
                return Err(ErrorKind::IO {
                    path: err.path().map(|p| p.to_path_buf()),
                    error: std::io::Error::from(err).into(),
                })
            }
        };

        // Type names as per Nix documentation `:doc builtins.filterSource`.
        let kind = entry.file_type();
        let file_type = match (kind.is_dir(), kind.is_file(), kind.is_symlink()) {
            (true, _, _) => "directory",
            (false, true, _) => "regular",
            (false, false, true) => "symlink",
            (false, false, false) => "unknown",
        };

        // Without a filter everything is kept; otherwise call the filter with
        // (path, type) and force the result down to a boolean.
        let should_keep: bool = match filter {
            None => true,
            Some(filter_fn) => {
                let path_arg =
                    Value::String(Box::new(entry.path().as_os_str().as_encoded_bytes().into()));
                let type_arg = Value::String(Box::new(file_type.into()));
                let verdict =
                    generators::request_call_with(&co, filter_fn.clone(), [path_arg, type_arg])
                        .await;
                generators::request_force(&co, verdict).await.as_bool()?
            }
        };

        if !should_keep {
            // A rejected directory takes its whole subtree with it.
            if file_type == "directory" {
                walker.skip_current_dir();
            }
            continue;
        }

        let depth = entry.depth();
        match levels.get_mut(depth) {
            Some(level) => level.push(entry),
            None => {
                // Descending is expected to open exactly one new level at a time.
                debug_assert!(
                    depth == levels.len(),
                    "Received unexpected entry with depth {} during descent, previously at {}",
                    depth,
                    levels.len()
                );
                levels.push(vec![entry]);
            }
        }

        // FUTUREWORK: determine when it's the right moment to flush a level to the ingester.
    }

    let stream = tvix_castore::import::leveled_entries_to_stream(levels);
    pin_mut!(stream);

    state
        .ingest_entries_sync(stream)
        .map_err(|err| ErrorKind::IO {
            path: Some(path.to_path_buf()),
            error: err.into(),
        })
}
// Builtins that import paths into the store. Every builtin in this module
// receives the shared `Rc<TvixStoreIO>` as its state argument.
#[builtins(state = "Rc<TvixStoreIO>")]
mod import_builtins {
    use super::*;
    use crate::tvix_store_io::TvixStoreIO;
    use std::rc::Rc;
    use tvix_eval::generators::Gen;
    use tvix_eval::{generators::GenCo, ErrorKind, Value};

    // `builtins.filterSource filter path`:
    // ingests `path` with `filter` deciding which entries are kept, registers
    // the resulting root node with the path info service, and evaluates to the
    // absolute store path of the import.
    //
    // `filter` is taken lazily and handed to `filtered_ingest` as-is. Errors
    // from the registration step are reported as `ErrorKind::IO` for `path`.
    #[builtin("filterSource")]
    async fn builtin_filter_source(
        state: Rc<TvixStoreIO>,
        co: GenCo,
        #[lazy] filter: Value,
        path: Value,
    ) -> Result<Value, ErrorKind> {
        let source_path = path.to_path()?;

        let root_node =
            filtered_ingest(state.clone(), co, &source_path, Some(&filter)).await?;

        // The store path name is derived from the source path.
        let name = tvix_store::import::path_to_name(&source_path)?;

        let store_path = state
            .register_node_in_path_info_service_sync(name, &source_path, root_node)
            .map_err(|err| ErrorKind::IO {
                path: Some(source_path.to_path_buf()),
                error: err.into(),
            })?;

        Ok(store_path.to_absolute_path().into())
    }
}
pub use import_builtins::builtins as import_builtins;
use crate::tvix_store_io::TvixStoreIO;

View file

@ -8,6 +8,8 @@ mod derivation;
mod derivation_error;
mod fetchers;
mod import;
pub use derivation_error::Error as DerivationError;
/// Adds derivation-related builtins to the passed [tvix_eval::Evaluation].
@ -35,14 +37,26 @@ pub fn add_fetcher_builtins<IO>(eval: &mut tvix_eval::Evaluation<IO>, io: Rc<Tvi
.extend(fetchers::fetcher_builtins::builtins(io));
}
/// Registers the import-related builtins on the passed [tvix_eval::Evaluation].
///
/// At the moment this is `filterSource`; `path` is intended to live here as
/// well.
///
/// These builtins talk to the store implementation, which is why the shared
/// [`TvixStoreIO`] handle has to be passed in.
pub fn add_import_builtins<IO>(eval: &mut tvix_eval::Evaluation<IO>, io: Rc<TvixStoreIO>) {
    // TODO(raitobezarius): evaluate expressing filterSource as Nix code using path (b/372)
    let import_builtins = import::import_builtins(io);
    eval.builtins.extend(import_builtins);
}
#[cfg(test)]
mod tests {
use std::{rc::Rc, sync::Arc};
use std::{fs, rc::Rc, sync::Arc};
use crate::tvix_store_io::TvixStoreIO;
use super::{add_derivation_builtins, add_fetcher_builtins};
use super::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins};
use nix_compat::store_path::hash_placeholder;
use tempfile::TempDir;
use test_case::test_case;
use tvix_build::buildservice::DummyBuildService;
use tvix_eval::{EvalIO, EvaluationResult};
@ -69,7 +83,8 @@ mod tests {
let mut eval = tvix_eval::Evaluation::new(io.clone() as Rc<dyn EvalIO>, false);
add_derivation_builtins(&mut eval, io.clone());
add_fetcher_builtins(&mut eval, io);
add_fetcher_builtins(&mut eval, io.clone());
add_import_builtins(&mut eval, io);
// run the evaluation itself.
eval.evaluate(str, None)
@ -333,4 +348,153 @@ mod tests {
"warnings should not be empty"
);
}
/// Runs `builtins.filterSource` against several carefully-crafted
/// subdirectories and checks that the resulting store path is the expected
/// one (the expectations are meant to match what Nix produces).
/// `@fixtures` in the expression is replaced with the path of the fixtures
/// directory.
#[cfg(target_family = "unix")]
#[test_case(r#"(builtins.filterSource (p: t: true) @fixtures)"#, "/nix/store/bqh6kd0x3vps2rzagzpl7qmbbgnx19cp-import_fixtures"; "complicated directory: filter nothing")]
#[test_case(r#"(builtins.filterSource (p: t: false) @fixtures)"#, "/nix/store/giq6czz24lpjg97xxcxk6rg950lcpib1-import_fixtures"; "complicated directory: filter everything")]
#[test_case(r#"(builtins.filterSource (p: t: t != "directory") @fixtures/a_dir)"#, "/nix/store/8vbqaxapywkvv1hacdja3pi075r14d43-a_dir"; "simple directory with one file: filter directories")]
#[test_case(r#"(builtins.filterSource (p: t: t != "regular") @fixtures/a_dir)"#, "/nix/store/zphlqc93s2iq4xm393l06hzf8hp85r4z-a_dir"; "simple directory with one file: filter files")]
#[test_case(r#"(builtins.filterSource (p: t: t != "symlink") @fixtures/a_dir)"#, "/nix/store/8vbqaxapywkvv1hacdja3pi075r14d43-a_dir"; "simple directory with one file: filter symlinks")]
#[test_case(r#"(builtins.filterSource (p: t: true) @fixtures/a_dir)"#, "/nix/store/8vbqaxapywkvv1hacdja3pi075r14d43-a_dir"; "simple directory with one file: filter nothing")]
#[test_case(r#"(builtins.filterSource (p: t: false) @fixtures/a_dir)"#, "/nix/store/zphlqc93s2iq4xm393l06hzf8hp85r4z-a_dir"; "simple directory with one file: filter everything")]
#[test_case(r#"builtins.filterSource (p: t: t != "directory") @fixtures/b_dir"#, "/nix/store/xzsfzdgrxg93icaamjm8zq1jq6xvf2fz-b_dir"; "simple directory with one directory: filter directories")]
#[test_case(r#"builtins.filterSource (p: t: t != "regular") @fixtures/b_dir"#, "/nix/store/8rjx64mm7173xp60rahv7cl3ixfkv3rf-b_dir"; "simple directory with one directory: filter files")]
#[test_case(r#"builtins.filterSource (p: t: t != "symlink") @fixtures/b_dir"#, "/nix/store/8rjx64mm7173xp60rahv7cl3ixfkv3rf-b_dir"; "simple directory with one directory: filter symlinks")]
#[test_case(r#"builtins.filterSource (p: t: true) @fixtures/b_dir"#, "/nix/store/8rjx64mm7173xp60rahv7cl3ixfkv3rf-b_dir"; "simple directory with one directory: filter nothing")]
#[test_case(r#"builtins.filterSource (p: t: false) @fixtures/b_dir"#, "/nix/store/xzsfzdgrxg93icaamjm8zq1jq6xvf2fz-b_dir"; "simple directory with one directory: filter everything")]
#[test_case(r#"builtins.filterSource (p: t: t != "directory") @fixtures/c_dir"#, "/nix/store/riigfmmzzrq65zqiffcjk5sbqr9c9h09-c_dir"; "simple directory with one symlink to a file: filter directory")]
#[test_case(r#"builtins.filterSource (p: t: t != "regular") @fixtures/c_dir"#, "/nix/store/riigfmmzzrq65zqiffcjk5sbqr9c9h09-c_dir"; "simple directory with one symlink to a file: filter files")]
#[test_case(r#"builtins.filterSource (p: t: t != "symlink") @fixtures/c_dir"#, "/nix/store/y5g1fz04vzjvf422q92qmv532axj5q26-c_dir"; "simple directory with one symlink to a file: filter symlinks")]
#[test_case(r#"builtins.filterSource (p: t: true) @fixtures/c_dir"#, "/nix/store/riigfmmzzrq65zqiffcjk5sbqr9c9h09-c_dir"; "simple directory with one symlink to a file: filter nothing")]
#[test_case(r#"builtins.filterSource (p: t: false) @fixtures/c_dir"#, "/nix/store/y5g1fz04vzjvf422q92qmv532axj5q26-c_dir"; "simple directory with one symlink to a file: filter everything")]
#[test_case(r#"builtins.filterSource (p: t: t != "directory") @fixtures/d_dir"#, "/nix/store/f2d1aixwiqy4lbzrd040ala2s4m2z199-d_dir"; "simple directory with dangling symlink: filter directory")]
#[test_case(r#"builtins.filterSource (p: t: t != "regular") @fixtures/d_dir"#, "/nix/store/f2d1aixwiqy4lbzrd040ala2s4m2z199-d_dir"; "simple directory with dangling symlink: filter file")]
#[test_case(r#"builtins.filterSource (p: t: t != "symlink") @fixtures/d_dir"#, "/nix/store/7l371xax8kknhpska4wrmyll1mzlhzvl-d_dir"; "simple directory with dangling symlink: filter symlinks")]
#[test_case(r#"builtins.filterSource (p: t: true) @fixtures/d_dir"#, "/nix/store/f2d1aixwiqy4lbzrd040ala2s4m2z199-d_dir"; "simple directory with dangling symlink: filter nothing")]
#[test_case(r#"builtins.filterSource (p: t: false) @fixtures/d_dir"#, "/nix/store/7l371xax8kknhpska4wrmyll1mzlhzvl-d_dir"; "simple directory with dangling symlink: filter everything")]
#[test_case(r#"builtins.filterSource (p: t: t != "directory") @fixtures/symlink_to_a_dir"#, "/nix/store/apmdprm8fwl2zrjpbyfcd99zrnhvf47q-symlink_to_a_dir"; "simple symlinked directory with one file: filter directories")]
#[test_case(r#"builtins.filterSource (p: t: t != "regular") @fixtures/symlink_to_a_dir"#, "/nix/store/apmdprm8fwl2zrjpbyfcd99zrnhvf47q-symlink_to_a_dir"; "simple symlinked directory with one file: filter file")]
#[test_case(r#"builtins.filterSource (p: t: t != "symlink") @fixtures/symlink_to_a_dir"#, "/nix/store/apmdprm8fwl2zrjpbyfcd99zrnhvf47q-symlink_to_a_dir"; "simple symlinked directory with one file: filter symlinks")]
#[test_case(r#"builtins.filterSource (p: t: true) @fixtures/symlink_to_a_dir"#, "/nix/store/apmdprm8fwl2zrjpbyfcd99zrnhvf47q-symlink_to_a_dir"; "simple symlinked directory with one file: filter nothing")]
#[test_case(r#"builtins.filterSource (p: t: false) @fixtures/symlink_to_a_dir"#, "/nix/store/apmdprm8fwl2zrjpbyfcd99zrnhvf47q-symlink_to_a_dir"; "simple symlinked directory with one file: filter everything")]
fn builtins_filter_source_succeed(code: &str, expected_outpath: &str) {
    /// Builds the fixture tree rooted at `root` (which must not exist yet).
    ///
    /// The tree is produced at runtime rather than shipped inside the source
    /// tree, as git can't model certain things - like directories without any
    /// items.
    fn populate_fixtures(root: &std::path::Path) {
        fs::create_dir(root).expect("creating import_fixtures");

        // `/a_dir` holds a single empty file, `a_file`.
        let a_dir = root.join("a_dir");
        fs::create_dir(&a_dir).expect("creating /a_dir");
        fs::write(a_dir.join("a_file"), "").expect("creating /a_dir/a_file");

        // `/a_file` is an empty file directly below the root.
        fs::write(root.join("a_file"), "").expect("creating /a_file");

        // `/b_dir` holds a single empty directory, `a_dir`.
        fs::create_dir_all(root.join("b_dir").join("a_dir")).expect("creating /b_dir/a_dir");

        // `/c_dir` holds `symlink_to_a_file`, a symlink to `../a_dir/a_file`.
        let c_dir = root.join("c_dir");
        fs::create_dir(&c_dir).expect("creating /c_dir");
        std::os::unix::fs::symlink("../a_dir/a_file", c_dir.join("symlink_to_a_file"))
            .expect("creating /c_dir/symlink_to_a_file");

        // `/d_dir` holds `dangling_symlink`, pointing to `a_dir/a_file`,
        // which does not exist relative to `/d_dir`.
        let d_dir = root.join("d_dir");
        fs::create_dir(&d_dir).expect("creating /d_dir");
        std::os::unix::fs::symlink("a_dir/a_file", d_dir.join("dangling_symlink"))
            .expect("creating /d_dir/dangling_symlink");

        // `/symlink_to_a_dir` is a symlink to the existing `a_dir`.
        std::os::unix::fs::symlink("a_dir", root.join("symlink_to_a_dir"))
            .expect("creating /symlink_to_a_dir");
    }

    // The fixtures live in a fresh temporary directory for every test case.
    let temp = TempDir::new().expect("create temporary directory");
    let fixtures = temp.path().join("import_fixtures");
    populate_fixtures(&fixtures);

    // Point the expression at the freshly created fixtures.
    let code_replaced = code.replace("@fixtures", &fixtures.to_string_lossy());

    let eval_result = eval(&code_replaced);

    // The evaluation has to produce a string holding the expected store path.
    match eval_result.value.expect("must succeed") {
        tvix_eval::Value::String(s) => {
            assert_eq!(expected_outpath, s.as_bstr());
        }
        other => panic!("unexpected value type: {:?}", other),
    }

    assert!(eval_result.errors.is_empty(), "errors should be empty");
}
// Checks how `builtins.filterSource` deals with file types that are handed to
// the filter as "unknown" (FIFO, socket, character device): such nodes only
// prevent the import if the filter keeps them.
#[cfg(target_family = "unix")]
// The fixture directory contains three unsupported nodes. Each of the
// following filters keeps at least one of them, so the evaluation is expected
// to fail.
#[test_case(r#"(builtins.filterSource (p: t: t == "unknown") @fixtures)"#, false; "complicated directory: filter unsupported types")]
#[test_case(r#"(builtins.filterSource (p: t: (builtins.baseNameOf p) != "a_charnode") @fixtures)"#, false; "complicated directory: filter character device nodes")]
#[test_case(r#"(builtins.filterSource (p: t: (builtins.baseNameOf p) != "a_socket") @fixtures)"#, false; "complicated directory: filter sockets")]
#[test_case(r#"(builtins.filterSource (p: t: (builtins.baseNameOf p) != "a_fifo") @fixtures)"#, false; "complicated directory: filter FIFOs")]
// This filter drops every unsupported node, so the evaluation has to succeed.
#[test_case(r#"(builtins.filterSource (p: t: t != "unknown") @fixtures)"#, true; "complicated directory: filter out unsupported types")]
fn builtins_filter_source_unsupported_files(code: &str, success: bool) {
    use nix::sys::stat;
    use nix::unistd;
    use std::os::unix::net::UnixListener;
    use tempfile::TempDir;

    // Populate a temporary directory with unsupported file nodes:
    // - a FIFO
    // - a socket
    // - a character device
    // and point the expression at it through `@fixtures`.
    //
    // block devices cannot be tested because we don't have the right permissions.
    let temp = TempDir::with_prefix("foo").expect("Failed to create a temporary directory");
    let root = temp.path();

    // read, write, execute to the owner.
    let fifo_path = root.join("a_fifo");
    unistd::mkfifo(&fifo_path, stat::Mode::S_IRWXU).expect("Failed to create the FIFO");

    UnixListener::bind(root.join("a_socket")).expect("Failed to create the socket");

    let charnode_path = root.join("a_charnode");
    stat::mknod(
        &charnode_path,
        stat::SFlag::S_IFCHR,
        stat::Mode::S_IRWXU,
        0,
    )
    .expect("Failed to create a character device node");

    let code_replaced = code.replace("@fixtures", &root.to_string_lossy());
    let eval_result = eval(&code_replaced);

    if success {
        assert!(
            eval_result.value.is_some(),
            "unexpected failure on a directory of unsupported file types but all filtered: {:?}",
            eval_result.errors
        );
        return;
    }

    assert!(
        eval_result.value.is_none(),
        "unexpected success on unsupported file type ingestion: {:?}",
        eval_result.value
    );
}
}

View file

@ -13,7 +13,7 @@ use tvix_store::pathinfoservice::{MemoryPathInfoService, PathInfoService};
use rstest::rstest;
use crate::{
builtins::{add_derivation_builtins, add_fetcher_builtins},
builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins},
tvix_store_io::TvixStoreIO,
};
@ -54,7 +54,8 @@ fn eval_test(code_path: PathBuf, expect_success: bool) {
eval.strict = true;
add_derivation_builtins(&mut eval, tvix_store_io.clone());
add_fetcher_builtins(&mut eval, tvix_store_io);
add_fetcher_builtins(&mut eval, tvix_store_io.clone());
add_import_builtins(&mut eval, tvix_store_io.clone());
let result = eval.evaluate(code, Some(code_path.clone()));
let failed = match result.value {

View file

@ -2,6 +2,7 @@
use async_recursion::async_recursion;
use bytes::Bytes;
use futures::Stream;
use futures::{StreamExt, TryStreamExt};
use nix_compat::{
nixhash::CAHash,
@ -18,6 +19,7 @@ use tokio::io::AsyncReadExt;
use tracing::{error, instrument, warn, Level};
use tvix_build::buildservice::BuildService;
use tvix_eval::{EvalIO, FileType, StdIO};
use walkdir::DirEntry;
use tvix_castore::{
blobservice::BlobService,
@ -282,6 +284,79 @@ impl TvixStoreIO {
self.tokio_handle
.block_on(async { self.store_path_to_node(store_path, sub_path).await })
}
/// Synchronous wrapper around [`tvix_castore::import::ingest_entries`].
///
/// The entries in `entries_stream` are ingested using this instance's blob
/// and directory services; the call blocks on the stored tokio handle via
/// [`tokio::runtime::Handle::block_on`] until the ingestion has finished.
///
/// Returns the resulting root node. Ingestion errors are wrapped into an
/// [`io::Error`] of kind [`io::ErrorKind::Other`].
pub(crate) fn ingest_entries_sync<S>(&self, entries_stream: S) -> io::Result<Node>
where
    S: Stream<Item = DirEntry> + std::marker::Unpin,
{
    let ingestion = async move {
        let outcome = tvix_castore::import::ingest_entries(
            &self.blob_service,
            &self.directory_service,
            entries_stream,
        )
        .await;

        outcome.map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
    };

    self.tokio_handle.block_on(ingestion)
}
/// Derives the [`PathInfo`] and the [`StorePath`] for an ingested
/// `root_node`.
///
/// `name` is the name part of the store path to build. `path` is only passed
/// on to `tvix_store::import::log_node`.
///
/// Nothing is stored by this function; see
/// `register_node_in_path_info_service` for that.
///
/// # Errors
///
/// Fails if the NAR calculation fails, or with
/// [`std::io::ErrorKind::InvalidData`] if no store path can be built for
/// `name`.
pub(crate) async fn node_to_path_info(
    &self,
    name: &str,
    path: &Path,
    root_node: Node,
) -> io::Result<(PathInfo, StorePath)> {
    // The PathInfoService computes NAR size and sha256 for the node.
    let path_info_service = self.path_info_service.as_ref();
    let (nar_size, nar_sha256) = path_info_service.calculate_nar(&root_node).await?;

    // Building the output path can fail, since not every name is legal.
    let output_path =
        match nix_compat::store_path::build_nar_based_store_path(&nar_sha256, name) {
            Ok(store_path) => store_path,
            Err(_) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("invalid name: {}", name),
                ))
            }
        };

    // The root node is renamed after the output path, which in turn is
    // derived from the NAR hash.
    let node_name = output_path.to_string().into_bytes();
    let root_node = root_node.rename(node_name.into());
    tvix_store::import::log_node(&root_node, path);

    let path_info =
        tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, root_node);

    Ok((path_info, output_path.to_owned()))
}
/// Computes the path info for `root_node` via `node_to_path_info` and stores
/// it with `put` on the path info service.
///
/// Returns the store path belonging to the node. Errors from either step are
/// propagated unchanged.
pub(crate) async fn register_node_in_path_info_service(
    &self,
    name: &str,
    path: &Path,
    root_node: Node,
) -> io::Result<StorePath> {
    let (path_info, output_path) = self.node_to_path_info(name, path, root_node).await?;

    // Only the success of the upload matters; the returned value is not used.
    let path_info_service = self.path_info_service.as_ref();
    let _stored = path_info_service.put(path_info).await?;

    Ok(output_path)
}
/// Blocking variant of `register_node_in_path_info_service`: runs the async
/// registration to completion on the stored tokio handle and returns its
/// result.
pub(crate) fn register_node_in_path_info_service_sync(
    &self,
    name: &str,
    path: &Path,
    root_node: Node,
) -> io::Result<StorePath> {
    let registration = self.register_node_in_path_info_service(name, path, root_node);
    self.tokio_handle.block_on(registration)
}
}
impl EvalIO for TvixStoreIO {
@ -475,9 +550,8 @@ mod tests {
use tvix_eval::{EvalIO, EvaluationResult};
use tvix_store::pathinfoservice::MemoryPathInfoService;
use crate::builtins::{add_derivation_builtins, add_fetcher_builtins};
use super::TvixStoreIO;
use crate::builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins};
/// evaluates a given nix expression and returns the result.
/// Takes care of setting up the evaluator so it knows about the
@ -504,6 +578,7 @@ mod tests {
add_derivation_builtins(&mut eval, io.clone());
add_fetcher_builtins(&mut eval, io.clone());
add_import_builtins(&mut eval, io);
// run the evaluation itself.
eval.evaluate(str, None)