diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs index 6c9104582..7cecea2c1 100644 --- a/tvix/castore/src/digests.rs +++ b/tvix/castore/src/digests.rs @@ -87,12 +87,14 @@ impl Clone for B3Digest { impl std::fmt::Display for B3Digest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "b3:{}", BASE64.encode(&self.0)) + f.write_str("blake3-").unwrap(); + BASE64.encode_write(&self.0, f) } } impl std::fmt::Debug for B3Digest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "b3:{}", BASE64.encode(&self.0)) + f.write_str("blake3-").unwrap(); + BASE64.encode_write(&self.0, f) } } diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs index 41bd0698e..86f5d7860 100644 --- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -1,6 +1,5 @@ -use crate::blobservice::BlobService; +use crate::{blobservice::BlobService, B3Digest}; use core::pin::pin; -use data_encoding::BASE64; use futures::{stream::BoxStream, TryFutureExt}; use std::{ collections::VecDeque, @@ -9,7 +8,7 @@ use std::{ use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{instrument, warn}; +use tracing::{instrument, warn, Span}; pub struct GRPCBlobServiceWrapper { blob_service: T, @@ -87,17 +86,20 @@ where // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 type ReadStream = BoxStream<'static, Result>; - #[instrument(skip_all, fields(blob.digest=format!("b3:{}", BASE64.encode(&request.get_ref().digest))))] + #[instrument(skip_all)] async fn stat( &self, request: Request, ) -> Result, Status> { let rq = request.into_inner(); - let req_digest = rq + let req_digest: B3Digest = rq .digest .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + let span = Span::current(); + span.record("blob.digest", req_digest.to_string()); + match self.blob_service.chunks(&req_digest).await { Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), Ok(Some(chunk_metas)) => Ok(Response::new(super::StatBlobResponse { @@ -111,18 +113,21 @@ where } } - #[instrument(skip_all, fields(blob.digest=format!("b3:{}", BASE64.encode(&request.get_ref().digest))))] + #[instrument(skip_all)] async fn read( &self, request: Request, ) -> Result, Status> { let rq = request.into_inner(); - let req_digest = rq + let req_digest: B3Digest = rq .digest .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + let span = Span::current(); + span.record("blob.digest", req_digest.to_string()); + match self.blob_service.open_read(&req_digest).await { Ok(Some(r)) => { let chunks_stream =