feat(tvix/store/directorysvc): add DirectoryService::get_recursive()

This moves the recursive BFS traversal of Directory closures from the
GRPCDirectoryServiceWrapper out into a a DirectoryTraverser struct
implementing Iterator.

It is then used from various implementors of DirectoryService in the
`get_recursive()` method.

This allows distinguishing between recursive requests and non-recursive
requests in the gRPC client trait implementation.

Change-Id: I50bfd4a0d9eb11832847329b78c587ec7c9dc7b1
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8351
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-03-26 13:51:48 +02:00 committed by flokli
parent 2d305fd5b3
commit 2fe53cce40
5 changed files with 316 additions and 97 deletions

View file

@ -1,11 +1,11 @@
use crate::directoryservice::DirectoryService;
use crate::proto;
use data_encoding::BASE64;
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::HashMap;
use tokio::{sync::mpsc::channel, task};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{debug, info_span, instrument, warn};
use tracing::{debug, instrument, warn};
pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> {
directory_service: C,
@ -36,109 +36,51 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
let directory_service = self.directory_service.clone();
// kick off an async thread
task::spawn(async move {
// Keep the list of directory digests to traverse.
// As per rpc_directory.proto, we traverse in BFS order.
let mut deq: VecDeque<[u8; 32]> = VecDeque::new();
let _task = {
// look at the digest in the request and put it in the top of the queue.
match &req_inner.by_what {
None => return Err(Status::invalid_argument("by_what needs to be specified")),
Some(proto::get_directory_request::ByWhat::Digest(digest)) => {
deq.push_back(
digest
.as_slice()
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?,
);
}
}
let digest: [u8; 32] = digest
.as_slice()
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
// keep a list of all the Directory messages already sent, so we can omit sending the same.
let mut sent_directory_dgsts: HashSet<[u8; 32]> = HashSet::new();
let digest_b64: String = BASE64.encode(&digest);
// look up the directory at the top of the queue
while let Some(digest) = deq.pop_front() {
let digest_b64: String = BASE64.encode(&digest);
task::spawn(async move {
if !req_inner.recursive {
let e: Result<proto::Directory, Status> =
match directory_service.get(&digest) {
Ok(Some(directory)) => Ok(directory),
Ok(None) => Err(Status::not_found(format!(
"directory {} not found",
digest_b64
))),
Err(e) => Err(e.into()),
};
// add digest we're currently processing to a span, but pay attention to
// https://docs.rs/tracing/0.1.37/tracing/span/struct.Span.html#in-asynchronous-code
// There may be no await in here (we leave the span before the tx.send(…).await)
let span = info_span!("digest", "{}", &digest_b64);
let res: Result<proto::Directory, Status> = {
let _enter = span.enter();
// invoke client.get, and map to a Result<Directory, Status>
match directory_service.get(&digest) {
// The directory was not found, abort
Ok(None) => {
if !sent_directory_dgsts.is_empty() {
// If this is not the first lookup, we have a
// consistency issue, and we're missing some children, of which we have the
// parents. Log this out.
// Both the node we started with, and the
// current digest are part of the span.
warn!("consistency issue: directory not found")
if tx.send(e).await.is_err() {
debug!("receiver dropped");
}
Err(Status::not_found(format!(
"directory {} not found",
digest_b64
)))
}
Ok(Some(directory)) => {
// if recursion was requested, all its children need to be added to the queue.
// If a Directory message with the same digest has already
// been sent previously, we can skip enqueueing it.
// Same applies to when it already is in the queue.
if req_inner.recursive {
for child_directory_node in &directory.directories {
let child_directory_node_digest: [u8; 32] =
child_directory_node.digest.clone().try_into().map_err(
|_e| {
Status::internal(
"invalid child directory digest len",
)
},
)?;
} else {
// If recursive was requested, traverse via get_recursive.
let directories_it = directory_service.get_recursive(&digest);
if !sent_directory_dgsts.contains(&child_directory_node_digest)
&& !deq.contains(&child_directory_node_digest)
{
deq.push_back(child_directory_node_digest);
}
for e in directories_it {
// map err in res from Error to Status
let res = e.map_err(|e| Status::internal(e.to_string()));
if tx.send(res).await.is_err() {
debug!("receiver dropped");
break;
}
}
// add it to sent_directory_dgsts.
// Strictly speaking, it wasn't sent yet, but tx.send happens right after,
// and the only way we can still fail is by the remote side to hang up,
// in which case we stop anyways.
sent_directory_dgsts.insert(digest);
Ok(directory)
}
Err(e) => Err(e.into()),
}
};
// send the result to the client
if (tx.send(res).await).is_err() {
debug!("receiver dropped");
break;
});
}
}
};
Ok(())
});
// NOTE: this always returns an Ok response, with the first item in the
// stream being a potential error, instead of directly returning the
// first error.
// There's no need to check if the directory node exists twice,
// and client code should consider an Err(), or the first item of the
// stream being an error to be equivalent.
let receiver_stream = ReceiverStream::new(rx);
Ok(Response::new(receiver_stream))
}