ClosureValidator was previously only suitable for a very narrow use case: validating incoming uploads, which arrive in leaves-to-root order. This is because the ordering validation was hard-wired into the add() function.

This change:
- Renames ClosureValidator to DirectoryGraph, which is a more suitable name since it actually stores the Directory structs and is drained in the end.
- Moves the ordering-related logic to a separate OrderValidator, which can be used independently.
- Rewrites DirectoryGraph to be a general-purpose validator which can accept the input in both orders and can be drained in both orders as well.

This means the DirectoryGraph and OrderValidator can now serve multiple new purposes:
- Validating the incoming closure on the client while downloading.
- Validating the incoming closure downloaded in a caching layer from the `far` cache, and re-ordering it for insertion into the `near` cache.

Change-Id: I2b4b226348416912d7a31935bec050e53d911b70
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11708
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: yuka <yuka@yuka.dev>
110 lines
4 KiB
Rust
110 lines
4 KiB
Rust
use crate::directoryservice::DirectoryGraph;
|
|
use crate::directoryservice::LeavesToRootValidator;
|
|
use crate::proto;
|
|
use crate::{directoryservice::DirectoryService, B3Digest};
|
|
use futures::stream::BoxStream;
|
|
use futures::TryStreamExt;
|
|
use std::ops::Deref;
|
|
use tokio_stream::once;
|
|
use tonic::{async_trait, Request, Response, Status, Streaming};
|
|
use tracing::{instrument, warn};
|
|
|
|
/// Wrapper around a directory service handle of type `T`.
///
/// The handle is stored as-is; the gRPC server trait implemented for this
/// type in this file forwards incoming requests to it.
pub struct GRPCDirectoryServiceWrapper<T> {
    /// The wrapped directory service that requests are delegated to.
    directory_service: T,
}
|
|
|
|
impl<T> GRPCDirectoryServiceWrapper<T> {
|
|
pub fn new(directory_service: T) -> Self {
|
|
Self { directory_service }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T>
|
|
where
|
|
T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static,
|
|
{
|
|
type GetStream = BoxStream<'static, tonic::Result<proto::Directory, Status>>;
|
|
|
|
#[instrument(skip_all)]
|
|
async fn get<'a>(
|
|
&'a self,
|
|
request: Request<proto::GetDirectoryRequest>,
|
|
) -> Result<Response<Self::GetStream>, Status> {
|
|
let req_inner = request.into_inner();
|
|
|
|
let by_what = &req_inner
|
|
.by_what
|
|
.ok_or_else(|| Status::invalid_argument("invalid by_what"))?;
|
|
|
|
match by_what {
|
|
proto::get_directory_request::ByWhat::Digest(ref digest) => {
|
|
let digest: B3Digest = digest
|
|
.clone()
|
|
.try_into()
|
|
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
|
|
|
Ok(tonic::Response::new({
|
|
if !req_inner.recursive {
|
|
let directory = self
|
|
.directory_service
|
|
.get(&digest)
|
|
.await
|
|
.map_err(|e| {
|
|
warn!(err = %e, directory.digest=%digest, "failed to get directory");
|
|
tonic::Status::new(tonic::Code::Internal, e.to_string())
|
|
})?
|
|
.ok_or_else(|| {
|
|
Status::not_found(format!("directory {} not found", digest))
|
|
})?;
|
|
|
|
Box::pin(once(Ok(directory)))
|
|
} else {
|
|
// If recursive was requested, traverse via get_recursive.
|
|
Box::pin(
|
|
self.directory_service.get_recursive(&digest).map_err(|e| {
|
|
tonic::Status::new(tonic::Code::Internal, e.to_string())
|
|
}),
|
|
)
|
|
}
|
|
}))
|
|
}
|
|
}
|
|
}
|
|
|
|
#[instrument(skip_all)]
|
|
async fn put(
|
|
&self,
|
|
request: Request<Streaming<proto::Directory>>,
|
|
) -> Result<Response<proto::PutDirectoryResponse>, Status> {
|
|
let mut req_inner = request.into_inner();
|
|
|
|
// We put all Directory messages we receive into DirectoryGraph.
|
|
let mut validator = DirectoryGraph::<LeavesToRootValidator>::default();
|
|
while let Some(directory) = req_inner.message().await? {
|
|
validator
|
|
.add(directory)
|
|
.map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
|
|
}
|
|
|
|
// drain, which validates connectivity too.
|
|
let directories = validator
|
|
.validate()
|
|
.map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?
|
|
.drain_leaves_to_root()
|
|
.collect::<Vec<_>>();
|
|
|
|
let mut directory_putter = self.directory_service.put_multiple_start();
|
|
for directory in directories {
|
|
directory_putter.put(directory).await?;
|
|
}
|
|
|
|
// Properly close the directory putter. Peek at last_directory_digest
|
|
// and return it, or propagate errors.
|
|
let last_directory_dgst = directory_putter.close().await?;
|
|
|
|
Ok(Response::new(proto::PutDirectoryResponse {
|
|
root_digest: last_directory_dgst.into(),
|
|
}))
|
|
}
|
|
}
|