refactor(tvix/store/pathinfosvc): add from_addr

Change-Id: I24e822351a837fce2aed568a647d009099ef32ec
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8747
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-06-12 00:04:00 +03:00 committed by clbot
parent bb7c76739a
commit 35bff2bda6
6 changed files with 552 additions and 18 deletions

View file

@ -8,13 +8,10 @@ use std::sync::Arc;
use tracing_subscriber::prelude::*;
use tvix_store::blobservice;
use tvix_store::directoryservice;
use tvix_store::pathinfoservice::GRPCPathInfoService;
use tvix_store::pathinfoservice::PathInfoService;
use tvix_store::pathinfoservice::SledPathInfoService;
use tvix_store::pathinfoservice;
use tvix_store::proto::blob_service_server::BlobServiceServer;
use tvix_store::proto::directory_service_server::DirectoryServiceServer;
use tvix_store::proto::node::Node;
use tvix_store::proto::path_info_service_client::PathInfoServiceClient;
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
use tvix_store::proto::GRPCBlobServiceWrapper;
use tvix_store::proto::GRPCDirectoryServiceWrapper;
@ -59,6 +56,9 @@ enum Commands {
default_value = "sled:///var/lib/tvix-store/directories.sled"
)]
directory_service_addr: String,
#[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
path_info_service_addr: String,
},
/// Imports a list of paths into the store (not using the daemon)
Import {
@ -70,6 +70,9 @@ enum Commands {
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
},
/// Mounts a tvix-store at the given mountpoint
#[cfg(feature = "fuse")]
@ -82,6 +85,9 @@ enum Commands {
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
directory_service_addr: String,
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
},
}
@ -119,15 +125,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
listen_address,
blob_service_addr,
directory_service_addr,
path_info_service_addr,
} => {
// initialize stores
let blob_service = blobservice::from_addr(&blob_service_addr).await?;
let directory_service = directoryservice::from_addr(&directory_service_addr)?;
let path_info_service: Arc<dyn PathInfoService> = Arc::new(SledPathInfoService::new(
"pathinfo.sled".into(),
let path_info_service = pathinfoservice::from_addr(
&path_info_service_addr,
blob_service.clone(),
directory_service.clone(),
)?);
)?;
let listen_address = listen_address
.unwrap_or_else(|| "[::]:8000".to_string())
@ -164,18 +171,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
paths,
blob_service_addr,
directory_service_addr,
path_info_service_addr,
} => {
let blob_service = blobservice::from_addr(&blob_service_addr).await?;
let directory_service = directoryservice::from_addr(&directory_service_addr)?;
let path_info_service_client =
PathInfoServiceClient::connect("http://[::1]:8000").await?;
let path_info_service =
GRPCPathInfoService::from_client(path_info_service_client.clone());
let path_info_service = pathinfoservice::from_addr(
&path_info_service_addr,
blob_service.clone(),
directory_service.clone(),
)?;
let io = Arc::new(TvixStoreIO::new(
blob_service,
directory_service,
Arc::new(path_info_service),
path_info_service,
));
let tasks = paths
@ -200,16 +209,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
dest,
blob_service_addr,
directory_service_addr,
path_info_service_addr,
} => {
let blob_service = blobservice::from_addr(&blob_service_addr).await?;
let directory_service = directoryservice::from_addr(&directory_service_addr)?;
let path_info_service_client =
PathInfoServiceClient::connect("http://[::1]:8000").await?;
let path_info_service =
GRPCPathInfoService::from_client(path_info_service_client.clone());
let path_info_service = pathinfoservice::from_addr(
&path_info_service_addr,
blob_service.clone(),
directory_service.clone(),
)?;
tokio::task::spawn_blocking(move || {
let f = FUSE::new(blob_service, directory_service, Arc::new(path_info_service));
let f = FUSE::new(blob_service, directory_service, path_info_service);
fuser::mount2(f, &dest, &[])
})
.await??