feat(tvix/store/proto): use Bytes instead of Vec<u8>

Makes use of https://github.com/tokio-rs/prost/pull/341, which makes our
bytes field cheaper to clone.

It's a bit annoying to configure due to
https://github.com/hyperium/tonic/issues/908, but the workaround does
get the job done.

Change-Id: I25714600b041bb5432d3adf5859b151e72b12778
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8975
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2023-07-19 18:52:50 +03:00 committed by clbot
parent 7971d7d9ff
commit 432222f098
34 changed files with 216 additions and 164 deletions

View file

@ -133,9 +133,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
x: Result<bytes::Bytes, io::Error>,
) -> Result<super::BlobChunk, Status> {
match x {
Ok(bytes) => Ok(super::BlobChunk {
data: bytes.to_vec(),
}),
Ok(bytes) => Ok(super::BlobChunk { data: bytes }),
Err(e) => Err(Status::from(e)),
}
}
@ -156,7 +154,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
let req_inner = request.into_inner();
let data_stream = req_inner.map(|x| {
x.map(|x| VecDeque::from(x.data))
x.map(|x| VecDeque::from(x.data.to_vec()))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
});
@ -182,7 +180,9 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
})?
.to_vec();
Ok(super::PutBlobResponse { digest })
Ok(super::PutBlobResponse {
digest: digest.into(),
})
})
.await
.map_err(|_| Status::internal("failed to wait for task"))??;

View file

@ -176,7 +176,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
match last_directory_dgst {
None => Err(Status::invalid_argument("no directories received")),
Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
root_digest: last_directory_dgst.to_vec(),
root_digest: last_directory_dgst.into(),
})),
}
}

View file

