From 43e5c0ff0eb87df85164577be1aca9e8e1f4452d Mon Sep 17 00:00:00 2001 From: Bob van der Linden Date: Wed, 29 Jan 2025 23:18:17 +0100 Subject: [PATCH] feat(tvix/castore): ingest_entries: error on unexpected end of stream Currently ingest_entries panics. This makes it hard to use ingest_entries in parallel with other processes. When another process runs into an error and wants to return so, while ingest_entries has already started and has no entries yet, we're forced to panic. With this change ingest_entries will return an error when there are no entries, thus allowing the user to handle the errors as they please. Change-Id: I78b85bf18f52af8c157d6bedad6019fd4398250a Reviewed-on: https://cl.tvl.fyi/c/depot/+/13146 Reviewed-by: flokli Autosubmit: Bob van der Linden Reviewed-by: Bob van der Linden Tested-by: BuildkiteCI --- tvix/castore/src/import/error.rs | 3 +++ tvix/castore/src/import/mod.rs | 19 ++++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs index e3fba617e..26d7fa8ba 100644 --- a/tvix/castore/src/import/error.rs +++ b/tvix/castore/src/import/error.rs @@ -17,4 +17,7 @@ pub enum IngestionError { #[error("failed to finalize directory upload: {0}")] FinalizeDirectoryUpload(CastoreError), + + #[error("unexpected end of stream")] + UnexpectedEndOfStream, } diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index 6e10a6493..07a3dceff 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -57,7 +57,7 @@ where .await // The last entry of the stream must have 1 path component, after which // we break the loop manually. - .expect("Tvix bug: unexpected end of stream")?; + .ok_or(IngestionError::UnexpectedEndOfStream)??; let node = match &mut entry { IngestionEntry::Dir { .. } => { @@ -290,9 +290,7 @@ mod test { } #[rstest] - #[should_panic] #[case::empty_entries(vec![])] - #[should_panic] #[case::missing_intermediate_dir(vec![ IngestionEntry::Regular { path: "blub/.keep".parse().unwrap(), @@ -301,6 +299,21 @@ mod test { digest: EMPTY_BLOB_DIGEST.clone(), }, ])] + #[tokio::test] + async fn test_end_of_stream(#[case] entries: Vec) { + use crate::import::IngestionError; + + let directory_service = MemoryDirectoryService::default(); + + let result = ingest_entries( + directory_service.clone(), + futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)), + ) + .await; + assert!(matches!(result, Err(IngestionError::UnexpectedEndOfStream))); + } + + #[rstest] #[should_panic] #[case::leaf_after_parent(vec![ IngestionEntry::Dir {