refactor(tvix/castore/import): restructure error types
Have ingest_entries return an Error type with only three kinds: - Error while uploading a specific Directory - Error while finalizing the directory upload - Error from the producer Move all ingestion method-specific errors to the individual implementations. Change-Id: I2a015cb7ebc96d084cbe2b809f40d1b53a15daf3 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11557 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
parent
77546d734e
commit
c9d3946cb5
7 changed files with 126 additions and 72 deletions
|
|
@ -26,7 +26,7 @@ use std::{
|
|||
use tracing::instrument;
|
||||
|
||||
mod error;
|
||||
pub use error::Error;
|
||||
pub use error::IngestionError;
|
||||
|
||||
pub mod archive;
|
||||
pub mod fs;
|
||||
|
|
@ -49,10 +49,14 @@ pub mod fs;
|
|||
///
|
||||
/// On success, returns the root node.
|
||||
#[instrument(skip_all, ret(level = Level::TRACE), err)]
|
||||
pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
|
||||
pub async fn ingest_entries<DS, S, E>(
|
||||
directory_service: DS,
|
||||
mut entries: S,
|
||||
) -> Result<Node, IngestionError<E>>
|
||||
where
|
||||
DS: AsRef<dyn DirectoryService>,
|
||||
S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin,
|
||||
S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
|
||||
E: std::error::Error,
|
||||
{
|
||||
// For a given path, this holds the [Directory] structs as they are populated.
|
||||
let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
|
||||
|
|
@ -102,7 +106,10 @@ where
|
|||
maybe_directory_putter
|
||||
.get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
|
||||
.put(directory)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| {
|
||||
IngestionError::UploadDirectoryError(entry.path().to_path_buf(), e)
|
||||
})?;
|
||||
|
||||
Node::Directory(DirectoryNode {
|
||||
name,
|
||||
|
|
@ -147,7 +154,10 @@ where
|
|||
// they're all persisted to the backend.
|
||||
if let Some(mut directory_putter) = maybe_directory_putter {
|
||||
#[cfg_attr(not(debug_assertions), allow(unused))]
|
||||
let root_directory_digest = directory_putter.close().await?;
|
||||
let root_directory_digest = directory_putter
|
||||
.close()
|
||||
.await
|
||||
.map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue