This splits the pure content-addressed layers from tvix-store into a `castore` crate, and only leaves PathInfo-related things, as well as the CLI entrypoint, in the tvix-store crate. Notable changes: - `fixtures` and `utils` had to be moved out of the `test` cfg, so they can be imported from tvix-store. - Some ad-hoc fixtures in the test were moved to proper fixtures in the same step. - The protos are now created by a (more static) recipe in the protos/ directory. The (now two) golang targets are commented out, as it's not possible to update them properly in the same CL. This will be done by a follow-up CL once this is merged (and whitby deployed) Bug: https://b.tvl.fyi/issues/301 Change-Id: I8d675d4bf1fb697eb7d479747c1b1e3635718107 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9370 Reviewed-by: tazjin <tazjin@tvl.su> Reviewed-by: flokli <flokli@flokli.de> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: Connor Brewster <cbrewster@hey.com>
125 lines
4.5 KiB
Rust
125 lines
4.5 KiB
Rust
use crate::nar::RenderError;
|
|
use crate::pathinfoservice::PathInfoService;
|
|
use crate::proto;
|
|
use futures::StreamExt;
|
|
use std::sync::Arc;
|
|
use tokio::task;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tonic::{async_trait, Request, Response, Result, Status};
|
|
use tracing::{debug, instrument, warn};
|
|
use tvix_castore::proto as castorepb;
|
|
|
|
/// Exposes a [PathInfoService] as a gRPC service
/// ([proto::path_info_service_server::PathInfoService]), forwarding each RPC
/// to the wrapped implementation.
pub struct GRPCPathInfoServiceWrapper {
    path_info_service: Arc<dyn PathInfoService>,
    // FUTUREWORK: allow exposing without allowing listing
}
|
|
|
|
impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper {
|
|
fn from(value: Arc<dyn PathInfoService>) -> Self {
|
|
Self {
|
|
path_info_service: value,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper {
|
|
type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
|
|
|
|
#[instrument(skip(self))]
|
|
async fn get(
|
|
&self,
|
|
request: Request<proto::GetPathInfoRequest>,
|
|
) -> Result<Response<proto::PathInfo>> {
|
|
match request.into_inner().by_what {
|
|
None => Err(Status::unimplemented("by_what needs to be specified")),
|
|
Some(proto::get_path_info_request::ByWhat::ByOutputHash(output_digest)) => {
|
|
let digest: [u8; 20] = output_digest
|
|
.to_vec()
|
|
.try_into()
|
|
.map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
|
|
match self.path_info_service.get(digest).await {
|
|
Ok(None) => Err(Status::not_found("PathInfo not found")),
|
|
Ok(Some(path_info)) => Ok(Response::new(path_info)),
|
|
Err(e) => {
|
|
warn!("failed to retrieve PathInfo: {}", e);
|
|
Err(e.into())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[instrument(skip(self))]
|
|
async fn put(&self, request: Request<proto::PathInfo>) -> Result<Response<proto::PathInfo>> {
|
|
let path_info = request.into_inner();
|
|
|
|
// Store the PathInfo in the client. Clients MUST validate the data
|
|
// they receive, so we don't validate additionally here.
|
|
match self.path_info_service.put(path_info).await {
|
|
Ok(path_info_new) => Ok(Response::new(path_info_new)),
|
|
Err(e) => {
|
|
warn!("failed to insert PathInfo: {}", e);
|
|
Err(e.into())
|
|
}
|
|
}
|
|
}
|
|
|
|
#[instrument(skip(self))]
|
|
async fn calculate_nar(
|
|
&self,
|
|
request: Request<castorepb::Node>,
|
|
) -> Result<Response<proto::CalculateNarResponse>> {
|
|
match request.into_inner().node {
|
|
None => Err(Status::invalid_argument("no root node sent")),
|
|
Some(root_node) => {
|
|
let path_info_service = self.path_info_service.clone();
|
|
let (nar_size, nar_sha256) = path_info_service
|
|
.calculate_nar(&root_node)
|
|
.await
|
|
.expect("error during nar calculation"); // TODO: handle error
|
|
|
|
Ok(Response::new(proto::CalculateNarResponse {
|
|
nar_size,
|
|
nar_sha256: nar_sha256.to_vec().into(),
|
|
}))
|
|
}
|
|
}
|
|
}
|
|
|
|
#[instrument(skip(self))]
|
|
async fn list(
|
|
&self,
|
|
_request: Request<proto::ListPathInfoRequest>,
|
|
) -> Result<Response<Self::ListStream>, Status> {
|
|
let (tx, rx) = tokio::sync::mpsc::channel(5);
|
|
|
|
let path_info_service = self.path_info_service.clone();
|
|
|
|
let _task = task::spawn(async move {
|
|
let mut stream = path_info_service.list();
|
|
while let Some(e) = stream.next().await {
|
|
let res = e.map_err(|e| Status::internal(e.to_string()));
|
|
if tx.send(res).await.is_err() {
|
|
debug!("receiver dropped");
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
let receiver_stream = ReceiverStream::new(rx);
|
|
Ok(Response::new(receiver_stream))
|
|
}
|
|
}
|
|
|
|
impl From<RenderError> for tonic::Status {
|
|
fn from(value: RenderError) -> Self {
|
|
match value {
|
|
RenderError::BlobNotFound(_, _) => Self::not_found(value.to_string()),
|
|
RenderError::DirectoryNotFound(_, _) => Self::not_found(value.to_string()),
|
|
RenderError::NARWriterError(_) => Self::internal(value.to_string()),
|
|
RenderError::StoreError(_) => Self::internal(value.to_string()),
|
|
RenderError::UnexpectedBlobMeta(_, _, _, _) => Self::internal(value.to_string()),
|
|
}
|
|
}
|
|
}
|