refactor(tvix): move service addrs into shared clap struct

Change-Id: I7cab29ecfa1823c2103b4c47b7d784bc31459d55
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12008
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: yuka <yuka@yuka.dev>
This commit is contained in:
Yureka 2024-07-21 16:41:09 +02:00 committed by yuka
parent 6774d9c59c
commit 67335c41b7
12 changed files with 132 additions and 149 deletions

View file

@ -18,6 +18,7 @@ use tvix_castore::import::fs::ingest_path;
use tvix_store::nar::NarCalculationService;
use tvix_store::proto::NarInfo;
use tvix_store::proto::PathInfo;
use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc};
use tvix_castore::proto::blob_service_server::BlobServiceServer;
use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
@ -67,22 +68,8 @@ enum Commands {
#[clap(flatten)]
listen_args: tokio_listener::ListenerAddressLFlag,
#[arg(
long,
env,
default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
)]
blob_service_addr: String,
#[arg(
long,
env,
default_value = "sled:///var/lib/tvix-store/directories.sled"
)]
directory_service_addr: String,
#[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
path_info_service_addr: String,
#[clap(flatten)]
service_addrs: ServiceUrls,
/// URL to a PathInfoService that's considered "remote".
/// If set, the other one is considered "local", and a "cache" for the
@ -95,26 +82,14 @@ enum Commands {
#[clap(value_name = "PATH")]
paths: Vec<PathBuf>,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
#[clap(flatten)]
service_addrs: ServiceUrlsGrpc,
},
/// Copies a list of store paths on the system into tvix-store.
Copy {
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
#[clap(flatten)]
service_addrs: ServiceUrlsGrpc,
/// A path pointing to a JSON file produced by the Nix
/// `__structuredAttrs` containing reference graph information provided
@ -134,14 +109,8 @@ enum Commands {
#[clap(value_name = "PATH")]
dest: PathBuf,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
#[clap(flatten)]
service_addrs: ServiceUrlsGrpc,
/// Number of FUSE threads to spawn.
#[arg(long, env, default_value_t = default_threads())]
@ -170,14 +139,8 @@ enum Commands {
#[clap(value_name = "PATH")]
socket: PathBuf,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
#[clap(flatten)]
service_addrs: ServiceUrlsGrpc,
/// Whether to list elements at the root of the mount point.
/// This is useful if your PathInfoService doesn't provide an
@ -203,17 +166,11 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
match cli.command {
Commands::Daemon {
listen_args,
blob_service_addr,
directory_service_addr,
path_info_service_addr,
service_addrs,
remote_path_info_service_addr,
} => {
// initialize stores
let mut configs = tvix_store::utils::addrs_to_configs(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)?;
let mut configs = tvix_store::utils::addrs_to_configs(service_addrs)?;
// if remote_path_info_service_addr has been specified,
// update path_info_service to point to a cache combining the two.
@ -295,18 +252,11 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
}
Commands::Import {
paths,
blob_service_addr,
directory_service_addr,
path_info_service_addr,
service_addrs,
} => {
// FUTUREWORK: allow flat for single files?
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await?;
tvix_store::utils::construct_services(service_addrs).await?;
// Arc NarCalculationService, as we clone it .
let nar_calculation_service: Arc<dyn NarCalculationService> =
@ -345,18 +295,11 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
try_join_all(tasks).await?;
}
Commands::Copy {
blob_service_addr,
directory_service_addr,
path_info_service_addr,
service_addrs,
reference_graph_path,
} => {
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await?;
tvix_store::utils::construct_services(service_addrs).await?;
// Parse the file at reference_graph_path.
let reference_graph_json = tokio::fs::read(&reference_graph_path).await?;
@ -457,21 +400,14 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
#[cfg(feature = "fuse")]
Commands::Mount {
dest,
blob_service_addr,
directory_service_addr,
path_info_service_addr,
service_addrs,
list_root,
threads,
allow_other,
show_xattr,
} => {
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await?;
tvix_store::utils::construct_services(service_addrs).await?;
let fuse_daemon = tokio::task::spawn_blocking(move || {
let fs = make_fs(
@ -507,19 +443,12 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
#[cfg(feature = "virtiofs")]
Commands::VirtioFs {
socket,
blob_service_addr,
directory_service_addr,
path_info_service_addr,
service_addrs,
list_root,
show_xattr,
} => {
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await?;
tvix_store::utils::construct_services(service_addrs).await?;
tokio::task::spawn_blocking(move || {
let fs = make_fs(

View file

@ -29,16 +29,81 @@ pub struct CompositionConfigs {
>,
}
#[derive(clap::Parser, Clone)]
pub struct ServiceUrls {
#[arg(
long,
env,
default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
)]
blob_service_addr: String,
#[arg(
long,
env,
default_value = "sled:///var/lib/tvix-store/directories.sled"
)]
directory_service_addr: String,
#[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
path_info_service_addr: String,
}
/// like ServiceUrls, but with different clap defaults
#[derive(clap::Parser, Clone)]
pub struct ServiceUrlsGrpc {
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
}
/// like ServiceUrls, but with different clap defaults
#[derive(clap::Parser, Clone)]
pub struct ServiceUrlsMemory {
#[arg(long, env, default_value = "memory://")]
blob_service_addr: String,
#[arg(long, env, default_value = "memory://")]
directory_service_addr: String,
#[arg(long, env, default_value = "memory://")]
path_info_service_addr: String,
}
impl From<ServiceUrlsGrpc> for ServiceUrls {
fn from(urls: ServiceUrlsGrpc) -> ServiceUrls {
ServiceUrls {
blob_service_addr: urls.blob_service_addr,
directory_service_addr: urls.directory_service_addr,
path_info_service_addr: urls.path_info_service_addr,
}
}
}
impl From<ServiceUrlsMemory> for ServiceUrls {
fn from(urls: ServiceUrlsMemory) -> ServiceUrls {
ServiceUrls {
blob_service_addr: urls.blob_service_addr,
directory_service_addr: urls.directory_service_addr,
path_info_service_addr: urls.path_info_service_addr,
}
}
}
pub fn addrs_to_configs(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,
path_info_service_addr: impl AsRef<str>,
urls: impl Into<ServiceUrls>,
) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> {
let urls: ServiceUrls = urls.into();
let mut configs: CompositionConfigs = Default::default();
let blob_service_url = Url::parse(blob_service_addr.as_ref())?;
let directory_service_url = Url::parse(directory_service_addr.as_ref())?;
let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?;
let blob_service_url = Url::parse(&urls.blob_service_addr)?;
let directory_service_url = Url::parse(&urls.directory_service_addr)?;
let path_info_service_url = Url::parse(&urls.path_info_service_addr)?;
configs.blobservices.insert(
"default".into(),
@ -58,9 +123,7 @@ pub fn addrs_to_configs(
/// Construct the store handles from their addrs.
pub async fn construct_services(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,
path_info_service_addr: impl AsRef<str>,
urls: impl Into<ServiceUrls>,
) -> Result<
(
Arc<dyn BlobService>,
@ -70,11 +133,7 @@ pub async fn construct_services(
),
Box<dyn std::error::Error + Send + Sync>,
> {
let configs = addrs_to_configs(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)?;
let configs = addrs_to_configs(urls)?;
construct_services_from_configs(configs).await
}