feat(tvix/store/blobsvc): add from_addr

This allows constructing blob stores with a URL syntax at runtime,
by passing the --blob-service-addr arg.

We probably still want to have some builder pattern here, to allow
additional schemes to be registered.

Change-Id: Ie588ff7a7c6fb64c9474dfbd2e4bc5f168dfd778
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8742
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-06-09 19:07:00 +03:00 committed by flokli
parent e409d9cc7e
commit a1acb5bcb3
9 changed files with 657 additions and 24 deletions

View file

@ -6,16 +6,13 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing_subscriber::prelude::*;
use tvix_store::blobservice::BlobService;
use tvix_store::blobservice::GRPCBlobService;
use tvix_store::blobservice::SledBlobService;
use tvix_store::blobservice;
use tvix_store::directoryservice::DirectoryService;
use tvix_store::directoryservice::GRPCDirectoryService;
use tvix_store::directoryservice::SledDirectoryService;
use tvix_store::pathinfoservice::GRPCPathInfoService;
use tvix_store::pathinfoservice::PathInfoService;
use tvix_store::pathinfoservice::SledPathInfoService;
use tvix_store::proto::blob_service_client::BlobServiceClient;
use tvix_store::proto::blob_service_server::BlobServiceServer;
use tvix_store::proto::directory_service_client::DirectoryServiceClient;
use tvix_store::proto::directory_service_server::DirectoryServiceServer;
@ -55,17 +52,26 @@ enum Commands {
Daemon {
#[arg(long, short = 'l')]
listen_address: Option<String>,
#[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")]
blob_service_addr: String,
},
/// Imports a list of paths into the store (not using the daemon)
Import {
#[clap(value_name = "PATH")]
paths: Vec<PathBuf>,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,
},
/// Mounts a tvix-store at the given mountpoint
#[cfg(feature = "fuse")]
Mount {
#[clap(value_name = "PATH")]
dest: PathBuf,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
blob_service_addr: String,
},
}
@ -99,10 +105,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber");
match cli.command {
Commands::Daemon { listen_address } => {
Commands::Daemon {
listen_address,
blob_service_addr,
} => {
// initialize stores
let blob_service: Arc<dyn BlobService> =
Arc::new(SledBlobService::new("blobs.sled".into())?);
let blob_service = blobservice::from_addr(&blob_service_addr).await?;
let directory_service: Arc<dyn DirectoryService> =
Arc::new(SledDirectoryService::new("directories.sled".into())?);
let path_info_service: Arc<dyn PathInfoService> = Arc::new(SledPathInfoService::new(
@ -142,10 +150,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
router.serve(listen_address).await?;
}
Commands::Import { paths } => {
let blob_service = GRPCBlobService::from_client(
BlobServiceClient::connect("http://[::1]:8000").await?,
);
Commands::Import {
paths,
blob_service_addr,
} => {
let blob_service = blobservice::from_addr(&blob_service_addr).await?;
let directory_service = GRPCDirectoryService::from_client(
DirectoryServiceClient::connect("http://[::1]:8000").await?,
);
@ -155,7 +165,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
GRPCPathInfoService::from_client(path_info_service_client.clone());
let io = Arc::new(TvixStoreIO::new(
Arc::new(blob_service),
blob_service,
Arc::new(directory_service),
Arc::new(path_info_service),
));
@ -178,10 +188,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
try_join_all(tasks).await?;
}
#[cfg(feature = "fuse")]
Commands::Mount { dest } => {
let blob_service = GRPCBlobService::from_client(
BlobServiceClient::connect("http://[::1]:8000").await?,
);
Commands::Mount {
dest,
blob_service_addr,
} => {
let blob_service = blobservice::from_addr(&blob_service_addr).await?;
let directory_service = GRPCDirectoryService::from_client(
DirectoryServiceClient::connect("http://[::1]:8000").await?,
);
@ -192,7 +204,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::task::spawn_blocking(move || {
let f = FUSE::new(
Arc::new(blob_service),
blob_service,
Arc::new(directory_service),
Arc::new(path_info_service),
);