refactor(tvix/store/directorysvc): move from Vec<u8> to B3Digest

This introduces a new struct, B3Digest, which internally holds a
Vec<u8>, but only allows construction with 32 bytes.

It also implements display, which will print the base64 representation.
This should reduce some boilerplate when parsing Vec<u8>.

Change-Id: Ia91aa40cb691916773abc8f93e6ed79a5fd34863
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8592
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-05-18 21:43:33 +03:00 committed by clbot
parent e779b866cc
commit b8ff08b1b0
17 changed files with 199 additions and 165 deletions

View file

@ -2,8 +2,7 @@ use std::collections::HashSet;
use super::{DirectoryPutter, DirectoryService};
use crate::proto::{self, get_directory_request::ByWhat};
use crate::Error;
use data_encoding::BASE64;
use crate::{B3Digest, Error};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{transport::Channel, Status};
@ -38,16 +37,16 @@ impl GRPCDirectoryService {
impl DirectoryService for GRPCDirectoryService {
type DirectoriesIterator = StreamIterator;
fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> {
fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
// Get a new handle to the gRPC client, and copy the digest.
let mut grpc_client = self.grpc_client.clone();
let digest = digest.to_owned();
let digest_as_vec = digest.to_vec();
let task = self.tokio_handle.spawn(async move {
let mut s = grpc_client
.get(proto::GetDirectoryRequest {
recursive: false,
by_what: Some(ByWhat::Digest(digest.to_vec())),
by_what: Some(ByWhat::Digest(digest_as_vec)),
})
.await?
.into_inner();
@ -56,6 +55,7 @@ impl DirectoryService for GRPCDirectoryService {
s.message().await
});
let digest = digest.clone();
match self.tokio_handle.block_on(task)? {
Ok(Some(directory)) => {
// Validate the retrieved Directory indeed has the
@ -64,16 +64,14 @@ impl DirectoryService for GRPCDirectoryService {
if actual_digest != digest {
Err(crate::Error::StorageError(format!(
"requested directory with digest {}, but got {}",
BASE64.encode(&digest),
BASE64.encode(&actual_digest)
digest, actual_digest
)))
} else if let Err(e) = directory.validate() {
// Validate the Directory itself is valid.
warn!("directory failed validation: {}", e.to_string());
Err(crate::Error::StorageError(format!(
"directory {} failed validation: {}",
BASE64.encode(&digest),
e,
digest, e,
)))
} else {
Ok(Some(directory))
@ -85,7 +83,7 @@ impl DirectoryService for GRPCDirectoryService {
}
}
fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> {
fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
let mut grpc_client = self.grpc_client.clone();
let task = self
@ -93,29 +91,27 @@ impl DirectoryService for GRPCDirectoryService {
.spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
match self.tokio_handle.block_on(task)? {
Ok(put_directory_resp) => Ok(put_directory_resp
.into_inner()
.root_digest
.as_slice()
.try_into()
.map_err(|_| {
Error::StorageError("invalid root digest length in response".to_string())
})?),
Ok(put_directory_resp) => Ok(B3Digest::from_vec(
put_directory_resp.into_inner().root_digest,
)
.map_err(|_| {
Error::StorageError("invalid root digest length in response".to_string())
})?),
Err(e) => Err(crate::Error::StorageError(e.to_string())),
}
}
#[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))]
fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator {
#[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
let mut grpc_client = self.grpc_client.clone();
let root_directory_digest = root_directory_digest.to_owned();
let root_directory_digest_as_vec = root_directory_digest.to_vec();
let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> =
self.tokio_handle.spawn(async move {
let s = grpc_client
.get(proto::GetDirectoryRequest {
recursive: true,
by_what: Some(ByWhat::Digest(root_directory_digest.to_vec())),
by_what: Some(ByWhat::Digest(root_directory_digest_as_vec)),
})
.await?
.into_inner();
@ -125,7 +121,11 @@ impl DirectoryService for GRPCDirectoryService {
let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream)
StreamIterator::new(
self.tokio_handle.clone(),
root_directory_digest.clone(),
stream,
)
}
type DirectoryPutter = GRPCPutter;
@ -159,22 +159,22 @@ pub struct StreamIterator {
// A stream of [proto::Directory]
stream: Streaming<proto::Directory>,
// The Directory digests we received so far
received_directory_digests: HashSet<[u8; 32]>,
received_directory_digests: HashSet<B3Digest>,
// The Directory digests we're still expecting to get sent.
expected_directory_digests: HashSet<[u8; 32]>,
expected_directory_digests: HashSet<B3Digest>,
}
impl StreamIterator {
pub fn new(
tokio_handle: tokio::runtime::Handle,
root_digest: &[u8; 32],
root_digest: B3Digest,
stream: Streaming<proto::Directory>,
) -> Self {
Self {
tokio_handle,
stream,
received_directory_digests: HashSet::new(),
expected_directory_digests: HashSet::from([*root_digest]),
expected_directory_digests: HashSet::from([root_digest]),
}
}
}
@ -190,7 +190,7 @@ impl Iterator for StreamIterator {
if let Err(e) = directory.validate() {
return Some(Err(crate::Error::StorageError(format!(
"directory {} failed validation: {}",
BASE64.encode(&directory.digest()),
directory.digest(),
e,
))));
}
@ -204,16 +204,19 @@ impl Iterator for StreamIterator {
// means it once was in expected_directory_digests)
return Some(Err(crate::Error::StorageError(format!(
"received unexpected directory {}",
BASE64.encode(&directory_digest)
directory_digest
))));
}
self.received_directory_digests.insert(directory_digest);
// register all children in expected_directory_digests.
// We ran validate() above, so we know these digests must be correct.
for child_directory in &directory.directories {
// We ran validate() above, so we know these digests must be correct.
let child_directory_digest =
B3Digest::from_vec(child_directory.digest.clone()).unwrap();
self.expected_directory_digests
.insert(child_directory.digest.clone().try_into().unwrap());
.insert(child_directory_digest);
}
Some(Ok(directory))
@ -294,7 +297,7 @@ impl DirectoryPutter for GRPCPutter {
}
/// Closes the stream for sending, and returns the value
fn close(&mut self) -> Result<[u8; 32], crate::Error> {
fn close(&mut self) -> Result<B3Digest, crate::Error> {
// get self.rq, and replace it with None.
// This ensures we can only close it once.
match std::mem::take(&mut self.rq) {
@ -303,15 +306,15 @@ impl DirectoryPutter for GRPCPutter {
// close directory_sender, so blocking on task will finish.
drop(directory_sender);
Ok(self
let root_digest = self
.tokio_handle
.block_on(task)?
.map_err(|e| Error::StorageError(e.to_string()))?
.root_digest
.try_into()
.map_err(|_| {
Error::StorageError("invalid root digest length in response".to_string())
})?)
.root_digest;
B3Digest::from_vec(root_digest).map_err(|_| {
Error::StorageError("invalid root digest length in response".to_string())
})
}
}
}