feat(snix-glue): Propagate build inputs correctly.

We previously used to calculate the transitive input closure of builds
using eval state, i.e. based on known_paths from the drv that is being
built. This caused had 2 issues:

* The build included a lot of unnecessary build time dependencies of drv's that
  the build depends on in addition to runtime dependencies.
* Some runtime dependencies were missing causing the build to fail, see #106

This implementation uses only runtime dependencies of direct
dependencies and makes sure to include any transitive runtime dependencies,
this is achieved by querying path_info_service for "references".

fixed #106

Change-Id: Id734bed7b0cf50e2dac96501a9bc70655ed15054
Reviewed-on: https://cl.snix.dev/c/snix/+/30308
Tested-by: besadii
Reviewed-by: Florian Klink <flokli@flokli.de>
This commit is contained in:
Vova Kryachko 2025-04-11 16:00:14 +00:00
parent 851cfc7278
commit 934b52c136
7 changed files with 100 additions and 162 deletions

1
snix/Cargo.lock generated
View file

@ -4329,6 +4329,7 @@ name = "snix-glue"
version = "0.1.0"
dependencies = [
"async-compression",
"async-stream",
"bstr",
"bytes",
"clap",

View file

@ -14243,6 +14243,10 @@ rec {
packageId = "async-compression";
features = [ "tokio" "gzip" "bzip2" "xz" ];
}
{
name = "async-stream";
packageId = "async-stream";
}
{
name = "bstr";
packageId = "bstr";

View file

@ -10,6 +10,7 @@ async-compression = { workspace = true, features = [
"bzip2",
"xz",
] }
async-stream.workspace = true
bstr.workspace = true
bytes.workspace = true
data-encoding.workspace = true

View file

@ -123,7 +123,7 @@ pub(crate) mod fetcher_builtins {
}
None => {
// If we don't have enough info, do the fetch now.
let (store_path, _root_node) = state
let (store_path, _path_info) = state
.tokio_handle
.block_on(async { state.fetcher.ingest_and_persist(&name, fetch).await })
.map_err(|e| ErrorKind::SnixError(Rc::new(e)))?;

View file

@ -525,7 +525,7 @@ where
&self,
name: &'a str,
fetch: Fetch,
) -> Result<(StorePathRef<'a>, Node), FetcherError> {
) -> Result<(StorePathRef<'a>, PathInfo), FetcherError> {
// Fetch file, return the (unnamed) (File)Node of its contents, ca hash and filesize.
let (node, ca_hash, size) = self.ingest(fetch).await?;
@ -560,11 +560,11 @@ where
};
self.path_info_service
.put(path_info)
.put(path_info.clone())
.await
.map_err(|e| FetcherError::Io(e.into()))?;
Ok((store_path, node))
Ok((store_path, path_info))
}
}

View file

