refactor(tvix/castore): use "blake3" for Display impl
make the Display impl of B3Digest use the `blake3-` prefix rather than `b3:`, which is more close to the SRI hashes we use in NixHash. Also use Span::current().record() to record the `blob.digest` field (and only when the digest has the right lenght). It doesn't make sense to brand it as blake3 before anyways, and allows us to get rid of the manual implementation. Change-Id: Iee349557ef4761807af1fb3942387de1942ab12b Reviewed-on: https://cl.tvl.fyi/c/depot/+/13162 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: Ilan Joselevich <personal@ilanjoselevich.com> Tested-by: BuildkiteCI
This commit is contained in:
parent
51ea9c7801
commit
3491a7e679
2 changed files with 16 additions and 9 deletions
|
|
@ -87,12 +87,14 @@ impl Clone for B3Digest {
|
||||||
|
|
||||||
impl std::fmt::Display for B3Digest {
|
impl std::fmt::Display for B3Digest {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
write!(f, "b3:{}", BASE64.encode(&self.0))
|
f.write_str("blake3-").unwrap();
|
||||||
|
BASE64.encode_write(&self.0, f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for B3Digest {
|
impl std::fmt::Debug for B3Digest {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
write!(f, "b3:{}", BASE64.encode(&self.0))
|
f.write_str("blake3-").unwrap();
|
||||||
|
BASE64.encode_write(&self.0, f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
use crate::blobservice::BlobService;
|
use crate::{blobservice::BlobService, B3Digest};
|
||||||
use core::pin::pin;
|
use core::pin::pin;
|
||||||
use data_encoding::BASE64;
|
|
||||||
use futures::{stream::BoxStream, TryFutureExt};
|
use futures::{stream::BoxStream, TryFutureExt};
|
||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
|
|
@ -9,7 +8,7 @@ use std::{
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
use tonic::{async_trait, Request, Response, Status, Streaming};
|
use tonic::{async_trait, Request, Response, Status, Streaming};
|
||||||
use tracing::{instrument, warn};
|
use tracing::{instrument, warn, Span};
|
||||||
|
|
||||||
pub struct GRPCBlobServiceWrapper<T> {
|
pub struct GRPCBlobServiceWrapper<T> {
|
||||||
blob_service: T,
|
blob_service: T,
|
||||||
|
|
@ -87,17 +86,20 @@ where
|
||||||
// https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
|
// https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
|
||||||
type ReadStream = BoxStream<'static, Result<super::BlobChunk, Status>>;
|
type ReadStream = BoxStream<'static, Result<super::BlobChunk, Status>>;
|
||||||
|
|
||||||
#[instrument(skip_all, fields(blob.digest=format!("b3:{}", BASE64.encode(&request.get_ref().digest))))]
|
#[instrument(skip_all)]
|
||||||
async fn stat(
|
async fn stat(
|
||||||
&self,
|
&self,
|
||||||
request: Request<super::StatBlobRequest>,
|
request: Request<super::StatBlobRequest>,
|
||||||
) -> Result<Response<super::StatBlobResponse>, Status> {
|
) -> Result<Response<super::StatBlobResponse>, Status> {
|
||||||
let rq = request.into_inner();
|
let rq = request.into_inner();
|
||||||
let req_digest = rq
|
let req_digest: B3Digest = rq
|
||||||
.digest
|
.digest
|
||||||
.try_into()
|
.try_into()
|
||||||
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
||||||
|
|
||||||
|
let span = Span::current();
|
||||||
|
span.record("blob.digest", req_digest.to_string());
|
||||||
|
|
||||||
match self.blob_service.chunks(&req_digest).await {
|
match self.blob_service.chunks(&req_digest).await {
|
||||||
Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
|
Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
|
||||||
Ok(Some(chunk_metas)) => Ok(Response::new(super::StatBlobResponse {
|
Ok(Some(chunk_metas)) => Ok(Response::new(super::StatBlobResponse {
|
||||||
|
|
@ -111,18 +113,21 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip_all, fields(blob.digest=format!("b3:{}", BASE64.encode(&request.get_ref().digest))))]
|
#[instrument(skip_all)]
|
||||||
async fn read(
|
async fn read(
|
||||||
&self,
|
&self,
|
||||||
request: Request<super::ReadBlobRequest>,
|
request: Request<super::ReadBlobRequest>,
|
||||||
) -> Result<Response<Self::ReadStream>, Status> {
|
) -> Result<Response<Self::ReadStream>, Status> {
|
||||||
let rq = request.into_inner();
|
let rq = request.into_inner();
|
||||||
|
|
||||||
let req_digest = rq
|
let req_digest: B3Digest = rq
|
||||||
.digest
|
.digest
|
||||||
.try_into()
|
.try_into()
|
||||||
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
||||||
|
|
||||||
|
let span = Span::current();
|
||||||
|
span.record("blob.digest", req_digest.to_string());
|
||||||
|
|
||||||
match self.blob_service.open_read(&req_digest).await {
|
match self.blob_service.open_read(&req_digest).await {
|
||||||
Ok(Some(r)) => {
|
Ok(Some(r)) => {
|
||||||
let chunks_stream =
|
let chunks_stream =
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue