diff --git a/snix/Cargo.lock b/snix/Cargo.lock index ad09cbbdb..04a261a23 100644 --- a/snix/Cargo.lock +++ b/snix/Cargo.lock @@ -4223,6 +4223,7 @@ dependencies = [ "thiserror 2.0.9", "threadpool", "tokio", + "tokio-listener", "tokio-retry", "tokio-stream", "tokio-tar", @@ -4231,8 +4232,10 @@ dependencies = [ "toml 0.8.19", "tonic", "tonic-build", + "tonic-health", "tonic-reflection", "tower 0.4.13", + "tower-http", "tracing", "tracing-indicatif", "url", diff --git a/snix/Cargo.nix b/snix/Cargo.nix index dd7b7344e..de3c14421 100644 --- a/snix/Cargo.nix +++ b/snix/Cargo.nix @@ -13787,6 +13787,11 @@ rec { packageId = "tokio"; features = [ "fs" "macros" "net" "rt" "rt-multi-thread" "signal" ]; } + { + name = "tokio-listener"; + packageId = "tokio-listener"; + features = [ "clap" "multi-listener" "sd_listen" "tonic012" ]; + } { name = "tokio-stream"; packageId = "tokio-stream"; @@ -13810,6 +13815,11 @@ rec { name = "tonic"; packageId = "tonic"; } + { + name = "tonic-health"; + packageId = "tonic-health"; + usesDefaultFeatures = false; + } { name = "tonic-reflection"; packageId = "tonic-reflection"; @@ -13819,6 +13829,11 @@ rec { name = "tower"; packageId = "tower 0.4.13"; } + { + name = "tower-http"; + packageId = "tower-http"; + features = [ "trace" ]; + } { name = "tracing"; packageId = "tracing"; diff --git a/snix/castore/Cargo.toml b/snix/castore/Cargo.toml index 189919960..e9adcb6ef 100644 --- a/snix/castore/Cargo.toml +++ b/snix/castore/Cargo.toml @@ -31,9 +31,17 @@ tokio = { workspace = true, features = [ "rt-multi-thread", "signal", ] } +tokio-listener = { workspace = true, features = [ + "clap", + "multi-listener", + "sd_listen", + "tonic012", +] } toml = { version = "0.8.19", optional = true } tonic.workspace = true +tonic-health.workspace = true tower.workspace = true +tower-http = { workspace = true, features = ["trace"] } tracing.workspace = true tracing-indicatif.workspace = true snix-tracing = { path = "../tracing", features = ["tonic"] } diff --git a/snix/castore/src/bin/snix-castore.rs b/snix/castore/src/bin/snix-castore.rs index 4150363ee..66cac0dff 100644 --- a/snix/castore/src/bin/snix-castore.rs +++ b/snix/castore/src/bin/snix-castore.rs @@ -8,12 +8,20 @@ use snix_castore::fs::fuse::FuseDaemon; #[cfg(feature = "virtiofs")] use snix_castore::fs::virtiofs::start_virtiofs_daemon; use snix_castore::import::{archive::ingest_archive, fs::ingest_path}; +use snix_castore::proto::blob_service_server::BlobServiceServer; +use snix_castore::proto::directory_service_server::DirectoryServiceServer; +use snix_castore::proto::{GRPCBlobServiceWrapper, GRPCDirectoryServiceWrapper}; use snix_castore::{Node, utils::ServiceUrls}; use std::error::Error; use std::io::Write; use std::path::PathBuf; use tokio::fs::{self, File}; use tokio_tar::Archive; +use tonic::transport::Server; +use tower::ServiceBuilder; +use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier}; +use tower_http::trace::{DefaultMakeSpan, TraceLayer}; +use tracing::{Level, info}; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -24,6 +32,15 @@ struct Cli { #[derive(Subcommand)] enum Commands { + /// Runs the snix-castore daemon + Daemon { + #[clap(flatten)] + listen_args: tokio_listener::ListenerAddressLFlag, + + #[clap(flatten)] + service_addrs: ServiceUrls, + }, + /// Ingest a folder or tar archive and return its B3Digest Ingest { /// Path of the folder or tar archive to import @@ -83,6 +100,69 @@ async fn main() -> Result<(), Box> { }, res = async { match cli.command { + Commands::Daemon { + listen_args, + service_addrs, + } => { + let (blob_service, directory_service) = + snix_castore::utils::construct_services(service_addrs).await?; + + let mut server = Server::builder().layer( + ServiceBuilder::new() + .layer( + TraceLayer::new(SharedClassifier::new( + GrpcErrorsAsFailures::new() + .with_success(GrpcCode::InvalidArgument) + .with_success(GrpcCode::NotFound), + )) + .make_span_with( + DefaultMakeSpan::new() + .level(Level::INFO) + .include_headers(true), + ), + ) + .map_request(snix_tracing::propagate::tonic::accept_trace), + ); + + let (_health_reporter, health_service) = tonic_health::server::health_reporter(); + + #[allow(unused_mut)] + let mut router = server + .add_service(health_service) + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(blob_service))) + .add_service(DirectoryServiceServer::new(GRPCDirectoryServiceWrapper::new(directory_service))); + + #[cfg(feature = "tonic-reflection")] + { + router = router.add_service( + tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(snix_castore::proto::FILE_DESCRIPTOR_SET) + .build_v1alpha()?, + ); + router = router.add_service( + tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(snix_castore::proto::FILE_DESCRIPTOR_SET) + .build_v1()?, + ); + } + + let listen_address = &listen_args.listen_address.unwrap_or_else(|| { + "[::]:8000" + .parse() + .expect("invalid fallback listen address") + }); + + let listener = tokio_listener::Listener::bind( + listen_address, + &Default::default(), + &listen_args.listener_options, + ) + .await?; + + info!(listen_address=%listen_address, "starting daemon"); + + router.serve_with_incoming(listener).await?; + } Commands::Ingest { input, service_addrs,