This makes the inside code a bit less verbose. I wasn't able to describe the type of the async move closure itself, which would allow us to remove the JoinHandle<_> type annotation entirely. Change-Id: I06193982a0c7010bd72d3ffa4f760bea1b097632 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9268 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
390 lines
14 KiB
Rust
390 lines
14 KiB
Rust
use super::PathInfoService;
|
|
use crate::{
|
|
blobservice::BlobService,
|
|
directoryservice::DirectoryService,
|
|
proto::{self, ListPathInfoRequest},
|
|
};
|
|
use std::sync::Arc;
|
|
use tokio::{net::UnixStream, task::JoinHandle};
|
|
use tonic::{transport::Channel, Code, Status, Streaming};
|
|
|
|
/// Connects to a (remote) tvix-store PathInfoService over gRPC.
|
|
#[derive(Clone)]
|
|
pub struct GRPCPathInfoService {
|
|
/// A handle into the active tokio runtime. Necessary to spawn tasks.
|
|
tokio_handle: tokio::runtime::Handle,
|
|
|
|
/// The internal reference to a gRPC client.
|
|
/// Cloning it is cheap, and it internally handles concurrent requests.
|
|
grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
|
|
}
|
|
|
|
impl GRPCPathInfoService {
|
|
/// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient].
|
|
/// panics if called outside the context of a tokio runtime.
|
|
pub fn from_client(
|
|
grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
|
|
) -> Self {
|
|
Self {
|
|
tokio_handle: tokio::runtime::Handle::current(),
|
|
grpc_client,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl PathInfoService for GRPCPathInfoService {
|
|
/// Constructs a [GRPCPathInfoService] from the passed [url::Url]:
|
|
/// - scheme has to match `grpc+*://`.
|
|
/// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
|
|
/// - In the case of unix sockets, there must be a path, but may not be a host.
|
|
/// - In the case of non-unix sockets, there must be a host, but no path.
|
|
/// The blob_service and directory_service arguments are ignored, because the gRPC service already provides answers to these questions.
|
|
fn from_url(
|
|
url: &url::Url,
|
|
_blob_service: Arc<dyn BlobService>,
|
|
_directory_service: Arc<dyn DirectoryService>,
|
|
) -> Result<Self, crate::Error> {
|
|
// Start checking for the scheme to start with grpc+.
|
|
match url.scheme().strip_prefix("grpc+") {
|
|
None => Err(crate::Error::StorageError("invalid scheme".to_string())),
|
|
Some(rest) => {
|
|
if rest == "unix" {
|
|
if url.host_str().is_some() {
|
|
return Err(crate::Error::StorageError(
|
|
"host may not be set".to_string(),
|
|
));
|
|
}
|
|
let path = url.path().to_string();
|
|
let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter
|
|
.unwrap()
|
|
.connect_with_connector_lazy(tower::service_fn(
|
|
move |_: tonic::transport::Uri| UnixStream::connect(path.clone()),
|
|
));
|
|
let grpc_client =
|
|
proto::path_info_service_client::PathInfoServiceClient::new(channel);
|
|
Ok(Self::from_client(grpc_client))
|
|
} else {
|
|
// ensure path is empty, not supported with gRPC.
|
|
if !url.path().is_empty() {
|
|
return Err(crate::Error::StorageError(
|
|
"path may not be set".to_string(),
|
|
));
|
|
}
|
|
|
|
// clone the uri, and drop the grpc+ from the scheme.
|
|
// Recreate a new uri with the `grpc+` prefix dropped from the scheme.
|
|
// We can't use `url.set_scheme(rest)`, as it disallows
|
|
// setting something http(s) that previously wasn't.
|
|
let url = {
|
|
let url_str = url.to_string();
|
|
let s_stripped = url_str.strip_prefix("grpc+").unwrap();
|
|
url::Url::parse(s_stripped).unwrap()
|
|
};
|
|
let channel = tonic::transport::Endpoint::try_from(url.to_string())
|
|
.unwrap()
|
|
.connect_lazy();
|
|
|
|
let grpc_client =
|
|
proto::path_info_service_client::PathInfoServiceClient::new(channel);
|
|
Ok(Self::from_client(grpc_client))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> {
|
|
// Get a new handle to the gRPC client.
|
|
let mut grpc_client = self.grpc_client.clone();
|
|
|
|
let task: JoinHandle<Result<proto::PathInfo, Status>> =
|
|
self.tokio_handle.spawn(async move {
|
|
let path_info = grpc_client
|
|
.get(proto::GetPathInfoRequest {
|
|
by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
|
|
digest.to_vec().into(),
|
|
)),
|
|
})
|
|
.await?
|
|
.into_inner();
|
|
|
|
Ok(path_info)
|
|
});
|
|
|
|
match self.tokio_handle.block_on(task)? {
|
|
Ok(path_info) => Ok(Some(path_info)),
|
|
Err(e) if e.code() == Code::NotFound => Ok(None),
|
|
Err(e) => Err(crate::Error::StorageError(e.to_string())),
|
|
}
|
|
}
|
|
|
|
fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> {
|
|
// Get a new handle to the gRPC client.
|
|
let mut grpc_client = self.grpc_client.clone();
|
|
|
|
let task: JoinHandle<Result<proto::PathInfo, Status>> =
|
|
self.tokio_handle.spawn(async move {
|
|
let path_info = grpc_client.put(path_info).await?.into_inner();
|
|
Ok(path_info)
|
|
});
|
|
|
|
self.tokio_handle
|
|
.block_on(task)?
|
|
.map_err(|e| crate::Error::StorageError(e.to_string()))
|
|
}
|
|
|
|
fn calculate_nar(
|
|
&self,
|
|
root_node: &proto::node::Node,
|
|
) -> Result<(u64, [u8; 32]), crate::Error> {
|
|
// Get a new handle to the gRPC client.
|
|
let mut grpc_client = self.grpc_client.clone();
|
|
let root_node = root_node.clone();
|
|
|
|
let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
|
|
let path_info = grpc_client
|
|
.calculate_nar(proto::Node {
|
|
node: Some(root_node),
|
|
})
|
|
.await?
|
|
.into_inner();
|
|
Ok(path_info)
|
|
});
|
|
|
|
let resp = self
|
|
.tokio_handle
|
|
.block_on(task)?
|
|
.map_err(|e| crate::Error::StorageError(e.to_string()))?;
|
|
|
|
let nar_sha256: [u8; 32] = resp
|
|
.nar_sha256
|
|
.to_vec()
|
|
.try_into()
|
|
.map_err(|_e| crate::Error::StorageError("invalid digest length".to_string()))?;
|
|
|
|
Ok((resp.nar_size, nar_sha256))
|
|
}
|
|
|
|
fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> {
|
|
// Get a new handle to the gRPC client.
|
|
let mut grpc_client = self.grpc_client.clone();
|
|
|
|
let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
|
|
let s = grpc_client
|
|
.list(ListPathInfoRequest::default())
|
|
.await?
|
|
.into_inner();
|
|
|
|
Ok(s)
|
|
});
|
|
|
|
let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
|
|
|
|
Box::new(StreamIterator::new(self.tokio_handle.clone(), stream))
|
|
}
|
|
}
|
|
|
|
pub struct StreamIterator {
|
|
tokio_handle: tokio::runtime::Handle,
|
|
stream: Streaming<proto::PathInfo>,
|
|
}
|
|
|
|
impl StreamIterator {
|
|
pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self {
|
|
Self {
|
|
tokio_handle,
|
|
stream,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Iterator for StreamIterator {
|
|
type Item = Result<proto::PathInfo, crate::Error>;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
match self.tokio_handle.block_on(self.stream.message()) {
|
|
Ok(o) => match o {
|
|
Some(pathinfo) => {
|
|
// validate the pathinfo
|
|
if let Err(e) = pathinfo.validate() {
|
|
return Some(Err(crate::Error::StorageError(format!(
|
|
"pathinfo {:?} failed validation: {}",
|
|
pathinfo, e
|
|
))));
|
|
}
|
|
Some(Ok(pathinfo))
|
|
}
|
|
None => None,
|
|
},
|
|
Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use tempfile::TempDir;
    use tokio::net::UnixListener;
    use tokio::task;
    use tokio::time;
    use tokio_stream::wrappers::UnixListenerStream;

    use crate::pathinfoservice::MemoryPathInfoService;
    use crate::proto::GRPCPathInfoServiceWrapper;
    use crate::tests::fixtures;
    use crate::tests::utils::gen_blob_service;
    use crate::tests::utils::gen_directory_service;

    use super::GRPCPathInfoService;
    use super::PathInfoService;

    /// Parses `url_str` and hands it to [GRPCPathInfoService::from_url],
    /// together with freshly generated blob and directory services.
    fn service_from(url_str: &str) -> Result<GRPCPathInfoService, crate::Error> {
        let url = url::Url::parse(url_str).expect("must parse");

        GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
    }

    /// A scheme without the `grpc+` prefix must be rejected.
    #[test]
    fn test_invalid_scheme() {
        assert!(service_from("http://foo.example/test").is_err());
    }

    /// The correct scheme for a unix socket is accepted.
    /// It doesn't matter that /path/to/somewhere doesn't exist yet, because we connect lazily.
    #[tokio::test]
    async fn test_valid_unix_path() {
        assert!(service_from("grpc+unix:///path/to/somewhere").is_ok());
    }

    /// The correct scheme for a unix socket, but with a host set,
    /// which is unsupported.
    #[tokio::test]
    async fn test_invalid_unix_path_with_domain() {
        assert!(service_from("grpc+unix://host.example/path/to/somewhere").is_err());
    }

    /// The correct scheme for a HTTP server is accepted.
    /// It doesn't matter that nothing is listening there, because we connect lazily.
    #[tokio::test]
    async fn test_valid_http() {
        assert!(service_from("grpc+http://localhost").is_ok());
    }

    /// The correct scheme for a HTTPS server is accepted.
    /// It doesn't matter that nothing is listening there, because we connect lazily.
    #[tokio::test]
    async fn test_valid_https() {
        assert!(service_from("grpc+https://localhost").is_ok());
    }

    /// The correct scheme, but with an additional path,
    /// which is not supported for gRPC.
    /// It doesn't matter that nothing is listening there, because we connect lazily.
    #[tokio::test]
    async fn test_invalid_http_with_path() {
        assert!(service_from("grpc+https://localhost/some-path").is_err());
    }

    /// Uses the correct scheme for a unix socket, and provides a server on the other side.
    #[tokio::test]
    async fn test_valid_unix_path_ping_pong() {
        let tmpdir = TempDir::new().unwrap();
        let path = tmpdir.path().join("daemon");

        // Prepare a client pointing at the socket path inside the temporary directory.
        let client = {
            let mut url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse");
            url.set_path(path.to_str().unwrap());

            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
                .expect("must succeed")
        };

        let path_copy = path.clone();

        // Spin up a server in a separate thread, which creates its own tokio
        // runtime and blocks on the serving task.
        thread::spawn(move || {
            // Create the runtime, and get a handle from it.
            let rt = tokio::runtime::Runtime::new().unwrap();
            let handle = rt.handle();

            let task = handle.spawn(async {
                let uds = UnixListener::bind(path_copy).unwrap();
                let uds_stream = UnixListenerStream::new(uds);

                // Set up a new server, backed by a MemoryPathInfoService.
                let mut server = tonic::transport::Server::builder();

                let backend: Arc<dyn PathInfoService> = Arc::new(MemoryPathInfoService::new(
                    gen_blob_service(),
                    gen_directory_service(),
                ));
                let service = crate::proto::path_info_service_server::PathInfoServiceServer::new(
                    GRPCPathInfoServiceWrapper::from(backend),
                );

                let router = server.add_service(service);
                router.serve_with_incoming(uds_stream).await
            });

            handle.block_on(task)
        });

        // Wait for the socket to be created, polling every 20ms.
        {
            let mut socket_created = false;
            for _attempt in 1..20 {
                socket_created = path.exists();
                if socket_created {
                    break;
                }
                tokio::time::sleep(time::Duration::from_millis(20)).await;
            }

            assert!(
                socket_created,
                "expected socket path to eventually get created, but never happened"
            );
        }

        // The client API blocks, so it has to run off the async test thread.
        let pi = task::spawn_blocking(move || {
            let digest: [u8; 20] = fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap();

            client.get(digest).expect("must not be error")
        })
        .await
        .expect("must not be err");

        assert!(pi.is_none());
    }
}
|