refactor(tvix/castore): factor out castore related ServiceUrls utils

Change-Id: Ib4cef49a9519ebf88a05035a7261badd312135f0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/13156
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Marijan Petričević 2025-02-16 06:30:11 +01:00
parent 0bbeed166d
commit a2e78b62ff
6 changed files with 195 additions and 30 deletions

2
tvix/Cargo.lock generated
View file

@ -5037,6 +5037,7 @@ dependencies = [
"blake3", "blake3",
"bstr", "bstr",
"bytes", "bytes",
"clap",
"data-encoding", "data-encoding",
"digest", "digest",
"erased-serde", "erased-serde",
@ -5070,6 +5071,7 @@ dependencies = [
"tokio-tar", "tokio-tar",
"tokio-test", "tokio-test",
"tokio-util", "tokio-util",
"toml 0.8.19",
"tonic", "tonic",
"tonic-build", "tonic-build",
"tonic-reflection", "tonic-reflection",

View file

@ -16628,6 +16628,11 @@ rec {
name = "bytes"; name = "bytes";
packageId = "bytes"; packageId = "bytes";
} }
{
name = "clap";
packageId = "clap";
features = [ "derive" "env" ];
}
{ {
name = "data-encoding"; name = "data-encoding";
packageId = "data-encoding"; packageId = "data-encoding";
@ -16738,6 +16743,11 @@ rec {
packageId = "tokio-util"; packageId = "tokio-util";
features = [ "io" "io-util" "codec" ]; features = [ "io" "io-util" "codec" ];
} }
{
name = "toml";
packageId = "toml 0.8.19";
optional = true;
}
{ {
name = "tonic"; name = "tonic";
packageId = "tonic"; packageId = "tonic";
@ -16864,10 +16874,12 @@ rec {
"default" = [ "cloud" ]; "default" = [ "cloud" ];
"fs" = [ "dep:fuse-backend-rs" "dep:threadpool" "dep:libc" ]; "fs" = [ "dep:fuse-backend-rs" "dep:threadpool" "dep:libc" ];
"fuse" = [ "fs" ]; "fuse" = [ "fs" ];
"toml" = [ "dep:toml" ];
"tonic-reflection" = [ "dep:tonic-reflection" ]; "tonic-reflection" = [ "dep:tonic-reflection" ];
"virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ]; "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ];
"xp-composition-cli" = [ "toml" "xp-composition-url-refs" ];
}; };
resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" "xp-composition-url-refs" ]; resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "toml" "tonic-reflection" "virtiofs" "xp-composition-cli" "xp-composition-url-refs" ];
}; };
"tvix-cli" = rec { "tvix-cli" = rec {
crateName = "tvix-cli"; crateName = "tvix-cli";

View file

@ -10,6 +10,7 @@ async-tempfile.workspace = true
blake3 = { workspace = true, features = ["rayon", "std", "traits-preview"] } blake3 = { workspace = true, features = ["rayon", "std", "traits-preview"] }
bstr.workspace = true bstr.workspace = true
bytes.workspace = true bytes.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
data-encoding.workspace = true data-encoding.workspace = true
digest.workspace = true digest.workspace = true
fastcdc = { workspace = true, features = ["tokio"] } fastcdc = { workspace = true, features = ["tokio"] }
@ -23,6 +24,7 @@ tokio-stream = { workspace = true, features = ["fs", "net"] }
tokio-util = { workspace = true, features = ["io", "io-util", "codec"] } tokio-util = { workspace = true, features = ["io", "io-util", "codec"] }
tokio-tar.workspace = true tokio-tar.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
toml = { version = "0.8.19", optional = true }
tonic.workspace = true tonic.workspace = true
tower.workspace = true tower.workspace = true
tracing.workspace = true tracing.workspace = true
@ -91,6 +93,7 @@ virtiofs = [
] ]
fuse = ["fs"] fuse = ["fs"]
tonic-reflection = ["dep:tonic-reflection"] tonic-reflection = ["dep:tonic-reflection"]
xp-composition-cli = ["toml", "xp-composition-url-refs"]
# This feature enables anonymous url syntax which might inherently expose # This feature enables anonymous url syntax which might inherently expose
# arbitrary composition possibilities to the user. # arbitrary composition possibilities to the user.
xp-composition-url-refs = [] xp-composition-url-refs = []

View file

@ -7,6 +7,7 @@ pub mod composition;
pub mod directoryservice; pub mod directoryservice;
pub mod fixtures; pub mod fixtures;
pub mod refscan; pub mod refscan;
pub mod utils;
#[cfg(feature = "fs")] #[cfg(feature = "fs")]
pub mod fs; pub mod fs;

165
tvix/castore/src/utils.rs Normal file
View file

@ -0,0 +1,165 @@
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;
use crate::blobservice::BlobService;
use crate::composition::{
with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG,
};
use crate::directoryservice::DirectoryService;
/// Deserializable collection of service builder configurations, with one map
/// per service kind (blob services and directory services).
///
/// Each map is keyed by a `String` name. [`addrs_to_configs`] stores its
/// single entry per kind under the name `"root"`, and
/// [`construct_services_from_configs`] builds the `"root"` entry of each kind.
#[derive(serde::Deserialize, Default)]
pub struct CompositionConfigs {
/// Blob service builders, keyed by name.
pub blobservices:
HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>,
/// Directory service builders, keyed by name.
pub directoryservices: HashMap<
String,
DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>,
>,
}
/// Provides a set clap arguments to configure tvix-[ca]store services.
///
/// This particular variant has defaults tailored for usecases accessing data
/// directly locally, like the `tvix-store daemon` command.
// NOTE(review): clap's derive macros can pick up `///` doc comments as help
// text, so the existing `///` lines are left untouched and additional notes in
// this struct are plain `//` comments.
//
// This is the canonical variant: `ServiceUrlsGrpc` and `ServiceUrlsMemory`
// convert into it via `From`, and `addrs_to_configs` / `construct_services`
// accept anything that is `Into<ServiceUrls>`.
#[derive(clap::Parser, Clone)]
pub struct ServiceUrls {
// Address of the blob service. Settable through a long command-line flag or
// an environment variable (`long` / `env`); defaults to an object store on
// the local filesystem below /var/lib/tvix-store.
#[arg(
long,
env,
default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
)]
pub blob_service_addr: String,
// Address of the directory service. Settable through a long command-line
// flag or an environment variable; defaults to a redb database below
// /var/lib/tvix-store.
#[arg(
long,
env,
default_value = "redb:///var/lib/tvix-store/directories.redb"
)]
pub directory_service_addr: String,
/// Path to a TOML file describing the way the services should be composed
/// Experimental because the format is not final.
/// If specified, the other service addrs are ignored.
// Only compiled in when the `xp-composition-cli` feature is enabled. Unlike
// the two address fields above, this field is not `pub`.
#[cfg(feature = "xp-composition-cli")]
#[arg(long, env)]
experimental_store_composition: Option<String>,
}
/// Provides a set clap arguments to configure tvix-[ca]store services.
///
/// This particular variant has defaults tailored for usecases accessing data
/// from another running tvix daemon, via gRPC.
// NOTE(review): clap's derive macros can pick up `///` doc comments as help
// text, so the existing `///` lines are left untouched and additional notes in
// this struct are plain `//` comments.
//
// All fields are private; the values are consumed by converting into
// `ServiceUrls` through the `From<ServiceUrlsGrpc>` impl in this file.
#[derive(clap::Parser, Clone)]
pub struct ServiceUrlsGrpc {
// Address of the blob service; defaults to a gRPC endpoint on [::1]:8000.
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,
// Address of the directory service; same default endpoint as the blob
// service.
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
// Optional composition file path, only compiled in when the
// `xp-composition-cli` feature is enabled. It is forwarded unchanged when
// converting into `ServiceUrls`.
#[cfg(feature = "xp-composition-cli")]
#[arg(long, env)]
experimental_store_composition: Option<String>,
}
/// Provides a set clap arguments to configure tvix-[ca]store services.
///
/// This particular variant has defaults tailored for usecases keeping all data
/// in memory.
/// It's currently used in tvix-cli, as we don't really care about persistency
/// there yet, and using something else here might make some perf output harder
/// to interpret.
// NOTE(review): clap's derive macros can pick up `///` doc comments as help
// text, so the existing `///` lines are left untouched and additional notes in
// this struct are plain `//` comments.
//
// All fields are private; the values are consumed by converting into
// `ServiceUrls` through the `From<ServiceUrlsMemory>` impl in this file.
#[derive(clap::Parser, Clone)]
pub struct ServiceUrlsMemory {
// Address of the blob service; defaults to `memory://`.
#[arg(long, env, default_value = "memory://")]
blob_service_addr: String,
// Address of the directory service; defaults to `memory://`.
#[arg(long, env, default_value = "memory://")]
directory_service_addr: String,
// Optional composition file path, only compiled in when the
// `xp-composition-cli` feature is enabled. It is forwarded unchanged when
// converting into `ServiceUrls`.
#[cfg(feature = "xp-composition-cli")]
#[arg(long, env)]
experimental_store_composition: Option<String>,
}
impl From<ServiceUrlsGrpc> for ServiceUrls {
fn from(urls: ServiceUrlsGrpc) -> ServiceUrls {
ServiceUrls {
blob_service_addr: urls.blob_service_addr,
directory_service_addr: urls.directory_service_addr,
#[cfg(feature = "xp-composition-cli")]
experimental_store_composition: urls.experimental_store_composition,
}
}
}
impl From<ServiceUrlsMemory> for ServiceUrls {
fn from(urls: ServiceUrlsMemory) -> ServiceUrls {
ServiceUrls {
blob_service_addr: urls.blob_service_addr,
directory_service_addr: urls.directory_service_addr,
#[cfg(feature = "xp-composition-cli")]
experimental_store_composition: urls.experimental_store_composition,
}
}
}
/// Deserializes service addresses into composition config, configuring each
/// service as the single "root".
/// If the `xp-composition-cli` feature is enabled, and a file specified in the
/// `--experimental-store-composition` parameter, this is used instead.
pub async fn addrs_to_configs(
urls: impl Into<ServiceUrls>,
) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> {
let urls: ServiceUrls = urls.into();
#[cfg(feature = "xp-composition-cli")]
if let Some(conf_path) = urls.experimental_store_composition {
let conf_text = tokio::fs::read_to_string(conf_path).await?;
return Ok(with_registry(&REG, || toml::from_str(&conf_text))?);
}
let mut configs: CompositionConfigs = Default::default();
let blob_service_url = Url::parse(&urls.blob_service_addr)?;
let directory_service_url = Url::parse(&urls.directory_service_addr)?;
configs.blobservices.insert(
"root".into(),
with_registry(&REG, || blob_service_url.try_into())?,
);
configs.directoryservices.insert(
"root".into(),
with_registry(&REG, || directory_service_url.try_into())?,
);
Ok(configs)
}
/// Builds the blob service and directory service handles described by the
/// given service addresses.
///
/// This resolves the addresses into configs via [`addrs_to_configs`] and then
/// hands them to [`construct_services_from_configs`]; errors from either step
/// are passed through.
pub async fn construct_services(
    urls: impl Into<ServiceUrls>,
) -> Result<
    (Arc<dyn BlobService>, Arc<dyn DirectoryService>),
    Box<dyn std::error::Error + Send + Sync>,