@ -2,13 +2,17 @@
//! [nix_compat::derivation::Derivation] to [snix_build::buildservice::BuildRequest].
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::future::Future;
use std::path::PathBuf;
use async_stream::try_stream;
use bytes::Bytes;
use futures::Stream;
use nix_compat::{derivation::Derivation, nixbase32, store_path::StorePath};
use sha2::{Digest, Sha256};
use snix_build::buildservice::{AdditionalFile, BuildConstraints, BuildRequest, EnvVar};
use snix_castore::Node;
use snix_store::path_info::PathInfo;
use crate::known_paths::KnownPaths;
@ -29,107 +33,54 @@ const NIX_ENVIRONMENT_VARS: [(&str, &str); 12] = [
("TMPDIR", "/build"),
];
/// Bfs queue needs to track both leaf store paths as well as
/// input derivation outputs.
#[derive(Eq, Hash, PartialEq, Clone)]
enum DerivationQueueItem<'a> {
/// Leaf input of a derivation
InputSource(&'a StorePath<String>),
/// Derivation input that can transitively produce more paths
/// that are needed for a given output.
InputDerivation {
drv_path: &'a StorePath<String>,
output: &'a String,
},
}
/// Iterator that yields store paths in a breadth-first order.
/// It is used to get all inputs for a derivation and the needles for refscanning.
struct BfsDerivationInputs<'a> {
queue: VecDeque<DerivationQueueItem<'a>>,
visited: HashSet<DerivationQueueItem<'a>>,
known_paths: &'a KnownPaths,
}
impl<'a> Iterator for BfsDerivationInputs<'a> {
type Item = &'a StorePath<String>;
fn next(&mut self) -> Option<Self::Item> {
while let Some(item) = self.queue.pop_front() {
if !self.visited.insert(item.clone()) {
continue;
}
match item {
DerivationQueueItem::InputSource(path) => {
return Some(path);
}
DerivationQueueItem::InputDerivation { drv_path, output } => {
let drv = self
.known_paths
.get_drv_by_drvpath(drv_path)
.expect("drv Bug!!!");
let output_path = drv
.outputs
.get(output)
.expect("No output bug!")
.path
.as_ref()
.expect("output has no store path");
if self
.visited
.insert(DerivationQueueItem::InputSource(output_path))
{
self.queue.extend(
drv.input_sources
.iter()
.map(DerivationQueueItem::InputSource)
.chain(drv.input_derivations.iter().flat_map(
|(drv_path, outs)| {
outs.iter().map(move |output| {
DerivationQueueItem::InputDerivation {
drv_path,
output,
}
})
},
)),
);
}
return Some(output_path);
}
}
}
None
}
}
/// Get an iterator of a transitive input closure for a derivation.
/// Get a stream of a transitive input closure for a derivation.
/// It's used for input propagation into the build and nixbase32 needle propagation
/// for build output refscanning.
pub(crate) fn get_all_inputs<'a>(
pub(crate) fn get_all_inputs<'a, F, Fut>(
derivation: &'a Derivation,
known_paths: &'a KnownPaths,
) -> impl Iterator<Item = &'a StorePath<String>> {
BfsDerivationInputs {
queue: derivation
get_path_info: F,
) -> impl Stream<Item = Result<(StorePath<String>, Node), std::io::Error>>
where
F: Fn(StorePath<String>) -> Fut,
Fut: Future<Output = std::io::Result<Option<PathInfo>>>,
{
let mut visited: HashSet<StorePath<String>> = HashSet::new();
let mut queue: VecDeque<StorePath<String>> = derivation
.input_sources
.iter()
.map(DerivationQueueItem::InputSource)
.cloned()
.chain(
derivation
.input_derivations
.iter()
.flat_map(|(drv_path, outs)| {
outs.iter()
.map(move |output| DerivationQueueItem::InputDerivation {
drv_path,
output,
let drv = known_paths.get_drv_by_drvpath(drv_path).expect("drv Bug!!");
outs.iter().map(move |output| {
drv.outputs
.get(output)
.expect("No output bug!")
.path
.as_ref()
.expect("output has no store path")
.clone()
})
}),
)
.collect(),
visited: HashSet::new(),
known_paths,
.collect();
try_stream! {
while let Some(store_path) = queue.pop_front() {
let info = get_path_info(store_path).await?.ok_or(std::io::Error::other("path_info not present"))?;
for reference in info.references {
if visited.insert(reference.clone()) {
queue.push_back(reference);
}
}
yield (info.store_path, info.node);
}
}
}

View file

@ -1,10 +1,9 @@
//! This module provides an implementation of EvalIO talking to snix-store.
use futures::{StreamExt, TryStreamExt};
use futures::TryStreamExt;
use nix_compat::{nixhash::CAHash, store_path::StorePath};
use snix_build::buildservice::BuildService;
use snix_eval::{EvalIO, FileType, StdIO};
use snix_store::nar::NarCalculationService;
use std::collections::BTreeMap;
use std::{
cell::RefCell,
io,
@ -100,24 +99,24 @@ impl SnixStoreIO {
/// In case there is no PathInfo yet, this means we need to build it
/// (which currently is stubbed out still).
#[instrument(skip(self, store_path), fields(store_path=%store_path, indicatif.pb_show=tracing::field::Empty), ret(level = Level::TRACE), err(level = Level::TRACE))]
async fn store_path_to_node(
async fn store_path_to_path_info(
&self,
store_path: &StorePath<String>,
sub_path: &Path,
) -> io::Result<Option<Node>> {
) -> io::Result<Option<PathInfo>> {
// Find the root node for the store_path.
// It asks the PathInfoService first, but in case there was a Derivation
// produced that would build it, fall back to triggering the build.
// To populate the input nodes, it might recursively trigger builds of
// its dependencies too.
let root_node = match self
let mut path_info = match self
.path_info_service
.as_ref()
.get(*store_path.digest())
.await?
{
// TODO: use stricter typed BuildRequest here.
Some(path_info) => path_info.node,
Some(path_info) => path_info,
// If there's no PathInfo found, this normally means we have to
// trigger the build (and insert into PathInfoService, after
// reference scanning).
@ -140,7 +139,7 @@ impl SnixStoreIO {
match maybe_fetch {
Some((name, fetch)) => {
let (sp, root_node) = self
let (sp, path_info) = self
.fetcher
.ingest_and_persist(&name, fetch)
.await
@ -154,7 +153,7 @@ impl SnixStoreIO {
"store path returned from fetcher must match store path we have in fetchers"
);
root_node
path_info
}
None => {
// Look up the derivation for this output path.
@ -177,34 +176,17 @@ impl SnixStoreIO {
span.pb_set_style(&snix_tracing::PB_SPINNER_STYLE);
span.pb_set_message(&format!("⏳Waiting for inputs {}", &store_path));
// Maps from the index in refscan_needles to the full store path
// Used to map back to the actual store path from the found needles
// Importantly, this must match the order of the needles generated in derivation_to_build_request
let inputs =
crate::snix_build::get_all_inputs(&drv, &self.known_paths.borrow())
.map(StorePath::to_owned)
.collect::<Vec<_>>();
// derivation_to_build_request needs castore nodes for all inputs.
// Provide them, which means, here is where we recursively build
// all dependencies.
let resolved_inputs: BTreeMap<StorePath<String>, Node> =
futures::stream::iter(inputs.iter())
.then(|input_source| {
Box::pin({
let input_source = input_source.clone();
async move {
let node = self
.store_path_to_node(&input_source, Path::new(""))
.await?;
if let Some(node) = node {
Ok((input_source, node))
} else {
Err(io::Error::other("no node produced"))
}
}
let resolved_inputs = {
let known_paths = &self.known_paths.borrow();
crate::snix_build::get_all_inputs(&drv, known_paths, |path| {
Box::pin(async move {
self.store_path_to_path_info(&path, Path::new("")).await
})
})
}
.try_collect()
.await?;
@ -221,6 +203,7 @@ impl SnixStoreIO {
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
let mut out_path_info: Option<PathInfo> = None;
// For each output, insert a PathInfo.
for ((output, output_needles), drv_output) in build_result
.outputs
@ -231,7 +214,7 @@ impl SnixStoreIO {
.zip(build_result.outputs_needles.iter())
.zip(drv.outputs.iter())
{
let (_, output_node) = output
let (output_name, output_node) = output
.clone()
.try_into_name_and_node()
.expect("invalid node");
@ -300,23 +283,15 @@ impl SnixStoreIO {
};
self.path_info_service
.put(path_info)
.put(path_info.clone())
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
if output_name.as_ref() == store_path.to_string().as_bytes() {
out_path_info = Some(path_info);
}
}
// find the output for the store path requested
let s = store_path.to_string();
build_result
.outputs
.into_iter()
.map(|e| e.try_into_name_and_node().expect("invalid node"))
.find(|(output_name, _output_node)| {
output_name.as_ref() == s.as_bytes()
})
.expect("build didn't produce the store path")
.1
out_path_info.ok_or(io::Error::other("build didn't produce store path"))?
}
}
}
@ -326,9 +301,15 @@ impl SnixStoreIO {
// We convert sub_path to the castore model here.
let sub_path = snix_castore::PathBuf::from_host_path(sub_path, true)?;
directoryservice::descend_to(&self.directory_service, root_node, sub_path)
Ok(
directoryservice::descend_to(&self.directory_service, path_info.node.clone(), sub_path)
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?
.map(|node| {
path_info.node = node;
path_info
}),
)
}
}
@ -338,7 +319,7 @@ impl EvalIO for SnixStoreIO {
if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
if self
.tokio_handle
.block_on(self.store_path_to_node(&store_path, sub_path))?
.block_on(self.store_path_to_path_info(&store_path, sub_path))?
.is_some()
{
Ok(true)
@ -356,12 +337,12 @@ impl EvalIO for SnixStoreIO {
#[instrument(skip(self), err)]
fn open(&self, path: &Path) -> io::Result<Box<dyn io::Read>> {
if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
if let Some(node) = self
if let Some(path_info) = self
.tokio_handle
.block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
.block_on(async { self.store_path_to_path_info(&store_path, sub_path).await })?
{
// depending on the node type, treat open differently
match node {
match path_info.node {
Node::Directory { .. } => {
// This would normally be a io::ErrorKind::IsADirectory (still unstable)
Err(io::Error::new(
@ -410,11 +391,11 @@ impl EvalIO for SnixStoreIO {
#[instrument(skip(self), ret(level = Level::TRACE), err)]
fn file_type(&self, path: &Path) -> io::Result<FileType> {
if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
if let Some(node) = self
if let Some(path_info) = self
.tokio_handle
.block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
.block_on(async { self.store_path_to_path_info(&store_path, sub_path).await })?
{
match node {
match path_info.node {
Node::Directory { .. } => Ok(FileType::Directory),
Node::File { .. } => Ok(FileType::Regular),
Node::Symlink { .. } => Ok(FileType::Symlink),
@ -430,11 +411,11 @@ impl EvalIO for SnixStoreIO {
#[instrument(skip(self), ret(level = Level::TRACE), err)]
fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> {
if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
if let Some(node) = self
if let Some(path_info) = self
.tokio_handle
.block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
.block_on(async { self.store_path_to_path_info(&store_path, sub_path).await })?
{
match node {
match path_info.node {
Node::Directory { digest, .. } => {
// fetch the Directory itself.
if let Some(directory) = self.tokio_handle.block_on({