feat(tvix/castore): add composition module

Change-Id: I0868f3278db85ae5fe030089ee9033837bc08748
Signed-off-by: Yureka <tvl@yuka.dev>
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11853
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Yureka 2024-06-17 01:10:55 +02:00 committed by yuka
parent 64fd1d3e56
commit 1a6b6e3ef3
16 changed files with 747 additions and 51 deletions

View file

@ -1,8 +1,11 @@
use std::sync::Arc;
use futures::{StreamExt, TryStreamExt};
use tokio_util::io::{ReaderStream, StreamReader};
use tonic::async_trait;
use tracing::{instrument, warn};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::B3Digest;
use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
@ -93,6 +96,32 @@ where
}
}
/// Deserializable configuration from which a [`CombinedBlobService`] is built.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(serde::Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct CombinedBlobServiceConfig {
/// Reference handed to `CompositionContext::resolve` to obtain the `local`
/// service. NOTE(review): presumably the instance name of another
/// configured blob service; confirm against the composition module.
local: String,
/// Reference handed to `CompositionContext::resolve` to obtain the `remote`
/// service. NOTE(review): presumably an instance name as well; confirm.
remote: String,
}
#[async_trait]
impl ServiceBuilder for CombinedBlobServiceConfig {
    type Output = dyn BlobService;

    /// Builds a [`CombinedBlobService`] from this configuration.
    ///
    /// The `local` and `remote` references are both resolved through
    /// `context`, with the two lookups driven concurrently. Both lookups run
    /// to completion before either result is inspected; if both fail, the
    /// error from the `local` lookup is the one returned.
    ///
    /// The instance name is not used.
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        context: &CompositionContext<dyn BlobService>,
    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
        let local_lookup = context.resolve(self.local.clone());
        let remote_lookup = context.resolve(self.remote.clone());

        // Wait for both lookups before looking at either outcome.
        let (local_result, remote_result) = futures::join!(local_lookup, remote_lookup);

        // `local` is checked first, so its error takes precedence.
        let local = local_result?;
        let remote = remote_result?;

        let combined = CombinedBlobService { local, remote };
        Ok(Arc::new(combined))
    }
}
fn make_chunked_reader<BS>(
// This must consume, as we can't retain references to blob_service,
// as it'd add a lifetime to BlobReader in general, which will get

View file

@ -1,4 +1,5 @@
use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::{
proto::{self, stat_blob_response::ChunkMeta},
B3Digest,
@ -180,6 +181,27 @@ where
}
}
/// Deserializable configuration from which a [`GRPCBlobService`] is built.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct GRPCBlobServiceConfig {
/// URL of the remote endpoint, kept as a string here. It is parsed when the
/// service is built and handed to `crate::tonic::channel_from_url`.
url: String,
}
#[async_trait]
impl ServiceBuilder for GRPCBlobServiceConfig {
    type Output = dyn BlobService;

    /// Builds a [`GRPCBlobService`] talking to the configured `url`.
    ///
    /// The URL string is parsed, turned into a channel via
    /// `crate::tonic::channel_from_url`, and wrapped in a generated
    /// `BlobServiceClient`. A parse failure or a channel construction
    /// failure is returned as the boxed error.
    ///
    /// Neither the instance name nor the composition context is used.
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        _context: &CompositionContext<dyn BlobService>,
    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let channel = crate::tonic::channel_from_url(&self.url.parse()?).await?;
        let client = proto::blob_service_client::BlobServiceClient::new(channel);
        let service = GRPCBlobService::from_client(client);
        Ok(Arc::new(service))
    }
}
pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {
/// The task containing the put request, and the inner writer, if we're still writing.
task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>,

View file

@ -6,6 +6,7 @@ use tonic::async_trait;
use tracing::instrument;
use super::{BlobReader, BlobService, BlobWriter};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::B3Digest;
#[derive(Clone, Default)]
@ -37,6 +38,22 @@ impl BlobService for MemoryBlobService {
}
}
/// Deserializable configuration from which a [`MemoryBlobService`] is built.
///
/// There are no options: the struct has no fields, and because of
/// `deny_unknown_fields` any key supplied in the config is rejected.
#[derive(serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct MemoryBlobServiceConfig {}
#[async_trait]
impl ServiceBuilder for MemoryBlobServiceConfig {
    type Output = dyn BlobService;

