feat(snix/castore): Add daemon subcommand

Change-Id: I315c473385d40999f69084d525e36d8eda82d15c
Reviewed-on: https://cl.snix.dev/c/snix/+/30654
Reviewed-by: Florian Klink <flokli@flokli.de>
Tested-by: besadii
This commit is contained in:
Mai 2025-08-05 03:44:29 +03:00
parent 0ff37a7217
commit 2a29b90c7f
4 changed files with 106 additions and 0 deletions

3
snix/Cargo.lock generated
View file

@ -4223,6 +4223,7 @@ dependencies = [
"thiserror 2.0.9", "thiserror 2.0.9",
"threadpool", "threadpool",
"tokio", "tokio",
"tokio-listener",
"tokio-retry", "tokio-retry",
"tokio-stream", "tokio-stream",
"tokio-tar", "tokio-tar",
@ -4231,8 +4232,10 @@ dependencies = [
"toml 0.8.19", "toml 0.8.19",
"tonic", "tonic",
"tonic-build", "tonic-build",
"tonic-health",
"tonic-reflection", "tonic-reflection",
"tower 0.4.13", "tower 0.4.13",
"tower-http",
"tracing", "tracing",
"tracing-indicatif", "tracing-indicatif",
"url", "url",

View file

@ -13787,6 +13787,11 @@ rec {
packageId = "tokio"; packageId = "tokio";
features = [ "fs" "macros" "net" "rt" "rt-multi-thread" "signal" ]; features = [ "fs" "macros" "net" "rt" "rt-multi-thread" "signal" ];
} }
{
name = "tokio-listener";
packageId = "tokio-listener";
features = [ "clap" "multi-listener" "sd_listen" "tonic012" ];
}
{ {
name = "tokio-stream"; name = "tokio-stream";
packageId = "tokio-stream"; packageId = "tokio-stream";
@ -13810,6 +13815,11 @@ rec {
name = "tonic"; name = "tonic";
packageId = "tonic"; packageId = "tonic";
} }
{
name = "tonic-health";
packageId = "tonic-health";
usesDefaultFeatures = false;
}
{ {
name = "tonic-reflection"; name = "tonic-reflection";
packageId = "tonic-reflection"; packageId = "tonic-reflection";
@ -13819,6 +13829,11 @@ rec {
name = "tower"; name = "tower";
packageId = "tower 0.4.13"; packageId = "tower 0.4.13";
} }
{
name = "tower-http";
packageId = "tower-http";
features = [ "trace" ];
}
{ {
name = "tracing"; name = "tracing";
packageId = "tracing"; packageId = "tracing";

View file

@ -31,9 +31,17 @@ tokio = { workspace = true, features = [
"rt-multi-thread", "rt-multi-thread",
"signal", "signal",
] } ] }
tokio-listener = { workspace = true, features = [
"clap",
"multi-listener",
"sd_listen",
"tonic012",
] }
toml = { version = "0.8.19", optional = true } toml = { version = "0.8.19", optional = true }
tonic.workspace = true tonic.workspace = true
tonic-health.workspace = true
tower.workspace = true tower.workspace = true
tower-http = { workspace = true, features = ["trace"] }
tracing.workspace = true tracing.workspace = true
tracing-indicatif.workspace = true tracing-indicatif.workspace = true
snix-tracing = { path = "../tracing", features = ["tonic"] } snix-tracing = { path = "../tracing", features = ["tonic"] }

View file

@ -8,12 +8,20 @@ use snix_castore::fs::fuse::FuseDaemon;
#[cfg(feature = "virtiofs")] #[cfg(feature = "virtiofs")]
use snix_castore::fs::virtiofs::start_virtiofs_daemon; use snix_castore::fs::virtiofs::start_virtiofs_daemon;
use snix_castore::import::{archive::ingest_archive, fs::ingest_path}; use snix_castore::import::{archive::ingest_archive, fs::ingest_path};
use snix_castore::proto::blob_service_server::BlobServiceServer;
use snix_castore::proto::directory_service_server::DirectoryServiceServer;
use snix_castore::proto::{GRPCBlobServiceWrapper, GRPCDirectoryServiceWrapper};
use snix_castore::{Node, utils::ServiceUrls}; use snix_castore::{Node, utils::ServiceUrls};
use std::error::Error; use std::error::Error;
use std::io::Write; use std::io::Write;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::fs::{self, File}; use tokio::fs::{self, File};
use tokio_tar::Archive; use tokio_tar::Archive;
use tonic::transport::Server;
use tower::ServiceBuilder;
use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier};
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::{Level, info};
#[derive(Parser)] #[derive(Parser)]
#[command(version, about, long_about = None)] #[command(version, about, long_about = None)]
@ -24,6 +32,15 @@ struct Cli {
#[derive(Subcommand)] #[derive(Subcommand)]
enum Commands { enum Commands {
/// Runs the snix-castore daemon
Daemon {
#[clap(flatten)]
listen_args: tokio_listener::ListenerAddressLFlag,
#[clap(flatten)]
service_addrs: ServiceUrls,
},
/// Ingest a folder or tar archive and return its B3Digest /// Ingest a folder or tar archive and return its B3Digest
Ingest { Ingest {
/// Path of the folder or tar archive to import /// Path of the folder or tar archive to import
@ -83,6 +100,69 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
}, },
res = async { res = async {
match cli.command { match cli.command {
Commands::Daemon {
listen_args,
service_addrs,
} => {
let (blob_service, directory_service) =
snix_castore::utils::construct_services(service_addrs).await?;
let mut server = Server::builder().layer(
ServiceBuilder::new()
.layer(
TraceLayer::new(SharedClassifier::new(
GrpcErrorsAsFailures::new()
.with_success(GrpcCode::InvalidArgument)
.with_success(GrpcCode::NotFound),
))
.make_span_with(
DefaultMakeSpan::new()
.level(Level::INFO)
.include_headers(true),
),
)
.map_request(snix_tracing::propagate::tonic::accept_trace),
);
let (_health_reporter, health_service) = tonic_health::server::health_reporter();
#[allow(unused_mut)]
let mut router = server
.add_service(health_service)
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(blob_service)))
.add_service(DirectoryServiceServer::new(GRPCDirectoryServiceWrapper::new(directory_service)));
#[cfg(feature = "tonic-reflection")]
{
router = router.add_service(
tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(snix_castore::proto::FILE_DESCRIPTOR_SET)
.build_v1alpha()?,
);
router = router.add_service(
tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(snix_castore::proto::FILE_DESCRIPTOR_SET)
.build_v1()?,
);
}
let listen_address = &listen_args.listen_address.unwrap_or_else(|| {
"[::]:8000"
.parse()
.expect("invalid fallback listen address")
});
let listener = tokio_listener::Listener::bind(
listen_address,
&Default::default(),
&listen_args.listener_options,
)
.await?;
info!(listen_address=%listen_address, "starting daemon");
router.serve_with_incoming(listener).await?;
}
Commands::Ingest { Commands::Ingest {
input, input,
service_addrs, service_addrs,