> {
    construct_services_from_configs(addrs_to_configs(urls).await?).await
}
/// Construct the castore handles from their addrs.
pub async fn construct_services_from_configs(
configs: CompositionConfigs,
) -> Result<
(Arc<dyn BlobService>, Arc<dyn DirectoryService>),
Box<dyn std::error::Error + Send + Sync>,
> {
let mut comp = Composition::new(&REG);
comp.extend(configs.blobservices);
comp.extend(configs.directoryservices);
let blob_service: Arc<dyn BlobService> = comp.build("root").await?;
let directory_service: Arc<dyn DirectoryService> = comp.build("root").await?;
Ok((blob_service, directory_service))
}

View file

@ -6,6 +6,7 @@ use std::{
}; };
use tokio::io::{self, AsyncWrite}; use tokio::io::{self, AsyncWrite};
use tvix_castore::utils as castore_utils;
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
use url::Url; use url::Url;
@ -36,19 +37,8 @@ pub struct CompositionConfigs {
/// directly locally, like the `tvix-store daemon` command. /// directly locally, like the `tvix-store daemon` command.
#[derive(clap::Parser, Clone)] #[derive(clap::Parser, Clone)]
pub struct ServiceUrls { pub struct ServiceUrls {
#[arg( #[clap(flatten)]
long, castore_service_addrs: castore_utils::ServiceUrls,
env,
default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
)]
blob_service_addr: String,
#[arg(
long,
env,
default_value = "redb:///var/lib/tvix-store/directories.redb"
)]
directory_service_addr: String,
#[arg(long, env, default_value = "redb:///var/lib/tvix-store/pathinfo.redb")] #[arg(long, env, default_value = "redb:///var/lib/tvix-store/pathinfo.redb")]
path_info_service_addr: String, path_info_service_addr: String,
@ -67,11 +57,8 @@ pub struct ServiceUrls {
/// from another running tvix daemon, via gRPC. /// from another running tvix daemon, via gRPC.
#[derive(clap::Parser, Clone)] #[derive(clap::Parser, Clone)]
pub struct ServiceUrlsGrpc { pub struct ServiceUrlsGrpc {
#[arg(long, env, default_value = "grpc+http://[::1]:8000")] #[clap(flatten)]
blob_service_addr: String, castore_service_addrs: castore_utils::ServiceUrlsGrpc,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")] #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String, path_info_service_addr: String,
@ -90,11 +77,8 @@ pub struct ServiceUrlsGrpc {
/// to interpret. /// to interpret.
#[derive(clap::Parser, Clone)] #[derive(clap::Parser, Clone)]
pub struct ServiceUrlsMemory { pub struct ServiceUrlsMemory {
#[arg(long, env, default_value = "memory://")] #[clap(flatten)]
blob_service_addr: String, castore_service_addrs: castore_utils::ServiceUrlsMemory,
#[arg(long, env, default_value = "memory://")]
directory_service_addr: String,
#[arg(long, env, default_value = "memory://")] #[arg(long, env, default_value = "memory://")]
path_info_service_addr: String, path_info_service_addr: String,
@ -107,8 +91,7 @@ pub struct ServiceUrlsMemory {
impl From<ServiceUrlsGrpc> for ServiceUrls { impl From<ServiceUrlsGrpc> for ServiceUrls {
fn from(urls: ServiceUrlsGrpc) -> ServiceUrls { fn from(urls: ServiceUrlsGrpc) -> ServiceUrls {
ServiceUrls { ServiceUrls {
blob_service_addr: urls.blob_service_addr, castore_service_addrs: urls.castore_service_addrs.into(),
directory_service_addr: urls.directory_service_addr,
path_info_service_addr: urls.path_info_service_addr, path_info_service_addr: urls.path_info_service_addr,
#[cfg(feature = "xp-composition-cli")] #[cfg(feature = "xp-composition-cli")]
experimental_store_composition: urls.experimental_store_composition, experimental_store_composition: urls.experimental_store_composition,
@ -119,8 +102,7 @@ impl From<ServiceUrlsGrpc> for ServiceUrls {
impl From<ServiceUrlsMemory> for ServiceUrls { impl From<ServiceUrlsMemory> for ServiceUrls {
fn from(urls: ServiceUrlsMemory) -> ServiceUrls { fn from(urls: ServiceUrlsMemory) -> ServiceUrls {
ServiceUrls { ServiceUrls {
blob_service_addr: urls.blob_service_addr, castore_service_addrs: urls.castore_service_addrs.into(),
directory_service_addr: urls.directory_service_addr,
path_info_service_addr: urls.path_info_service_addr, path_info_service_addr: urls.path_info_service_addr,
#[cfg(feature = "xp-composition-cli")] #[cfg(feature = "xp-composition-cli")]
experimental_store_composition: urls.experimental_store_composition, experimental_store_composition: urls.experimental_store_composition,
@ -145,8 +127,8 @@ pub async fn addrs_to_configs(
let mut configs: CompositionConfigs = Default::default(); let mut configs: CompositionConfigs = Default::default();
let blob_service_url = Url::parse(&urls.blob_service_addr)?; let blob_service_url = Url::parse(&urls.castore_service_addrs.blob_service_addr)?;
let directory_service_url = Url::parse(&urls.directory_service_addr)?; let directory_service_url = Url::parse(&urls.castore_service_addrs.directory_service_addr)?;
let path_info_service_url = Url::parse(&urls.path_info_service_addr)?; let path_info_service_url = Url::parse(&urls.path_info_service_addr)?;
configs.blobservices.insert( configs.blobservices.insert(