This shouldn't be part of the PathInfoService trait. Pretty much none of the PathInfoServices do implement it, and requiring them to implement it means they also cannot make use of this calculation already being done by other PathInfoServices. Move it out into its own NarCalculationService trait, defined somewhere at tvix_store::nar, and have everyone who wants to trigger nar calculation use nar_calculation_service directly, which now is an additional field in TvixStoreIO for example. It being moved outside the PathInfoService trait doesn't prohibit specific implementations to implement it (like the GRPC client for the `PathInfoService` does. This is currently wired together in a bit of a hacky fashion - as of now, everything uses the naive implementation that traverses blob and directoryservice, rather than composing it properly. I want to leave that up to a later CL, dealing with other parts of store composition too. Change-Id: I18d07ea4301d4a07651b8218bc5fe95e4e307208 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11619 Reviewed-by: Connor Brewster <cbrewster@hey.com> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
124 lines
4.5 KiB
Rust
124 lines
4.5 KiB
Rust
use crate::nar::{NarCalculationService, RenderError};
|
|
use crate::pathinfoservice::PathInfoService;
|
|
use crate::proto;
|
|
use futures::{stream::BoxStream, TryStreamExt};
|
|
use std::ops::Deref;
|
|
use tonic::{async_trait, Request, Response, Result, Status};
|
|
use tracing::{instrument, warn};
|
|
use tvix_castore::proto as castorepb;
|
|
|
|
pub struct GRPCPathInfoServiceWrapper<PS, NS> {
|
|
path_info_service: PS,
|
|
// FUTUREWORK: allow exposing without allowing listing
|
|
nar_calculation_service: NS,
|
|
}
|
|
|
|
impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> {
|
|
pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self {
|
|
Self {
|
|
path_info_service,
|
|
nar_calculation_service,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS>
|
|
where
|
|
PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
|
|
NS: NarCalculationService + Send + Sync + 'static,
|
|
{
|
|
type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
|
|
|
|
#[instrument(skip_all)]
|
|
async fn get(
|
|
&self,
|
|
request: Request<proto::GetPathInfoRequest>,
|
|
) -> Result<Response<proto::PathInfo>> {
|
|
match request.into_inner().by_what {
|
|
None => Err(Status::unimplemented("by_what needs to be specified")),
|
|
Some(proto::get_path_info_request::ByWhat::ByOutputHash(output_digest)) => {
|
|
let digest: [u8; 20] = output_digest
|
|
.to_vec()
|
|
.try_into()
|
|
.map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
|
|
match self.path_info_service.get(digest).await {
|
|
Ok(None) => Err(Status::not_found("PathInfo not found")),
|
|
Ok(Some(path_info)) => Ok(Response::new(path_info)),
|
|
Err(e) => {
|
|
warn!(err = %e, "failed to get PathInfo");
|
|
Err(e.into())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[instrument(skip_all)]
|
|
async fn put(&self, request: Request<proto::PathInfo>) -> Result<Response<proto::PathInfo>> {
|
|
let path_info = request.into_inner();
|
|
|
|
// Store the PathInfo in the client. Clients MUST validate the data
|
|
// they receive, so we don't validate additionally here.
|
|
match self.path_info_service.put(path_info).await {
|
|
Ok(path_info_new) => Ok(Response::new(path_info_new)),
|
|
Err(e) => {
|
|
warn!(err = %e, "failed to put PathInfo");
|
|
Err(e.into())
|
|
}
|
|
}
|
|
}
|
|
|
|
#[instrument(skip_all)]
|
|
async fn calculate_nar(
|
|
&self,
|
|
request: Request<castorepb::Node>,
|
|
) -> Result<Response<proto::CalculateNarResponse>> {
|
|
match request.into_inner().node {
|
|
None => Err(Status::invalid_argument("no root node sent")),
|
|
Some(root_node) => {
|
|
if let Err(e) = root_node.validate() {
|
|
warn!(err = %e, "invalid root node");
|
|
Err(Status::invalid_argument("invalid root node"))?
|
|
}
|
|
|
|
match self.nar_calculation_service.calculate_nar(&root_node).await {
|
|
Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
|
|
nar_size,
|
|
nar_sha256: nar_sha256.to_vec().into(),
|
|
})),
|
|
Err(e) => {
|
|
warn!(err = %e, "error during NAR calculation");
|
|
Err(e.into())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[instrument(skip_all, err)]
|
|
async fn list(
|
|
&self,
|
|
_request: Request<proto::ListPathInfoRequest>,
|
|
) -> Result<Response<Self::ListStream>, Status> {
|
|
let stream = Box::pin(
|
|
self.path_info_service
|
|
.list()
|
|
.map_err(|e| Status::internal(e.to_string())),
|
|
);
|
|
|
|
Ok(Response::new(Box::pin(stream)))
|
|
}
|
|
}
|
|
|
|
impl From<RenderError> for tonic::Status {
|
|
fn from(value: RenderError) -> Self {
|
|
match value {
|
|
RenderError::BlobNotFound(_, _) => Self::not_found(value.to_string()),
|
|
RenderError::DirectoryNotFound(_, _) => Self::not_found(value.to_string()),
|
|
RenderError::NARWriterError(_) => Self::internal(value.to_string()),
|
|
RenderError::StoreError(_) => Self::internal(value.to_string()),
|
|
RenderError::UnexpectedBlobMeta(_, _, _, _) => Self::internal(value.to_string()),
|
|
}
|
|
}
|
|
}
|