feat(tvix/store/directorysvc): add from_addr

Add --directory-service-addr arg to tvix-store CLI.

Change-Id: Iea1e6f08f27f7157b21ccf397297c68358bd78a0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8743
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
Florian Klink 2023-06-09 23:35:46 +03:00 committed by clbot
parent a1acb5bcb3
commit bb7c76739a
6 changed files with 287 additions and 21 deletions

View file

@ -3,6 +3,7 @@ use std::collections::HashSet;
use super::{DirectoryPutter, DirectoryService};
use crate::proto::{self, get_directory_request::ByWhat};
use crate::{B3Digest, Error};
use tokio::net::UnixStream;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{transport::Channel, Status};
@ -21,7 +22,7 @@ pub struct GRPCDirectoryService {
}
impl GRPCDirectoryService {
/// construct a [GRPCDirectoryService] from a [proto::blob_service_client::BlobServiceClient<Channel>].
/// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient<Channel>].
/// panics if called outside the context of a tokio runtime.
pub fn from_client(
grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
@ -34,6 +35,59 @@ impl GRPCDirectoryService {
}
impl DirectoryService for GRPCDirectoryService {
/// Constructs a [GRPCDirectoryService] from the passed [url::Url]:
/// - scheme has to match `grpc+*://`.
/// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
/// - In the case of unix sockets, there must be a path, but may not be a host.
/// - In the case of non-unix sockets, there must be a host, but no path.
fn from_url(url: &url::Url) -> Result<Self, crate::Error> {
// Start checking for the scheme to start with grpc+.
match url.scheme().strip_prefix("grpc+") {
None => Err(crate::Error::StorageError("invalid scheme".to_string())),
Some(rest) => {
if rest == "unix" {
if url.host_str().is_some() {
return Err(crate::Error::StorageError(
"host may not be set".to_string(),
));
}
let path = url.path().to_string();
let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter
.unwrap()
.connect_with_connector_lazy(tower::service_fn(
move |_: tonic::transport::Uri| UnixStream::connect(path.clone()),
));
let grpc_client =
proto::directory_service_client::DirectoryServiceClient::new(channel);
Ok(Self::from_client(grpc_client))
} else {
// ensure path is empty, not supported with gRPC.
if !url.path().is_empty() {
return Err(crate::Error::StorageError(
"path may not be set".to_string(),
));
}
// clone the uri, and drop the grpc+ from the scheme.
// Recreate a new uri with the `grpc+` prefix dropped from the scheme.
// We can't use `url.set_scheme(rest)`, as it disallows
// setting something http(s) that previously wasn't.
let url = {
let url_str = url.to_string();
let s_stripped = url_str.strip_prefix("grpc+").unwrap();
url::Url::parse(s_stripped).unwrap()
};
let channel = tonic::transport::Endpoint::try_from(url.to_string())
.unwrap()
.connect_lazy();
let grpc_client =
proto::directory_service_client::DirectoryServiceClient::new(channel);
Ok(Self::from_client(grpc_client))
}
}
}
}
fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
// Get a new handle to the gRPC client, and copy the digest.
let mut grpc_client = self.grpc_client.clone();