From 3491a7e679d16608f6b80aad2f9180ea87576020 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 19 Feb 2025 15:13:54 +0100 Subject: [PATCH] refactor(tvix/castore): use "blake3" for Display impl make the Display impl of B3Digest use the `blake3-` prefix rather than `b3:`, which is more close to the SRI hashes we use in NixHash. Also use Span::current().record() to record the `blob.digest` field (and only when the digest has the right lenght). It doesn't make sense to brand it as blake3 before anyways, and allows us to get rid of the manual implementation. Change-Id: Iee349557ef4761807af1fb3942387de1942ab12b Reviewed-on: https://cl.tvl.fyi/c/depot/+/13162 Autosubmit: flokli Reviewed-by: Ilan Joselevich Tested-by: BuildkiteCI --- tvix/castore/src/digests.rs | 6 ++++-- .../src/proto/grpc_blobservice_wrapper.rs | 19 ++++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs index 6c9104582..7cecea2c1 100644 --- a/tvix/castore/src/digests.rs +++ b/tvix/castore/src/digests.rs @@ -87,12 +87,14 @@ impl Clone for B3Digest { impl std::fmt::Display for B3Digest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "b3:{}", BASE64.encode(&self.0)) + f.write_str("blake3-").unwrap(); + BASE64.encode_write(&self.0, f) } } impl std::fmt::Debug for B3Digest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "b3:{}", BASE64.encode(&self.0)) + f.write_str("blake3-").unwrap(); + BASE64.encode_write(&self.0, f) } } diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs index 41bd0698e..86f5d7860 100644 --- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -1,6 +1,5 @@ -use crate::blobservice::BlobService; +use crate::{blobservice::BlobService, B3Digest}; use core::pin::pin; -use data_encoding::BASE64; use futures::{stream::BoxStream, TryFutureExt}; use std::{ collections::VecDeque, @@ -9,7 +8,7 @@ use std::{ use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{instrument, warn}; +use tracing::{instrument, warn, Span}; pub struct GRPCBlobServiceWrapper { blob_service: T, @@ -87,17 +86,20 @@ where // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 type ReadStream = BoxStream<'static, Result>; - #[instrument(skip_all, fields(blob.digest=format!("b3:{}", BASE64.encode(&request.get_ref().digest))))] + #[instrument(skip_all)] async fn stat( &self, request: Request, ) -> Result, Status> { let rq = request.into_inner(); - let req_digest = rq + let req_digest: B3Digest = rq .digest .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + let span = Span::current(); + span.record("blob.digest", req_digest.to_string()); + match self.blob_service.chunks(&req_digest).await { Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), Ok(Some(chunk_metas)) => Ok(Response::new(super::StatBlobResponse { @@ -111,18 +113,21 @@ where } } - #[instrument(skip_all, fields(blob.digest=format!("b3:{}", BASE64.encode(&request.get_ref().digest))))] + #[instrument(skip_all)] async fn read( &self, request: Request, ) -> Result, Status> { let rq = request.into_inner(); - let req_digest = rq + let req_digest: B3Digest = rq .digest .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + let span = Span::current(); + span.record("blob.digest", req_digest.to_string()); + match self.blob_service.open_read(&req_digest).await { Ok(Some(r)) => { let chunks_stream =