diff --git a/snix/store/src/pathinfoservice/redb.rs b/snix/store/src/pathinfoservice/redb.rs index f32f7d472..a500bdc94 100644 --- a/snix/store/src/pathinfoservice/redb.rs +++ b/snix/store/src/pathinfoservice/redb.rs @@ -1,8 +1,7 @@ use super::{PathInfo, PathInfoService}; use crate::proto; -use async_stream::try_stream; use data_encoding::BASE64; -use futures::stream::BoxStream; +use futures::{StreamExt, stream::BoxStream}; use prost::Message; use redb::{Database, ReadableTable, TableDefinition}; use snix_castore::{ @@ -10,6 +9,7 @@ use snix_castore::{ composition::{CompositionContext, ServiceBuilder}, }; use std::{path::PathBuf, sync::Arc}; +use tokio_stream::wrappers::ReceiverStream; use tonic::async_trait; use tracing::{instrument, warn}; @@ -130,26 +130,55 @@ impl PathInfoService for RedbPathInfoService { fn list(&self) -> BoxStream<'static, Result> { let db = self.db.clone(); - Box::pin(try_stream! { - let read_txn = db.begin_read()?; - let table = read_txn.open_table(PATHINFO_TABLE)?; + let (tx, rx) = tokio::sync::mpsc::channel(64); - for elem in table.iter()? { - let elem = elem?; - yield { - let path_info_proto = proto::PathInfo::decode( - elem.1.value().as_slice(), - ) - .map_err(|e| { - warn!(err=%e, "invalid PathInfo"); - Error::StorageError("invalid PathInfo".to_string()) + tokio::task::spawn_blocking(move || { + // IIFE to be able to use ? for the error cases + let result = (|| -> Result<(), Error> { + let read_txn = db.begin_read().map_err(|err| { + warn!(%err, "failed to open read transaction"); + Error::StorageError("failed to open read transaction".to_string()) + })?; + + let table = read_txn.open_table(PATHINFO_TABLE).map_err(|err| { + warn!(%err, "failed to open table"); + Error::StorageError("failed to open table".to_string()) + })?; + + let table_iter = table.iter().map_err(|err| { + warn!(%err, "failed to iterate over table items"); + Error::StorageError("failed to iterate over table items".into()) + })?; + + for elem in table_iter { + let elem = elem.map_err(|err| { + warn!(%err, "failed to retrieve item"); + Error::StorageError("failed to retrieve item".into()) })?; - PathInfo::try_from(path_info_proto).map_err(|e| { - Error::StorageError(format!("Invalid path info: {e}")) - })? + + let path_info_proto = proto::PathInfo::decode(elem.1.value().as_slice()) + .map_err(|err| { + warn!(%err, "invalid PathInfo"); + Error::StorageError("invalid PathInfo".into()) + })?; + + let path_info = PathInfo::try_from(path_info_proto) + .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?; + + if tx.blocking_send(Ok(path_info)).is_err() { + break; + } } + + Ok(()) + })(); + + if let Err(err) = result { + let _ = tx.blocking_send(Err(err)); } - }) + }); + + ReceiverStream::new(rx).boxed() } }