feat(tvix/store): move to daemon subcommand, add import subcommand
This exposes the previous default behavior at the `tvix-store daemon` subcommand. It also adds a `tvix-store import` command, which will ingest a given path into the store. Change-Id: Ide14f1d409b9364e7f98090690c744326486e470 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8166 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
		
							parent
							
								
									6f6ddccc92
								
							
						
					
					
						commit
						a4d06b68d8
					
				
					 1 changed files with 101 additions and 40 deletions
				
			
		|  | @ -1,7 +1,11 @@ | ||||||
|  | use clap::Subcommand; | ||||||
|  | use data_encoding::BASE64; | ||||||
|  | use std::path::PathBuf; | ||||||
| use tracing_subscriber::prelude::*; | use tracing_subscriber::prelude::*; | ||||||
| use tvix_store::blobservice::SledBlobService; | use tvix_store::blobservice::SledBlobService; | ||||||
| use tvix_store::chunkservice::SledChunkService; | use tvix_store::chunkservice::SledChunkService; | ||||||
| use tvix_store::directoryservice::SledDirectoryService; | use tvix_store::directoryservice::SledDirectoryService; | ||||||
|  | use tvix_store::import::import_path; | ||||||
| use tvix_store::nar::NonCachingNARCalculationService; | use tvix_store::nar::NonCachingNARCalculationService; | ||||||
| use tvix_store::pathinfoservice::SledPathInfoService; | use tvix_store::pathinfoservice::SledPathInfoService; | ||||||
| use tvix_store::proto::blob_service_server::BlobServiceServer; | use tvix_store::proto::blob_service_server::BlobServiceServer; | ||||||
|  | @ -21,25 +25,36 @@ use tracing::{info, Level}; | ||||||
| #[derive(Parser)] | #[derive(Parser)] | ||||||
| #[command(author, version, about, long_about = None)] | #[command(author, version, about, long_about = None)] | ||||||
| struct Cli { | struct Cli { | ||||||
|     #[clap(long, short = 'l')] |  | ||||||
|     listen_address: Option<String>, |  | ||||||
|     /// Whether to log in JSON
 |     /// Whether to log in JSON
 | ||||||
|     #[clap(long)] |     #[arg(long)] | ||||||
|     json: bool, |     json: bool, | ||||||
| 
 | 
 | ||||||
|     #[clap(long)] |     #[arg(long)] | ||||||
|     log_level: Option<Level>, |     log_level: Option<Level>, | ||||||
|  | 
 | ||||||
|  |     #[command(subcommand)] | ||||||
|  |     command: Commands, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[derive(Subcommand)] | ||||||
|  | enum Commands { | ||||||
|  |     /// Runs the tvix-store daemon.
 | ||||||
|  |     Daemon { | ||||||
|  |         #[arg(long, short = 'l')] | ||||||
|  |         listen_address: Option<String>, | ||||||
|  |     }, | ||||||
|  |     /// Imports a list of paths into the store (not using the daemon)
 | ||||||
|  |     Import { | ||||||
|  |         #[clap(value_name = "PATH")] | ||||||
|  |         paths: Vec<PathBuf>, | ||||||
|  |     }, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| async fn main() -> Result<(), Box<dyn std::error::Error>> { | async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||||||
|     let cli = Cli::parse(); |     let cli = Cli::parse(); | ||||||
|     let listen_address = cli |  | ||||||
|         .listen_address |  | ||||||
|         .unwrap_or_else(|| "[::]:8000".to_string()) |  | ||||||
|         .parse() |  | ||||||
|         .unwrap(); |  | ||||||
| 
 | 
 | ||||||
|  |     // configure log settings
 | ||||||
|     let level = cli.log_level.unwrap_or(Level::INFO); |     let level = cli.log_level.unwrap_or(Level::INFO); | ||||||
| 
 | 
 | ||||||
|     let subscriber = tracing_subscriber::registry() |     let subscriber = tracing_subscriber::registry() | ||||||
|  | @ -64,43 +79,89 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||||||
| 
 | 
 | ||||||
|     tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber"); |     tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber"); | ||||||
| 
 | 
 | ||||||
|     let mut server = Server::builder(); |     // initialize stores
 | ||||||
| 
 |     let mut blob_service = SledBlobService::new("blobs.sled".into())?; | ||||||
|     let blob_service = SledBlobService::new("blobs.sled".into())?; |     let mut chunk_service = SledChunkService::new("chunks.sled".into())?; | ||||||
|     let chunk_service = SledChunkService::new("chunks.sled".into())?; |     let mut directory_service = SledDirectoryService::new("directories.sled".into())?; | ||||||
|     let directory_service = SledDirectoryService::new("directories.sled".into())?; |  | ||||||
|     let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?; |     let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?; | ||||||
| 
 | 
 | ||||||
|     let nar_calculation_service = NonCachingNARCalculationService::new( |     match cli.command { | ||||||
|         blob_service.clone(), |         Commands::Daemon { listen_address } => { | ||||||
|         chunk_service.clone(), |             let listen_address = listen_address | ||||||
|         directory_service.clone(), |                 .unwrap_or_else(|| "[::]:8000".to_string()) | ||||||
|     ); |                 .parse() | ||||||
|  |                 .unwrap(); | ||||||
| 
 | 
 | ||||||
|     let mut router = server |             let mut server = Server::builder(); | ||||||
|         .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( |  | ||||||
|             blob_service, |  | ||||||
|             chunk_service, |  | ||||||
|         ))) |  | ||||||
|         .add_service(DirectoryServiceServer::new( |  | ||||||
|             GRPCDirectoryServiceWrapper::from(directory_service), |  | ||||||
|         )) |  | ||||||
|         .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( |  | ||||||
|             path_info_service, |  | ||||||
|             nar_calculation_service, |  | ||||||
|         ))); |  | ||||||
| 
 | 
 | ||||||
|     #[cfg(feature = "reflection")] |             let nar_calculation_service = NonCachingNARCalculationService::new( | ||||||
|     { |                 blob_service.clone(), | ||||||
|         let reflection_svc = tonic_reflection::server::Builder::configure() |                 chunk_service.clone(), | ||||||
|             .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) |                 directory_service.clone(), | ||||||
|             .build()?; |             ); | ||||||
|         router = router.add_service(reflection_svc); |  | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     info!("tvix-store listening on {}", listen_address); |             let mut router = server | ||||||
|  |                 .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( | ||||||
|  |                     blob_service, | ||||||
|  |                     chunk_service, | ||||||
|  |                 ))) | ||||||
|  |                 .add_service(DirectoryServiceServer::new( | ||||||
|  |                     GRPCDirectoryServiceWrapper::from(directory_service), | ||||||
|  |                 )) | ||||||
|  |                 .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( | ||||||
|  |                     path_info_service, | ||||||
|  |                     nar_calculation_service, | ||||||
|  |                 ))); | ||||||
| 
 | 
 | ||||||