    /// Builds a fresh, default-initialized [`MemoryBlobService`].
    ///
    /// Neither the instance name nor the composition context is used, and
    /// this never returns an error.
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        _context: &CompositionContext<dyn BlobService>,
    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let service: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
        Ok(service)
    }
}
pub struct MemoryBlobWriter {
db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,

View file

@ -1,6 +1,8 @@
use std::io;
use tonic::async_trait;
use crate::composition::{Registry, ServiceBuilder};
use crate::proto::stat_blob_response::ChunkMeta;
use crate::B3Digest;
@ -16,11 +18,11 @@ mod object_store;
pub mod tests;
pub use self::chunked_reader::ChunkedReader;
pub use self::combinator::CombinedBlobService;
pub use self::combinator::{CombinedBlobService, CombinedBlobServiceConfig};
pub use self::from_addr::from_addr;
pub use self::grpc::GRPCBlobService;
pub use self::memory::MemoryBlobService;
pub use self::object_store::ObjectStoreBlobService;
pub use self::grpc::{GRPCBlobService, GRPCBlobServiceConfig};
pub use self::memory::{MemoryBlobService, MemoryBlobServiceConfig};
pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfig};
/// The base trait all BlobService services need to implement.
/// It provides functions to check whether a given blob exists,
@ -101,3 +103,11 @@ impl BlobReader for io::Cursor<&'static [u8; 0]> {}
impl BlobReader for io::Cursor<Vec<u8>> {}
impl BlobReader for io::Cursor<bytes::Bytes> {}
impl BlobReader for tokio::fs::File {}
/// Registers the builtin BlobService implementations with the registry.
///
/// Each builtin config type is registered under a string key:
/// `"objectstore"`, `"memory"`, `"combined"` and `"grpc"`.
pub(crate) fn register_blob_services(reg: &mut Registry) {
    // All builtin configs are registered against the same boxed builder type.
    type BlobServiceBuilder = Box<dyn ServiceBuilder<Output = dyn BlobService>>;

    reg.register::<BlobServiceBuilder, super::blobservice::ObjectStoreBlobServiceConfig>(
        "objectstore",
    );
    reg.register::<BlobServiceBuilder, super::blobservice::MemoryBlobServiceConfig>("memory");
    reg.register::<BlobServiceBuilder, super::blobservice::CombinedBlobServiceConfig>(
        "combined",
    );
    reg.register::<BlobServiceBuilder, super::blobservice::GRPCBlobServiceConfig>("grpc");
}

View file

@ -1,4 +1,5 @@
use std::{
collections::HashMap,
io::{self, Cursor},
pin::pin,
sync::Arc,
@ -18,6 +19,7 @@ use tracing::{debug, instrument, trace, Level};
use url::Url;
use crate::{
composition::{CompositionContext, ServiceBuilder},
proto::{stat_blob_response::ChunkMeta, StatBlobResponse},
B3Digest, B3HashingReader,
};
@ -269,6 +271,40 @@ impl BlobService for ObjectStoreBlobService {
}
}
/// Returns the average chunk size used when the configuration does not set
/// `avg_chunk_size`: 256 KiB, expressed in bytes.
fn default_avg_chunk_size() -> u32 {
    const KIB: u32 = 1024;
    256 * KIB
}
/// Deserializable configuration from which an [`ObjectStoreBlobService`] is
/// built.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
///
/// NOTE(review): unlike the other blob service configs, this one does not
/// derive `Debug`. That may be deliberate, since `object_store_options` could
/// carry credentials; confirm before adding it.
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ObjectStoreBlobServiceConfig {
/// URL of the object store, kept as a string here. It is parsed when the
/// service is built and passed to `object_store::parse_url_opts`.
object_store_url: String,
/// Average chunk size passed on to the built service. Falls back to
/// `default_avg_chunk_size()` when absent from the config.
#[serde(default = "default_avg_chunk_size")]
avg_chunk_size: u32,
/// Key/value options passed to `object_store::parse_url_opts` alongside the
/// URL. Defaults to an empty map when absent from the config.
#[serde(default)]
object_store_options: HashMap<String, String>,
}
#[async_trait]
impl ServiceBuilder for ObjectStoreBlobServiceConfig {
    type Output = dyn BlobService;

    /// Builds an [`ObjectStoreBlobService`] from this configuration.
    ///
    /// `object_store_url` is parsed and handed, together with
    /// `object_store_options`, to `object_store::parse_url_opts`, which
    /// yields the store and the base path inside it. A parse failure or an
    /// error from `parse_url_opts` is returned as the boxed error.
    ///
    /// Neither the instance name nor the composition context is used.
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        _context: &CompositionContext<dyn BlobService>,
    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let (store, base_path) = object_store::parse_url_opts(
            &self.object_store_url.parse()?,
            &self.object_store_options,
        )?;

        let service = ObjectStoreBlobService {
            object_store: Arc::new(store),
            base_path,
            avg_chunk_size: self.avg_chunk_size,
        };
        Ok(Arc::new(service))
    }
}
/// Reads blob contents from a AsyncRead, chunks and uploads them.
/// On success, returns a [StatBlobResponse] pointing to the individual chunks.
#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)]