refactor(tvix/store/blobsvc): move from Vec<u8> to B3Digest
Change-Id: I809bab75221f81b6023cfe75c2fe9e589c1e9192 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8605 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
This commit is contained in:
parent
b8ff08b1b0
commit
066179651c
9 changed files with 79 additions and 95 deletions
|
|
@ -1,8 +1,8 @@
|
|||
use crate::{
|
||||
blobservice::{BlobService, BlobWriter},
|
||||
proto::sync_read_into_async_read::SyncReadIntoAsyncRead,
|
||||
B3Digest,
|
||||
};
|
||||
use data_encoding::BASE64;
|
||||
use std::{collections::VecDeque, io, pin::Pin};
|
||||
use tokio::task;
|
||||
use tokio_stream::StreamExt;
|
||||
|
|
@ -36,10 +36,7 @@ impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server
|
|||
request: Request<super::StatBlobRequest>,
|
||||
) -> Result<Response<super::BlobMeta>, Status> {
|
||||
let rq = request.into_inner();
|
||||
let req_digest: [u8; 32] = rq
|
||||
.digest
|
||||
.clone()
|
||||
.try_into()
|
||||
let req_digest = B3Digest::from_vec(rq.digest)
|
||||
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
||||
|
||||
if rq.include_chunks || rq.include_bao {
|
||||
|
|
@ -48,10 +45,7 @@ impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server
|
|||
|
||||
match self.blob_service.has(&req_digest) {
|
||||
Ok(true) => Ok(Response::new(super::BlobMeta::default())),
|
||||
Ok(false) => Err(Status::not_found(format!(
|
||||
"blob {} not found",
|
||||
BASE64.encode(&req_digest)
|
||||
))),
|
||||
Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
|
@ -63,11 +57,8 @@ impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server
|
|||
) -> Result<Response<Self::ReadStream>, Status> {
|
||||
let rq = request.into_inner();
|
||||
|
||||
let req_digest: [u8; 32] = rq
|
||||
.digest
|
||||
.clone()
|
||||
.try_into()
|
||||
.map_err(|_| Status::invalid_argument("invalid digest length"))?;
|
||||
let req_digest = B3Digest::from_vec(rq.digest)
|
||||
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
||||
|
||||
match self.blob_service.open_read(&req_digest) {
|
||||
Ok(Some(reader)) => {
|
||||
|
|
@ -87,10 +78,7 @@ impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server
|
|||
let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
|
||||
Ok(Response::new(Box::pin(chunks_stream)))
|
||||
}
|
||||
Ok(None) => Err(Status::not_found(format!(
|
||||
"blob {} not found",
|
||||
BASE64.encode(&rq.digest)
|
||||
))),
|
||||
Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue