refactor(tvix/castore): ingest filesystem entries in parallel

Rather than carrying around an Future in the IngestionEntry::Regular,
simply carry the plain B3Digest.

Code reading through a non-seekable data stream has no choice but to
read and upload blobs immediately, and code seeking through something
seekable (like a filesystem) probably knows better what concurrency to
pick when ingesting, rather than the consuming side.

(Our only) one of these seekable source implementations is now doing
exactly that. We produce a stream of futures, and then use
[StreamExt::buffered] to process more than one, concurrently.

We still keep the same order, to avoid shuffling things and violating
the stream order.

This also cleans up walk_path_for_ingestion in castore/import, as well
as ingest_dir_entries in glue/tvix_store_io.

Change-Id: I5eb70f3e1e372c74bcbfcf6b6e2653eba36e151d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11491
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-04-20 19:32:30 +03:00 committed by clbot
parent 01239a4f6f
commit 5fc403587f
4 changed files with 53 additions and 82 deletions

View file

@ -13,10 +13,9 @@ use crate::proto::DirectoryNode;
use crate::proto::FileNode;
use crate::proto::SymlinkNode;
use crate::B3Digest;
use futures::Future;
use futures::{Stream, StreamExt};
use std::fs::FileType;
use std::pin::Pin;
use tracing::Level;
#[cfg(target_family = "unix")]
@ -52,10 +51,10 @@ pub mod fs;
///
/// On success, returns the root node.
#[instrument(skip_all, ret(level = Level::TRACE), err)]
pub async fn ingest_entries<'a, DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
where
DS: AsRef<dyn DirectoryService>,
S: Stream<Item = Result<IngestionEntry<'a>, Error>> + Send + std::marker::Unpin,
S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin,
{
// For a given path, this holds the [Directory] structs as they are populated.
let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
@ -124,7 +123,7 @@ where
..
} => Node::File(FileNode {
name,
digest: digest.await?.into(),
digest: digest.to_owned().into(),
size: *size,
executable: *executable,
}),
@ -200,14 +199,12 @@ where
Ok(digest)
}
type BlobFut<'a> = Pin<Box<dyn Future<Output = Result<B3Digest, Error>> + Send + 'a>>;
pub enum IngestionEntry<'a> {
pub enum IngestionEntry {
Regular {
path: PathBuf,
size: u64,
executable: bool,
digest: BlobFut<'a>,
digest: B3Digest,
},
Symlink {
path: PathBuf,
@ -222,7 +219,7 @@ pub enum IngestionEntry<'a> {
},
}
impl<'a> IngestionEntry<'a> {
impl IngestionEntry {
fn path(&self) -> &Path {
match self {
IngestionEntry::Regular { path, .. } => path,