@ -26,10 +26,11 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
) -> Result<Response<proto::PathInfo>> {
match request.into_inner().by_what {
None => Err(Status::unimplemented("by_what needs to be specified")),
Some(proto::get_path_info_request::ByWhat::ByOutputHash(digest)) => {
let digest: [u8; 20] = digest
Some(proto::get_path_info_request::ByWhat::ByOutputHash(output_digest)) => {
let digest: [u8; 20] = output_digest
.to_vec()
.try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
.map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
match self.path_info_service.get(digest) {
Ok(None) => Err(Status::not_found("PathInfo not found")),
Ok(Some(path_info)) => Ok(Response::new(path_info)),
@ -72,7 +73,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
Ok(Response::new(proto::CalculateNarResponse {
nar_size,
nar_sha256: nar_sha256.to_vec(),
nar_sha256: nar_sha256.to_vec().into(),
}))
}
}

View file

@ -86,7 +86,7 @@ fn validate_node_name<E>(name: &[u8], err: fn(Vec<u8>) -> E) -> Result<(), E> {
/// Checks a digest for validity.
/// Digests are 32 bytes long, as we store blake3 digests.
fn validate_digest<E>(digest: &Vec<u8>, err: fn(usize) -> E) -> Result<(), E> {
fn validate_digest<E>(digest: &bytes::Bytes, err: fn(usize) -> E) -> Result<(), E> {
if digest.len() != 32 {
return Err(err(digest.len()));
}

View file

@ -18,7 +18,7 @@ fn size() {
let d = Directory {
directories: vec![DirectoryNode {
name: "foo".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 0,
}],
..Default::default()
@ -29,7 +29,7 @@ fn size() {
let d = Directory {
directories: vec![DirectoryNode {
name: "foo".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 4,
}],
..Default::default()
@ -40,7 +40,7 @@ fn size() {
let d = Directory {
files: vec![FileNode {
name: "foo".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
executable: false,
}],
@ -88,7 +88,7 @@ fn validate_invalid_names() {
let d = Directory {
directories: vec![DirectoryNode {
name: "".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
}],
..Default::default()
@ -105,7 +105,7 @@ fn validate_invalid_names() {
let d = Directory {
directories: vec![DirectoryNode {
name: ".".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
}],
..Default::default()
@ -122,7 +122,7 @@ fn validate_invalid_names() {
let d = Directory {
files: vec![FileNode {
name: "..".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
executable: false,
}],
@ -174,7 +174,7 @@ fn validate_invalid_digest() {
let d = Directory {
directories: vec![DirectoryNode {
name: "foo".into(),
digest: vec![0x00, 0x42], // invalid length
digest: vec![0x00, 0x42].into(), // invalid length
size: 42,
}],
..Default::default()
@ -195,12 +195,12 @@ fn validate_sorting() {
directories: vec![
DirectoryNode {
name: "b".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
},
DirectoryNode {
name: "a".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
},
],
@ -220,12 +220,12 @@ fn validate_sorting() {
directories: vec![
DirectoryNode {
name: "a".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
},
DirectoryNode {
name: "a".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
},
],
@ -245,12 +245,12 @@ fn validate_sorting() {
directories: vec![
DirectoryNode {
name: "a".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
},
DirectoryNode {
name: "b".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
},
],
@ -266,12 +266,12 @@ fn validate_sorting() {
directories: vec![
DirectoryNode {
name: "b".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
},
DirectoryNode {
name: "c".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 42,
},
],

View file

@ -16,7 +16,7 @@ async fn not_found_read() {
let resp = service
.read(tonic::Request::new(ReadBlobRequest {
digest: BLOB_A_DIGEST.to_vec(),
digest: BLOB_A_DIGEST.clone().into(),
}))
.await;
@ -36,7 +36,7 @@ async fn not_found_stat() {
let resp = service
.stat(tonic::Request::new(StatBlobRequest {
digest: BLOB_A_DIGEST.to_vec(),
digest: BLOB_A_DIGEST.clone().into(),
..Default::default()
}))
.await
@ -54,7 +54,7 @@ async fn put_read_stat() {
// Send blob A.
let put_resp = service
.put(tonic_mock::streaming_request(vec![BlobChunk {
data: BLOB_A.clone(),
data: BLOB_A.clone().into(),
}]))
.await
.expect("must succeed")
@ -67,7 +67,7 @@ async fn put_read_stat() {
// expose it yet.
let _resp = service
.stat(tonic::Request::new(StatBlobRequest {
digest: BLOB_A_DIGEST.to_vec(),
digest: BLOB_A_DIGEST.clone().into(),
..Default::default()
}))
.await
@ -77,7 +77,7 @@ async fn put_read_stat() {
// Read the blob. It should return the same data.
let resp = service
.read(tonic::Request::new(ReadBlobRequest {
digest: BLOB_A_DIGEST.to_vec(),
digest: BLOB_A_DIGEST.clone().into(),
}))
.await;
@ -90,7 +90,7 @@ async fn put_read_stat() {
.expect("must be some")
.expect("must succeed");
assert_eq!(BLOB_A.to_vec(), item.data);
assert_eq!(BLOB_A.clone(), item.data);
// … and no more elements
assert!(rx.next().await.is_none());

View file

@ -42,7 +42,7 @@ async fn not_found() {
let resp = service
.get(tonic::Request::new(GetDirectoryRequest {
by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().to_vec())),
by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
..Default::default()
}))
.await;
@ -80,7 +80,7 @@ async fn put_get() {
let items = get_directories(
&service,
GetDirectoryRequest {
by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().to_vec())),
by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
..Default::default()
},
)
@ -122,7 +122,7 @@ async fn put_get_multiple() {
&service,
GetDirectoryRequest {
recursive: false,
by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().to_vec())),
by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())),
},
)
.await
@ -136,7 +136,7 @@ async fn put_get_multiple() {
&service,
GetDirectoryRequest {
recursive: true,
by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().to_vec())),
by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())),
},
)
.await
@ -172,7 +172,7 @@ async fn put_get_dedup() {
&service,
GetDirectoryRequest {
recursive: true,
by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().to_vec())),
by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().into())),
},
)
.await
@ -215,7 +215,7 @@ async fn put_reject_wrong_size() {
let broken_parent_directory = Directory {
directories: vec![DirectoryNode {
name: "foo".into(),
digest: DIRECTORY_A.digest().to_vec(),
digest: DIRECTORY_A.digest().into(),
size: 42,
}],
..Default::default()

View file

@ -32,7 +32,7 @@ async fn not_found() {
let resp = service
.get(Request::new(GetPathInfoRequest {
by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.to_vec())),
by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.clone())),
}))
.await;
@ -62,7 +62,7 @@ async fn put_get() {
let resp = service
.get(Request::new(GetPathInfoRequest {
by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.to_vec())),
by_what: Some(ByOutputHash(DUMMY_OUTPUT_HASH.clone())),
}))
.await;

View file

@ -1,20 +1,28 @@
use crate::proto::{self, Node, PathInfo, ValidatePathInfoError};
use crate::B3Digest;
use bytes::Bytes;
use lazy_static::lazy_static;
use nix_compat::store_path::{self, StorePath};
use std::str::FromStr;
use test_case::test_case;
lazy_static! {
static ref DUMMY_DIGEST: Vec<u8> = vec![
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
];
static ref DUMMY_DIGEST_2: Vec<u8> = vec![
0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
];
static ref DUMMY_DIGEST: B3Digest = {
let u: &[u8; 32] = &[
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
];
u.into()
};
static ref DUMMY_DIGEST_2: B3Digest = {
let u: &[u8; 32] = &[
0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
];
u.into()
};
}
const DUMMY_NAME: &str = "00000000000000000000000000000000-dummy";
@ -44,7 +52,7 @@ fn validate_no_node(
#[test_case(
proto::DirectoryNode {
name: DUMMY_NAME.into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.clone().into(),
size: 0,
},
Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed"));
@ -53,7 +61,7 @@ fn validate_no_node(
#[test_case(
proto::DirectoryNode {
name: DUMMY_NAME.into(),
digest: vec![],
digest: Bytes::new(),
size: 0,
},
Err(ValidatePathInfoError::InvalidDigestLen(0));
@ -62,7 +70,7 @@ fn validate_no_node(
#[test_case(
proto::DirectoryNode {
name: "invalid".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.clone().into(),
size: 0,
},
Err(ValidatePathInfoError::InvalidNodeName(
@ -88,7 +96,7 @@ fn validate_directory(
#[test_case(
proto::FileNode {
name: DUMMY_NAME.into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.clone().into(),
size: 0,
executable: false,
},
@ -98,7 +106,7 @@ fn validate_directory(
#[test_case(
proto::FileNode {
name: DUMMY_NAME.into(),
digest: vec![],
digest: Bytes::new(),
..Default::default()
},
Err(ValidatePathInfoError::InvalidDigestLen(0));
@ -107,7 +115,7 @@ fn validate_directory(
#[test_case(
proto::FileNode {
name: "invalid".into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.clone().into(),
..Default::default()
},
Err(ValidatePathInfoError::InvalidNodeName(
@ -167,11 +175,11 @@ fn validate_references() {
node: Some(Node {
node: Some(proto::node::Node::Directory(proto::DirectoryNode {
name: DUMMY_NAME.into(),
digest: DUMMY_DIGEST.to_vec(),
digest: DUMMY_DIGEST.clone().into(),
size: 0,
})),
}),
references: vec![DUMMY_DIGEST_2.to_vec()],
references: vec![DUMMY_DIGEST_2.clone().into()],
narinfo: None,
};
assert!(path_info.validate().is_ok());
@ -180,7 +188,7 @@ fn validate_references() {
let path_info_with_narinfo_missing_refs = PathInfo {
narinfo: Some(proto::NarInfo {
nar_size: 0,
nar_sha256: DUMMY_DIGEST.to_vec(),
nar_sha256: DUMMY_DIGEST.clone().into(),
signatures: vec![],
reference_names: vec![],
}),
@ -198,7 +206,7 @@ fn validate_references() {
let path_info_with_narinfo = PathInfo {
narinfo: Some(proto::NarInfo {
nar_size: 0,
nar_sha256: DUMMY_DIGEST.to_vec(),
nar_sha256: DUMMY_DIGEST.clone().into(),
signatures: vec![],
reference_names: vec![format!("/nix/store/{}", DUMMY_NAME)],
}),