feat(tvix/castore): ingest_entries: error on unexpected end of stream

Currently ingest_entries panics. This makes it hard to use
ingest_entries in parallel with other processes. When another process
runs into an error and wants to return so, while ingest_entries has
already started and has no entries yet, we're forced to panic.

With this change ingest_entries will return an error when there are no
entries, thus allowing the user to handle the errors as they please.

Change-Id: I78b85bf18f52af8c157d6bedad6019fd4398250a
Reviewed-on: https://cl.tvl.fyi/c/depot/+/13146
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: Bob van der Linden <bobvanderlinden@gmail.com>
Reviewed-by: Bob van der Linden <bobvanderlinden@gmail.com>
Tested-by: BuildkiteCI
This commit is contained in:
Bob van der Linden 2025-01-29 23:18:17 +01:00 committed by clbot
parent dddcc6ef01
commit 43e5c0ff0e
2 changed files with 19 additions and 3 deletions

View file

@ -17,4 +17,7 @@ pub enum IngestionError<E: std::fmt::Display> {
#[error("failed to finalize directory upload: {0}")] #[error("failed to finalize directory upload: {0}")]
FinalizeDirectoryUpload(CastoreError), FinalizeDirectoryUpload(CastoreError),
#[error("unexpected end of stream")]
UnexpectedEndOfStream,
} }

View file

@ -57,7 +57,7 @@ where
.await .await
// The last entry of the stream must have 1 path component, after which // The last entry of the stream must have 1 path component, after which
// we break the loop manually. // we break the loop manually.
.expect("Tvix bug: unexpected end of stream")?; .ok_or(IngestionError::UnexpectedEndOfStream)??;
let node = match &mut entry { let node = match &mut entry {
IngestionEntry::Dir { .. } => { IngestionEntry::Dir { .. } => {
@ -290,9 +290,7 @@ mod test {
} }
#[rstest] #[rstest]
#[should_panic]
#[case::empty_entries(vec![])] #[case::empty_entries(vec![])]
#[should_panic]
#[case::missing_intermediate_dir(vec![ #[case::missing_intermediate_dir(vec![
IngestionEntry::Regular { IngestionEntry::Regular {
path: "blub/.keep".parse().unwrap(), path: "blub/.keep".parse().unwrap(),
@ -301,6 +299,21 @@ mod test {
digest: EMPTY_BLOB_DIGEST.clone(), digest: EMPTY_BLOB_DIGEST.clone(),
}, },
])] ])]
#[tokio::test]
async fn test_end_of_stream(#[case] entries: Vec<IngestionEntry>) {
use crate::import::IngestionError;
let directory_service = MemoryDirectoryService::default();
let result = ingest_entries(
directory_service.clone(),
futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
)
.await;
assert!(matches!(result, Err(IngestionError::UnexpectedEndOfStream)));
}
#[rstest]
#[should_panic] #[should_panic]
#[case::leaf_after_parent(vec![ #[case::leaf_after_parent(vec![
IngestionEntry::Dir { IngestionEntry::Dir {