refactor(tvix/store): Asyncify PathInfoService and DirectoryService

We've decided to asyncify all of the services to reduce some of the
pains going back and for between sync<->async. The end goal will be for
all the tvix-store internals to be async and then expose a sync
interface for things like tvix eval io.

Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369
Autosubmit: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Connor Brewster 2023-09-19 11:46:41 -05:00 committed by clbot
parent 7e737fde34
commit 37a348b4fa
24 changed files with 764 additions and 715 deletions

View file

@ -1,22 +1,24 @@
use std::collections::HashSet;
use std::pin::Pin;
use super::{DirectoryPutter, DirectoryService};
use crate::proto::{self, get_directory_request::ByWhat};
use crate::{B3Digest, Error};
use async_stream::try_stream;
use futures::Stream;
use tokio::net::UnixStream;
use tokio::spawn;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::async_trait;
use tonic::Code;
use tonic::{transport::Channel, Status};
use tonic::{Code, Streaming};
use tracing::{instrument, warn};
/// Connects to a (remote) tvix-store DirectoryService over gRPC.
#[derive(Clone)]
pub struct GRPCDirectoryService {
/// A handle into the active tokio runtime. Necessary to spawn tasks.
tokio_handle: tokio::runtime::Handle,
/// The internal reference to a gRPC client.
/// Cloning it is cheap, and it internally handles concurrent requests.
grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
@ -28,13 +30,11 @@ impl GRPCDirectoryService {
pub fn from_client(
grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
) -> Self {
Self {
tokio_handle: tokio::runtime::Handle::current(),
grpc_client,
}
Self { grpc_client }
}
}
#[async_trait]
impl DirectoryService for GRPCDirectoryService {
/// Constructs a [GRPCDirectoryService] from the passed [url::Url]:
/// - scheme has to match `grpc+*://`.
@ -89,11 +89,15 @@ impl DirectoryService for GRPCDirectoryService {
}
}
}
fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
async fn get(
&self,
digest: &B3Digest,
) -> Result<Option<crate::proto::Directory>, crate::Error> {
// Get a new handle to the gRPC client, and copy the digest.
let mut grpc_client = self.grpc_client.clone();
let digest_cpy = digest.clone();
let task = self.tokio_handle.spawn(async move {
let message = async move {
let mut s = grpc_client
.get(proto::GetDirectoryRequest {
recursive: false,
@ -104,10 +108,10 @@ impl DirectoryService for GRPCDirectoryService {
// Retrieve the first message only, then close the stream (we set recursive to false)
s.message().await
});
};
let digest = digest.clone();
match self.tokio_handle.block_on(task)? {
match message.await {
Ok(Some(directory)) => {
// Validate the retrieved Directory indeed has the
// digest we expect it to have, to detect corruptions.
@ -134,14 +138,12 @@ impl DirectoryService for GRPCDirectoryService {
}
}
fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
let mut grpc_client = self.grpc_client.clone();
let task = self
.tokio_handle
.spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await;
match self.tokio_handle.block_on(task)? {
match resp {
Ok(put_directory_resp) => Ok(put_directory_resp
.into_inner()
.root_digest
@ -157,32 +159,82 @@ impl DirectoryService for GRPCDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> {
) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
let mut grpc_client = self.grpc_client.clone();
let root_directory_digest = root_directory_digest.clone();
// clone so we can move it
let root_directory_digest_cpy = root_directory_digest.clone();
let stream = try_stream! {
let mut stream = grpc_client
.get(proto::GetDirectoryRequest {
recursive: true,
by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())),
})
.await
.map_err(|e| crate::Error::StorageError(e.to_string()))?
.into_inner();
let task: JoinHandle<Result<Streaming<proto::Directory>, Status>> =
self.tokio_handle.spawn(async move {
let s = grpc_client
.get(proto::GetDirectoryRequest {
recursive: true,
by_what: Some(ByWhat::Digest(root_directory_digest_cpy.into())),
})
.await?
.into_inner();
// The Directory digests we received so far
let mut received_directory_digests: HashSet<B3Digest> = HashSet::new();
// The Directory digests we're still expecting to get sent.
let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]);
Ok(s)
});
loop {
match stream.message().await {
Ok(Some(directory)) => {
// validate the directory itself.
if let Err(e) = directory.validate() {
Err(crate::Error::StorageError(format!(
"directory {} failed validation: {}",
directory.digest(),
e,
)))?;
}
// validate we actually expected that directory, and move it from expected to received.
let directory_digest = directory.digest();
let was_expected = expected_directory_digests.remove(&directory_digest);
if !was_expected {
// FUTUREWORK: dumb clients might send the same stuff twice.
// as a fallback, we might want to tolerate receiving
// it if it's in received_directory_digests (as that
// means it once was in expected_directory_digests)
Err(crate::Error::StorageError(format!(
"received unexpected directory {}",
directory_digest
)))?;
}
received_directory_digests.insert(directory_digest);
let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
// register all children in expected_directory_digests.
for child_directory in &directory.directories {
// We ran validate() above, so we know these digests must be correct.
let child_directory_digest =
child_directory.digest.clone().try_into().unwrap();
Box::new(StreamIterator::new(
self.tokio_handle.clone(),
root_directory_digest.clone(),
stream,
))
expected_directory_digests
.insert(child_directory_digest);
}
yield directory;
},
Ok(None) => {
// If we were still expecting something, that's an error.
if !expected_directory_digests.is_empty() {
Err(crate::Error::StorageError(format!(
"still expected {} directories, but got premature end of stream",
expected_directory_digests.len(),
)))?
} else {
return
}
},
Err(e) => {
Err(crate::Error::StorageError(e.to_string()))?;
},
}
}
};
Box::pin(stream)
}
#[instrument(skip_all)]
@ -194,110 +246,21 @@ impl DirectoryService for GRPCDirectoryService {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> =
self.tokio_handle.spawn(async move {
let s = grpc_client
.put(UnboundedReceiverStream::new(rx))
.await?
.into_inner();
let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move {
let s = grpc_client
.put(UnboundedReceiverStream::new(rx))
.await?
.into_inner();
Ok(s)
});
Ok(s)
});
Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task))
}
}
pub struct StreamIterator {
/// A handle into the active tokio runtime. Necessary to run futures to completion.
tokio_handle: tokio::runtime::Handle,
// A stream of [proto::Directory]
stream: Streaming<proto::Directory>,
// The Directory digests we received so far
received_directory_digests: HashSet<B3Digest>,
// The Directory digests we're still expecting to get sent.
expected_directory_digests: HashSet<B3Digest>,
}
impl StreamIterator {
pub fn new(
tokio_handle: tokio::runtime::Handle,
root_digest: B3Digest,
stream: Streaming<proto::Directory>,
) -> Self {
Self {
tokio_handle,
stream,
received_directory_digests: HashSet::new(),
expected_directory_digests: HashSet::from([root_digest]),
}
}
}
impl Iterator for StreamIterator {
type Item = Result<proto::Directory, crate::Error>;
fn next(&mut self) -> Option<Self::Item> {
match self.tokio_handle.block_on(self.stream.message()) {
Ok(ok) => match ok {
Some(directory) => {
// validate the directory itself.
if let Err(e) = directory.validate() {
return Some(Err(crate::Error::StorageError(format!(
"directory {} failed validation: {}",
directory.digest(),
e,
))));
}
// validate we actually expected that directory, and move it from expected to received.
let directory_digest = directory.digest();
let was_expected = self.expected_directory_digests.remove(&directory_digest);
if !was_expected {
// FUTUREWORK: dumb clients might send the same stuff twice.
// as a fallback, we might want to tolerate receiving
// it if it's in received_directory_digests (as that
// means it once was in expected_directory_digests)
return Some(Err(crate::Error::StorageError(format!(
"received unexpected directory {}",
directory_digest
))));
}
self.received_directory_digests.insert(directory_digest);
// register all children in expected_directory_digests.
for child_directory in &directory.directories {
// We ran validate() above, so we know these digests must be correct.
let child_directory_digest =
child_directory.digest.clone().try_into().unwrap();
self.expected_directory_digests
.insert(child_directory_digest);
}
Some(Ok(directory))
}
None => {
// If we were still expecting something, that's an error.
if !self.expected_directory_digests.is_empty() {
Some(Err(crate::Error::StorageError(format!(
"still expected {} directories, but got premature end of stream",
self.expected_directory_digests.len(),
))))
} else {
None
}
}
},
Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
}
Box::new(GRPCPutter::new(tx, task))
}
}
/// Allows uploading multiple Directory messages in the same gRPC stream.
pub struct GRPCPutter {
/// A handle into the active tokio runtime. Necessary to spawn tasks.
tokio_handle: tokio::runtime::Handle,
/// Data about the current request - a handle to the task, and the tx part
/// of the channel.
/// The tx part of the pipe is used to send [proto::Directory] to the ongoing request.
@ -311,19 +274,18 @@ pub struct GRPCPutter {
impl GRPCPutter {
pub fn new(
tokio_handle: tokio::runtime::Handle,
directory_sender: UnboundedSender<proto::Directory>,
task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
) -> Self {
Self {
tokio_handle,
rq: Some((task, directory_sender)),
}
}
}
#[async_trait]
impl DirectoryPutter for GRPCPutter {
fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
match self.rq {
// If we're not already closed, send the directory to directory_sender.
Some((_, ref directory_sender)) => {
@ -331,7 +293,7 @@ impl DirectoryPutter for GRPCPutter {
// If the channel has been prematurely closed, invoke close (so we can peek at the error code)
// That error code is much more helpful, because it
// contains the error message from the server.
self.close()?;
self.close().await?;
}
Ok(())
}
@ -343,7 +305,7 @@ impl DirectoryPutter for GRPCPutter {
}
/// Closes the stream for sending, and returns the value
fn close(&mut self) -> Result<B3Digest, crate::Error> {
async fn close(&mut self) -> Result<B3Digest, crate::Error> {
// get self.rq, and replace it with None.
// This ensures we can only close it once.
match std::mem::take(&mut self.rq) {
@ -352,9 +314,8 @@ impl DirectoryPutter for GRPCPutter {
// close directory_sender, so blocking on task will finish.
drop(directory_sender);
let root_digest = self
.tokio_handle
.block_on(task)?
let root_digest = task
.await?
.map_err(|e| Error::StorageError(e.to_string()))?
.root_digest;
@ -379,6 +340,7 @@ mod tests {
use core::time;
use std::thread;
use futures::StreamExt;
use tempfile::TempDir;
use tokio::net::{UnixListener, UnixStream};
use tokio_stream::wrappers::UnixListenerStream;
@ -446,7 +408,7 @@ mod tests {
);
}
let task = tester_runtime.spawn_blocking(move || {
tester_runtime.block_on(async move {
// Create a channel, connecting to the uds at socket_path.
// The URI is unused.
let channel = Endpoint::try_from("http://[::]:50051")
@ -465,6 +427,7 @@ mod tests {
None,
directory_service
.get(&DIRECTORY_A.digest())
.await
.expect("must not fail")
);
@ -473,6 +436,7 @@ mod tests {
DIRECTORY_A.digest(),
directory_service
.put(DIRECTORY_A.clone())
.await
.expect("must succeed")
);
@ -481,6 +445,7 @@ mod tests {
DIRECTORY_A.clone(),
directory_service
.get(&DIRECTORY_A.digest())
.await
.expect("must succeed")
.expect("must be some")
);
@ -488,21 +453,22 @@ mod tests {
// Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
directory_service
.put(DIRECTORY_B.clone())
.await
.expect_err("must fail");
// Putting DIRECTORY_B in a put_multiple will succeed, but the close
// will always fail.
{
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_B.clone()).expect("must succeed");
handle.close().expect_err("must fail");
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
handle.close().await.expect_err("must fail");
}
// Uploading A and then B should succeed, and closing should return the digest of B.
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_A.clone()).expect("must succeed");
handle.put(DIRECTORY_B.clone()).expect("must succeed");
let digest = handle.close().expect("must succeed");
handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
let digest = handle.close().await.expect("must succeed");
assert_eq!(DIRECTORY_B.digest(), digest);
// Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
@ -511,6 +477,7 @@ mod tests {
DIRECTORY_B.clone(),
directories_it
.next()
.await
.expect("must be some")
.expect("must succeed")
);
@ -518,6 +485,7 @@ mod tests {
DIRECTORY_A.clone(),
directories_it
.next()
.await
.expect("must be some")
.expect("must succeed")
);
@ -529,15 +497,15 @@ mod tests {
{
let mut handle = directory_service.put_multiple_start();
// sending out B will always be fine
handle.put(DIRECTORY_B.clone()).expect("must succeed");
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
// whether we will be able to put A as well depends on whether we
// already received the error about B.
if handle.put(DIRECTORY_A.clone()).is_ok() {
if handle.put(DIRECTORY_A.clone()).await.is_ok() {
// If we didn't, and this was Ok(_), …
// a subsequent close MUST fail (because it waits for the
// server)
handle.close().expect_err("must fail");
handle.close().await.expect_err("must fail");
}
}
@ -547,7 +515,7 @@ mod tests {
// and then assert that uploading anything else via the handle will fail.
{
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_B.clone()).expect("must succeed");
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
let mut is_closed = false;
for _try in 1..1000 {
@ -555,7 +523,7 @@ mod tests {
is_closed = true;
break;
}
std::thread::sleep(time::Duration::from_millis(10))
tokio::time::sleep(time::Duration::from_millis(10)).await;
}
assert!(
@ -563,12 +531,13 @@ mod tests {
"expected channel to eventually close, but never happened"
);
handle.put(DIRECTORY_A.clone()).expect_err("must fail");
handle
.put(DIRECTORY_A.clone())
.await
.expect_err("must fail");
}
});
tester_runtime.block_on(task)?;
Ok(())
}
}