|     router.serve(listen_address).await?; |             #[cfg(feature = "reflection")] | ||||||
|  |             { | ||||||
|  |                 let reflection_svc = tonic_reflection::server::Builder::configure() | ||||||
|  |                     .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) | ||||||
|  |                     .build()?; | ||||||
|  |                 router = router.add_service(reflection_svc); | ||||||
|  |             } | ||||||
| 
 | 
 | ||||||
|  |             info!("tvix-store listening on {}", listen_address); | ||||||
|  | 
 | ||||||
|  |             router.serve(listen_address).await?; | ||||||
|  |         } | ||||||
|  |         Commands::Import { paths } => { | ||||||
|  |             for path in paths { | ||||||
|  |                 let root_node = import_path( | ||||||
|  |                     &mut blob_service, | ||||||
|  |                     &mut chunk_service, | ||||||
|  |                     &mut directory_service, | ||||||
|  |                     &path, | ||||||
|  |                 )?; | ||||||
|  | 
 | ||||||
|  |                 match root_node { | ||||||
|  |                     tvix_store::proto::node::Node::Directory(directory_node) => { | ||||||
|  |                         info!( | ||||||
|  |                             path = ?path, | ||||||
|  |                             name = directory_node.name, | ||||||
|  |                             digest = BASE64.encode(&directory_node.digest), | ||||||
|  |                             "import successful", | ||||||
|  |                         ) | ||||||
|  |                     } | ||||||
|  |                     tvix_store::proto::node::Node::File(file_node) => { | ||||||
|  |                         info!( | ||||||
|  |                             path = ?path, | ||||||
|  |                             name = file_node.name, | ||||||
|  |                             digest = BASE64.encode(&file_node.digest), | ||||||
|  |                             "import successful" | ||||||
|  |                         ) | ||||||
|  |                     } | ||||||
|  |                     tvix_store::proto::node::Node::Symlink(symlink_node) => { | ||||||
|  |                         info!( | ||||||
|  |                             path = ?path, | ||||||
|  |                             name = symlink_node.name, | ||||||
|  |                             target = symlink_node.target, | ||||||
|  |                             "import successful" | ||||||
|  |                         ) | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     }; | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue