fix(snix/store/redb): no blocking IO in list() without spawn_blocking
All these functions do blocking IO, and don't yield back to the executor, so we cannot invoke them directly, but have to use spawn_blocking and a channel. Instead of plainly reverting cl/30575, this keeps potential errors being sent as the last element of the stream. We need to make our error construction a bit more ergonomic, potentially allow them to wrap other errors instead of the madness this currently is, but this is something for a later CL. Change-Id: Ifb05871741813a389ac00b4f2c468f984a689a18 Reviewed-on: https://cl.snix.dev/c/snix/+/30586 Reviewed-by: Vova Kryachko <v.kryachko@gmail.com> Tested-by: besadii
This commit is contained in:
parent
ae2af10cf8
commit
51fc9948cf
1 changed files with 47 additions and 18 deletions
|
|
@ -1,8 +1,7 @@
|
||||||
use super::{PathInfo, PathInfoService};
|
use super::{PathInfo, PathInfoService};
|
||||||
use crate::proto;
|
use crate::proto;
|
||||||
use async_stream::try_stream;
|
|
||||||
use data_encoding::BASE64;
|
use data_encoding::BASE64;
|
||||||
use futures::stream::BoxStream;
|
use futures::{StreamExt, stream::BoxStream};
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use redb::{Database, ReadableTable, TableDefinition};
|
use redb::{Database, ReadableTable, TableDefinition};
|
||||||
use snix_castore::{
|
use snix_castore::{
|
||||||
|
|
@ -10,6 +9,7 @@ use snix_castore::{
|
||||||
composition::{CompositionContext, ServiceBuilder},
|
composition::{CompositionContext, ServiceBuilder},
|
||||||
};
|
};
|
||||||
use std::{path::PathBuf, sync::Arc};
|
use std::{path::PathBuf, sync::Arc};
|
||||||
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
use tracing::{instrument, warn};
|
use tracing::{instrument, warn};
|
||||||
|
|
||||||
|
|
@ -130,26 +130,55 @@ impl PathInfoService for RedbPathInfoService {
|
||||||
|
|
||||||
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
|
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
|
||||||
let db = self.db.clone();
|
let db = self.db.clone();
|
||||||
Box::pin(try_stream! {
|
let (tx, rx) = tokio::sync::mpsc::channel(64);
|
||||||
let read_txn = db.begin_read()?;
|
|
||||||
let table = read_txn.open_table(PATHINFO_TABLE)?;
|
|
||||||
|
|
||||||
for elem in table.iter()? {
|
tokio::task::spawn_blocking(move || {
|
||||||
let elem = elem?;
|
// IIFE to be able to use ? for the error cases
|
||||||
yield {
|
let result = (|| -> Result<(), Error> {
|
||||||
let path_info_proto = proto::PathInfo::decode(
|
let read_txn = db.begin_read().map_err(|err| {
|
||||||
elem.1.value().as_slice(),
|
warn!(%err, "failed to open read transaction");
|
||||||
)
|
Error::StorageError("failed to open read transaction".to_string())
|
||||||
.map_err(|e| {
|
|
||||||
warn!(err=%e, "invalid PathInfo");
|
|
||||||
Error::StorageError("invalid PathInfo".to_string())
|
|
||||||
})?;
|
})?;
|
||||||
PathInfo::try_from(path_info_proto).map_err(|e| {
|
|
||||||
Error::StorageError(format!("Invalid path info: {e}"))
|
let table = read_txn.open_table(PATHINFO_TABLE).map_err(|err| {
|
||||||
})?
|
warn!(%err, "failed to open table");
|
||||||
|
Error::StorageError("failed to open table".to_string())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let table_iter = table.iter().map_err(|err| {
|
||||||
|
warn!(%err, "failed to iterate over table items");
|
||||||
|
Error::StorageError("failed to iterate over table items".into())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
for elem in table_iter {
|
||||||
|
let elem = elem.map_err(|err| {
|
||||||
|
warn!(%err, "failed to retrieve item");
|
||||||
|
Error::StorageError("failed to retrieve item".into())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let path_info_proto = proto::PathInfo::decode(elem.1.value().as_slice())
|
||||||
|
.map_err(|err| {
|
||||||
|
warn!(%err, "invalid PathInfo");
|
||||||
|
Error::StorageError("invalid PathInfo".into())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let path_info = PathInfo::try_from(path_info_proto)
|
||||||
|
.map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?;
|
||||||
|
|
||||||
|
if tx.blocking_send(Ok(path_info)).is_err() {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
|
||||||
|
Ok(())
|
||||||
|
})();
|
||||||
|
|
||||||
|
if let Err(err) = result {
|
||||||
|
let _ = tx.blocking_send(Err(err));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ReceiverStream::new(rx).boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue