From a2e78b62fff67e6c897438bd181e9f90e5b24eab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marijan=20Petri=C4=8Devi=C4=87?= Date: Sun, 16 Feb 2025 06:30:11 +0100 Subject: [PATCH] refactor(tvix/castore): factor out castore related ServiceUrls utils Change-Id: Ib4cef49a9519ebf88a05035a7261badd312135f0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/13156 Reviewed-by: flokli Tested-by: BuildkiteCI --- tvix/Cargo.lock | 2 + tvix/Cargo.nix | 14 +++- tvix/castore/Cargo.toml | 3 + tvix/castore/src/lib.rs | 1 + tvix/castore/src/utils.rs | 165 ++++++++++++++++++++++++++++++++++++++ tvix/store/src/utils.rs | 40 +++------ 6 files changed, 195 insertions(+), 30 deletions(-) create mode 100644 tvix/castore/src/utils.rs diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 51773c080..0824dde9e 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -5037,6 +5037,7 @@ dependencies = [ "blake3", "bstr", "bytes", + "clap", "data-encoding", "digest", "erased-serde", @@ -5070,6 +5071,7 @@ dependencies = [ "tokio-tar", "tokio-test", "tokio-util", + "toml 0.8.19", "tonic", "tonic-build", "tonic-reflection", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 5e10c5df0..2266ebc28 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -16628,6 +16628,11 @@ rec { name = "bytes"; packageId = "bytes"; } + { + name = "clap"; + packageId = "clap"; + features = [ "derive" "env" ]; + } { name = "data-encoding"; packageId = "data-encoding"; @@ -16738,6 +16743,11 @@ rec { packageId = "tokio-util"; features = [ "io" "io-util" "codec" ]; } + { + name = "toml"; + packageId = "toml 0.8.19"; + optional = true; + } { name = "tonic"; packageId = "tonic"; @@ -16864,10 +16874,12 @@ rec { "default" = [ "cloud" ]; "fs" = [ "dep:fuse-backend-rs" "dep:threadpool" "dep:libc" ]; "fuse" = [ "fs" ]; + "toml" = [ "dep:toml" ]; "tonic-reflection" = [ "dep:tonic-reflection" ]; "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" 
"fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ]; + "xp-composition-cli" = [ "toml" "xp-composition-url-refs" ]; }; - resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" "xp-composition-url-refs" ]; + resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "toml" "tonic-reflection" "virtiofs" "xp-composition-cli" "xp-composition-url-refs" ]; }; "tvix-cli" = rec { crateName = "tvix-cli"; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index 09a4074aa..0a99f306f 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -10,6 +10,7 @@ async-tempfile.workspace = true blake3 = { workspace = true, features = ["rayon", "std", "traits-preview"] } bstr.workspace = true bytes.workspace = true +clap = { workspace = true, features = ["derive", "env"] } data-encoding.workspace = true digest.workspace = true fastcdc = { workspace = true, features = ["tokio"] } @@ -23,6 +24,7 @@ tokio-stream = { workspace = true, features = ["fs", "net"] } tokio-util = { workspace = true, features = ["io", "io-util", "codec"] } tokio-tar.workspace = true tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } +toml = { version = "0.8.19", optional = true } tonic.workspace = true tower.workspace = true tracing.workspace = true @@ -91,6 +93,7 @@ virtiofs = [ ] fuse = ["fs"] tonic-reflection = ["dep:tonic-reflection"] +xp-composition-cli = ["toml", "xp-composition-url-refs"] # This feature enables anonymous url syntax which might inherently expose # arbitrary composition possibilities to the user. 
xp-composition-url-refs = [] diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 78018d8df..8a293b76f 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -7,6 +7,7 @@ pub mod composition; pub mod directoryservice; pub mod fixtures; pub mod refscan; +pub mod utils; #[cfg(feature = "fs")] pub mod fs; diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs new file mode 100644 index 000000000..381f72cbe --- /dev/null +++ b/tvix/castore/src/utils.rs @@ -0,0 +1,165 @@ +use std::collections::HashMap; +use std::sync::Arc; +use url::Url; + +use crate::blobservice::BlobService; +use crate::composition::{ + with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG, +}; +use crate::directoryservice::DirectoryService; + +#[derive(serde::Deserialize, Default)] +pub struct CompositionConfigs { + pub blobservices: + HashMap>>>, + pub directoryservices: HashMap< + String, + DeserializeWithRegistry>>, + >, +} + +/// Provides a set of clap arguments to configure tvix-[ca]store services. +/// +/// This particular variant has defaults tailored for usecases accessing data +/// directly locally, like the `tvix-store daemon` command. +#[derive(clap::Parser, Clone)] +pub struct ServiceUrls { + #[arg( + long, + env, + default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store" + )] + pub blob_service_addr: String, + + #[arg( + long, + env, + default_value = "redb:///var/lib/tvix-store/directories.redb" + )] + pub directory_service_addr: String, + + /// Path to a TOML file describing the way the services should be composed. + /// Experimental because the format is not final. + /// If specified, the other service addrs are ignored. + #[cfg(feature = "xp-composition-cli")] + #[arg(long, env)] + experimental_store_composition: Option, +} + +/// Provides a set of clap arguments to configure tvix-[ca]store services. 
+/// +/// This particular variant has defaults tailored for usecases accessing data +/// from another running tvix daemon, via gRPC. +#[derive(clap::Parser, Clone)] +pub struct ServiceUrlsGrpc { + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[cfg(feature = "xp-composition-cli")] + #[arg(long, env)] + experimental_store_composition: Option, +} + +/// Provides a set of clap arguments to configure tvix-[ca]store services. +/// +/// This particular variant has defaults tailored for usecases keeping all data +/// in memory. +/// It's currently used in tvix-cli, as we don't really care about persistency +/// there yet, and using something else here might make some perf output harder +/// to interpret. +#[derive(clap::Parser, Clone)] +pub struct ServiceUrlsMemory { + #[arg(long, env, default_value = "memory://")] + blob_service_addr: String, + + #[arg(long, env, default_value = "memory://")] + directory_service_addr: String, + + #[cfg(feature = "xp-composition-cli")] + #[arg(long, env)] + experimental_store_composition: Option, +} + +impl From<ServiceUrlsGrpc> for ServiceUrls { + fn from(urls: ServiceUrlsGrpc) -> ServiceUrls { + ServiceUrls { + blob_service_addr: urls.blob_service_addr, + directory_service_addr: urls.directory_service_addr, + #[cfg(feature = "xp-composition-cli")] + experimental_store_composition: urls.experimental_store_composition, + } + } +} + +impl From<ServiceUrlsMemory> for ServiceUrls { + fn from(urls: ServiceUrlsMemory) -> ServiceUrls { + ServiceUrls { + blob_service_addr: urls.blob_service_addr, + directory_service_addr: urls.directory_service_addr, + #[cfg(feature = "xp-composition-cli")] + experimental_store_composition: urls.experimental_store_composition, + } + } +} + +/// Deserializes service addresses into composition config, configuring each +/// service as the single "root". 
+/// If the `xp-composition-cli` feature is enabled and a file is specified via the +/// `--experimental-store-composition` parameter, that file is used instead. +pub async fn addrs_to_configs( + urls: impl Into<ServiceUrls>, +) -> Result> { + let urls: ServiceUrls = urls.into(); + + #[cfg(feature = "xp-composition-cli")] + if let Some(conf_path) = urls.experimental_store_composition { + let conf_text = tokio::fs::read_to_string(conf_path).await?; + return Ok(with_registry(&REG, || toml::from_str(&conf_text))?); + } + + let mut configs: CompositionConfigs = Default::default(); + + let blob_service_url = Url::parse(&urls.blob_service_addr)?; + let directory_service_url = Url::parse(&urls.directory_service_addr)?; + configs.blobservices.insert( + "root".into(), + with_registry(&REG, || blob_service_url.try_into())?, + ); + configs.directoryservices.insert( + "root".into(), + with_registry(&REG, || directory_service_url.try_into())?, + ); + Ok(configs) +} + +/// Construct the castore handles from their addrs. +pub async fn construct_services( + urls: impl Into<ServiceUrls>, +) -> Result< + (Arc, Arc), + Box, +> { + let configs = addrs_to_configs(urls).await?; + construct_services_from_configs(configs).await +} + +/// Construct the castore handles from composition configs. 
+pub async fn construct_services_from_configs( + configs: CompositionConfigs, +) -> Result< + (Arc, Arc), + Box, +> { + let mut comp = Composition::new(&REG); + + comp.extend(configs.blobservices); + comp.extend(configs.directoryservices); + + let blob_service: Arc = comp.build("root").await?; + let directory_service: Arc = comp.build("root").await?; + + Ok((blob_service, directory_service)) +} diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index 19e0dce56..22ec48068 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -6,6 +6,7 @@ use std::{ }; use tokio::io::{self, AsyncWrite}; +use tvix_castore::utils as castore_utils; use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; use url::Url; @@ -36,19 +37,8 @@ pub struct CompositionConfigs { /// directly locally, like the `tvix-store daemon` command. #[derive(clap::Parser, Clone)] pub struct ServiceUrls { - #[arg( - long, - env, - default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store" - )] - blob_service_addr: String, - - #[arg( - long, - env, - default_value = "redb:///var/lib/tvix-store/directories.redb" - )] - directory_service_addr: String, + #[clap(flatten)] + castore_service_addrs: castore_utils::ServiceUrls, #[arg(long, env, default_value = "redb:///var/lib/tvix-store/pathinfo.redb")] path_info_service_addr: String, @@ -67,11 +57,8 @@ pub struct ServiceUrls { /// from another running tvix daemon, via gRPC. #[derive(clap::Parser, Clone)] pub struct ServiceUrlsGrpc { - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, + #[clap(flatten)] + castore_service_addrs: castore_utils::ServiceUrlsGrpc, #[arg(long, env, default_value = "grpc+http://[::1]:8000")] path_info_service_addr: String, @@ -90,11 +77,8 @@ pub struct ServiceUrlsGrpc { /// to interpret. 
#[derive(clap::Parser, Clone)] pub struct ServiceUrlsMemory { - #[arg(long, env, default_value = "memory://")] - blob_service_addr: String, - - #[arg(long, env, default_value = "memory://")] - directory_service_addr: String, + #[clap(flatten)] + castore_service_addrs: castore_utils::ServiceUrlsMemory, #[arg(long, env, default_value = "memory://")] path_info_service_addr: String, @@ -107,8 +91,7 @@ pub struct ServiceUrlsMemory { impl From for ServiceUrls { fn from(urls: ServiceUrlsGrpc) -> ServiceUrls { ServiceUrls { - blob_service_addr: urls.blob_service_addr, - directory_service_addr: urls.directory_service_addr, + castore_service_addrs: urls.castore_service_addrs.into(), path_info_service_addr: urls.path_info_service_addr, #[cfg(feature = "xp-composition-cli")] experimental_store_composition: urls.experimental_store_composition, @@ -119,8 +102,7 @@ impl From for ServiceUrls { impl From for ServiceUrls { fn from(urls: ServiceUrlsMemory) -> ServiceUrls { ServiceUrls { - blob_service_addr: urls.blob_service_addr, - directory_service_addr: urls.directory_service_addr, + castore_service_addrs: urls.castore_service_addrs.into(), path_info_service_addr: urls.path_info_service_addr, #[cfg(feature = "xp-composition-cli")] experimental_store_composition: urls.experimental_store_composition, @@ -145,8 +127,8 @@ pub async fn addrs_to_configs( let mut configs: CompositionConfigs = Default::default(); - let blob_service_url = Url::parse(&urls.blob_service_addr)?; - let directory_service_url = Url::parse(&urls.directory_service_addr)?; + let blob_service_url = Url::parse(&urls.castore_service_addrs.blob_service_addr)?; + let directory_service_url = Url::parse(&urls.castore_service_addrs.directory_service_addr)?; let path_info_service_url = Url::parse(&urls.path_info_service_addr)?; configs.blobservices.insert(