feat(tvix/store/pathinfosvc): provide listing
This provides an additional method in the PathInfoService trait, as well as an RPC method on the gRPC layer to list all PathInfo objects in a PathInfoService. Change-Id: I7378f6bbd334bd6ac4e9be92505bd099a1c2b19a Reviewed-on: https://cl.tvl.fyi/c/depot/+/9216 Reviewed-by: tazjin <tazjin@tvl.su> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
This commit is contained in:
parent
e41b5ae3f0
commit
da9d706e0a
9 changed files with 310 additions and 41 deletions
|
|
@ -1,8 +1,12 @@
|
|||
use super::PathInfoService;
|
||||
use crate::{blobservice::BlobService, directoryservice::DirectoryService, proto};
|
||||
use crate::{
|
||||
blobservice::BlobService,
|
||||
directoryservice::DirectoryService,
|
||||
proto::{self, ListPathInfoRequest},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UnixStream;
|
||||
use tonic::{transport::Channel, Code, Status};
|
||||
use tonic::{transport::Channel, Code, Status, Streaming};
|
||||
|
||||
/// Connects to a (remote) tvix-store PathInfoService over gRPC.
|
||||
#[derive(Clone)]
|
||||
|
|
@ -160,6 +164,62 @@ impl PathInfoService for GRPCPathInfoService {
|
|||
|
||||
Ok((resp.nar_size, nar_sha256))
|
||||
}
|
||||
|
||||
fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> {
|
||||
// Get a new handle to the gRPC client.
|
||||
let mut grpc_client = self.grpc_client.clone();
|
||||
|
||||
let task: tokio::task::JoinHandle<Result<_, Status>> =
|
||||
self.tokio_handle.spawn(async move {
|
||||
let s = grpc_client
|
||||
.list(ListPathInfoRequest::default())
|
||||
.await?
|
||||
.into_inner();
|
||||
|
||||
Ok(s)
|
||||
});
|
||||
|
||||
let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
|
||||
|
||||
Box::new(StreamIterator::new(self.tokio_handle.clone(), stream))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StreamIterator {
|
||||
tokio_handle: tokio::runtime::Handle,
|
||||
stream: Streaming<proto::PathInfo>,
|
||||
}
|
||||
|
||||
impl StreamIterator {
|
||||
pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self {
|
||||
Self {
|
||||
tokio_handle,
|
||||
stream,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for StreamIterator {
|
||||
type Item = Result<proto::PathInfo, crate::Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self.tokio_handle.block_on(self.stream.message()) {
|
||||
Ok(o) => match o {
|
||||
Some(pathinfo) => {
|
||||
// validate the pathinfo
|
||||
if let Err(e) = pathinfo.validate() {
|
||||
return Some(Err(crate::Error::StorageError(format!(
|
||||
"pathinfo {:?} failed validation: {}",
|
||||
pathinfo, e
|
||||
))));
|
||||
}
|
||||
Some(Ok(pathinfo))
|
||||
}
|
||||
None => None,
|
||||
},
|
||||
Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue