diff --git a/snix/store/src/pathinfoservice/redb.rs b/snix/store/src/pathinfoservice/redb.rs index a83cce302..f32f7d472 100644 --- a/snix/store/src/pathinfoservice/redb.rs +++ b/snix/store/src/pathinfoservice/redb.rs @@ -1,7 +1,8 @@ use super::{PathInfo, PathInfoService}; use crate::proto; +use async_stream::try_stream; use data_encoding::BASE64; -use futures::{StreamExt, stream::BoxStream}; +use futures::stream::BoxStream; use prost::Message; use redb::{Database, ReadableTable, TableDefinition}; use snix_castore::{ @@ -9,7 +10,6 @@ use snix_castore::{ composition::{CompositionContext, ServiceBuilder}, }; use std::{path::PathBuf, sync::Arc}; -use tokio_stream::wrappers::ReceiverStream; use tonic::async_trait; use tracing::{instrument, warn}; @@ -130,37 +130,26 @@ impl PathInfoService for RedbPathInfoService { fn list(&self) -> BoxStream<'static, Result> { let db = self.db.clone(); - let (tx, rx) = tokio::sync::mpsc::channel(50); + Box::pin(try_stream! { + let read_txn = db.begin_read()?; + let table = read_txn.open_table(PATHINFO_TABLE)?; - // Spawn a blocking task which writes all PathInfos to tx. - tokio::task::spawn_blocking({ - move || -> Result<(), Error> { - let read_txn = db.begin_read()?; - let table = read_txn.open_table(PATHINFO_TABLE)?; - - for elem in table.iter()? { - let elem = elem?; - tokio::runtime::Handle::current() - .block_on(tx.send(Ok({ - let path_info_proto = proto::PathInfo::decode( - elem.1.value().as_slice(), - ) - .map_err(|e| { - warn!(err=%e, "invalid PathInfo"); - Error::StorageError("invalid PathInfo".to_string()) - })?; - PathInfo::try_from(path_info_proto).map_err(|e| { - Error::StorageError(format!("Invalid path info: {e}")) - })? - }))) - .map_err(|e| Error::StorageError(e.to_string()))?; + for elem in table.iter()? { + let elem = elem?; + yield { + let path_info_proto = proto::PathInfo::decode( + elem.1.value().as_slice(), + ) + .map_err(|e| { + warn!(err=%e, "invalid PathInfo"); + Error::StorageError("invalid PathInfo".to_string()) + })?; + PathInfo::try_from(path_info_proto).map_err(|e| { + Error::StorageError(format!("Invalid path info: {e}")) + })? } - - Ok(()) } - }); - - ReceiverStream::from(rx).boxed() + }) } }