refactor(tvix/glue): move Fetch[er] into its own types, fetch lazily

We actually want to delay fetching until we actually need the file. A
simple evaluation asking for `.outPath` or `.drvPath` should work even
in a pure offline environment.

Before this CL, the fetching logic was quite distributed between
tvix_store_io, and builtins/fetchers.rs.

Rather than having various functions and conversions between structs,
describe a Fetch as an enum type, with the fields describing the fetch.

Define a store_path() function on top of `Fetch` which can be used to
ask for the calculated store path (if the digest has been provided
upfront).

Have a `Fetcher` struct, and give it a `fetch_and_persist` function,
taking a `Fetch` as well as a desired name, and have it deal with all
the logic of persisting the PathInfos. It also returns a StorePathRef,
similar to the `.store_path()` method on a `Fetch` struct.

In a followup CL, we can extend KnownPaths to track fetches AND
derivations, and then use `Fetcher` when we need to do IO into that
store path.

Change-Id: Ib39a96baeb661750a8706b461f8ba4abb342e777
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11500
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-04-22 14:02:48 +03:00 committed by clbot
parent dc444e55dc
commit 091de12a9a
8 changed files with 538 additions and 399 deletions

View file

@ -4,10 +4,9 @@ use async_recursion::async_recursion;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use nix_compat::nixhash::NixHash;
use nix_compat::store_path::{build_ca_path, StorePathRef};
use nix_compat::store_path::StorePathRef;
use nix_compat::{nixhash::CAHash, store_path::StorePath};
use sha2::{Digest, Sha256};
use std::rc::Rc;
use std::{
cell::RefCell,
collections::BTreeSet,
@ -15,24 +14,22 @@ use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::io::AsyncBufRead;
use tokio_util::io::{InspectReader, SyncIoBridge};
use tokio_util::io::SyncIoBridge;
use tracing::{error, instrument, warn, Level};
use tvix_build::buildservice::BuildService;
use tvix_castore::import::archive::ingest_archive;
use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
use tvix_castore::proto::node::Node;
use tvix_eval::{EvalIO, FileType, StdIO};
use tvix_store::utils::AsyncIoBridge;
use tvix_castore::{
blobservice::BlobService,
directoryservice::{self, DirectoryService},
proto::{node::Node, FileNode, NamedNode},
proto::NamedNode,
B3Digest,
};
use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
use crate::builtins::FetcherError;
use crate::decompression::DecompressedReader;
use crate::fetchers::Fetcher;
use crate::known_paths::KnownPaths;
use crate::tvix_build::derivation_to_build_request;
@ -60,7 +57,10 @@ pub struct TvixStoreIO {
#[allow(dead_code)]
build_service: Arc<dyn BuildService>,
pub(crate) tokio_handle: tokio::runtime::Handle,
http_client: reqwest::Client,
pub(crate) fetcher:
Fetcher<Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>>,
pub(crate) known_paths: RefCell<KnownPaths>,
}
@ -73,13 +73,13 @@ impl TvixStoreIO {
tokio_handle: tokio::runtime::Handle,
) -> Self {
Self {
blob_service,
directory_service,
path_info_service,
blob_service: blob_service.clone(),
directory_service: directory_service.clone(),
path_info_service: path_info_service.clone(),
std_io: StdIO {},
build_service,
tokio_handle,
http_client: reqwest::Client::new(),
fetcher: Fetcher::new(blob_service, directory_service, path_info_service),
known_paths: Default::default(),
}
}
@ -358,130 +358,6 @@ impl TvixStoreIO {
.await?
.is_some())
}
async fn download<'a>(&self, url: &str) -> Result<impl AsyncBufRead + Unpin + 'a, ErrorKind> {
let resp = self
.http_client
.get(url)
.send()
.await
.map_err(FetcherError::from)?;
Ok(tokio_util::io::StreamReader::new(
resp.bytes_stream().map_err(|e| {
let e = e.without_url();
warn!(%e, "failed to get response body");
io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
}),
))
}
pub async fn fetch_url(
&self,
url: &str,
name: &str,
hash: Option<&NixHash>,
) -> Result<StorePath, ErrorKind> {
let mut sha = Sha256::new();
let data = self.download(url).await?;
let mut data = InspectReader::new(data, |b| sha.update(b));
let mut blob = self.blob_service.open_write().await;
let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
drop(data);
let blob_digest = blob.close().await?;
let got = NixHash::Sha256(sha.finalize().into());
let hash = CAHash::Flat(if let Some(wanted) = hash {
if *wanted != got {
return Err(FetcherError::HashMismatch {
url: url.to_owned(),
wanted: wanted.clone(),
got,
}
.into());
}
wanted.clone()
} else {
got
});
let path = build_ca_path(name, &hash, Vec::<String>::new(), false)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let node = Node::File(FileNode {
name: path.to_string().into(),
digest: blob_digest.into(),
size,
executable: false,
});
let (nar_size, nar_sha256) = self
.path_info_service
.calculate_nar(&node)
.await
.map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
let path_info = PathInfo {
node: Some(tvix_castore::proto::Node {
node: Some(node.clone()),
}),
references: vec![],
narinfo: Some(tvix_store::proto::NarInfo {
nar_size,
nar_sha256: nar_sha256.to_vec().into(),
signatures: vec![],
reference_names: vec![],
deriver: None, /* ? */
ca: Some((&hash).into()),
}),
};
self.path_info_service
.put(path_info)
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
Ok(path.to_owned())
}
pub async fn fetch_tarball(
&self,
url: &str,
name: &str,
ca: Option<CAHash>,
) -> Result<StorePath, ErrorKind> {
let data = self.download(url).await?;
let data = DecompressedReader::new(data);
let archive = tokio_tar::Archive::new(data);
let node = ingest_archive(&self.blob_service, &self.directory_service, archive)
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
let (path_info, got, output_path) = self
.node_to_path_info(
name,
Path::new(""),
ca.clone().expect("TODO: support unspecified CA hash"),
node,
)
.await?;
if let Some(wanted) = &ca {
if *wanted.hash() != got {
return Err(FetcherError::HashMismatch {
url: url.to_owned(),
wanted: wanted.hash().into_owned(),
got,
}
.into());
}
}
self.path_info_service
.put(path_info)
.await
.map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
Ok(output_path)
}
}
impl EvalIO for TvixStoreIO {