feat(nix-daemon): Implement client handler.
This change includes only the basic nix handshake protocol handling and sets up a client session. The only supported operation at this point is SetOptions. Additional operations will be implemented in subsequent cls. Change-Id: I3eccd9e0ceb270c3865929543c702f1491768852 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12743 Autosubmit: Vladimir Kryachko <v.kryachko@gmail.com> Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de> Reviewed-by: edef <edef@edef.eu> Reviewed-by: Brian Olsen <me@griff.name>
This commit is contained in:
parent
72bc4e0270
commit
b564ed9d43
25 changed files with 1822 additions and 253 deletions
|
|
@ -1,8 +1,10 @@
|
|||
use clap::Parser;
|
||||
use mimalloc::MiMalloc;
|
||||
use std::error::Error;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use nix_compat::nix_daemon::handler::NixDaemon;
|
||||
use nix_daemon::TvixDaemon;
|
||||
use std::{error::Error, sync::Arc};
|
||||
use tokio_listener::SystemOptions;
|
||||
use tracing::error;
|
||||
use tvix_store::utils::{construct_services, ServiceUrlsGrpc};
|
||||
|
||||
#[global_allocator]
|
||||
|
|
@ -58,7 +60,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
|||
}
|
||||
|
||||
async fn run(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let (_blob_service, _directory_service, _path_info_service, _nar_calculation_service) =
|
||||
let (_blob_service, _directory_service, path_info_service, _nar_calculation_service) =
|
||||
construct_services(cli.service_addrs).await?;
|
||||
|
||||
let listen_address = cli.listen_args.listen_address.unwrap_or_else(|| {
|
||||
|
|
@ -74,16 +76,29 @@ async fn run(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
)
|
||||
.await?;
|
||||
|
||||
while let Ok((mut connection, _)) = listener.accept().await {
|
||||
tokio::spawn(async move {
|
||||
let ucred = connection
|
||||
.try_borrow_unix()
|
||||
.and_then(|u| u.peer_cred().ok());
|
||||
let io = Arc::new(TvixDaemon::new(path_info_service));
|
||||
|
||||
// For now we just write the connected process credentials into the connection.
|
||||
let _ = connection
|
||||
.write_all(format!("Hello {:?}", ucred).as_bytes())
|
||||
.await;
|
||||
while let Ok((connection, _)) = listener.accept().await {
|
||||
let io = io.clone();
|
||||
tokio::spawn(async move {
|
||||
match NixDaemon::initialize(io.clone(), connection).await {
|
||||
Ok(mut daemon) => {
|
||||
if let Err(error) = daemon.handle_client().await {
|
||||
match error.kind() {
|
||||
std::io::ErrorKind::UnexpectedEof => {
|
||||
// client disconnected, nothing to do
|
||||
}
|
||||
_ => {
|
||||
// otherwise log the error and disconnect
|
||||
error!(error=?error, "client error");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
error!(error=?error, "nix-daemon handshake failed");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
|||
18
tvix/nix-daemon/src/lib.rs
Normal file
18
tvix/nix-daemon/src/lib.rs
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use nix_compat::nix_daemon::NixDaemonIO;
|
||||
use tvix_store::pathinfoservice::PathInfoService;
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct TvixDaemon {
|
||||
path_info_service: Arc<dyn PathInfoService>,
|
||||
}
|
||||
|
||||
impl TvixDaemon {
|
||||
pub fn new(path_info_service: Arc<dyn PathInfoService>) -> Self {
|
||||
Self { path_info_service }
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements [NixDaemonIO] backed by tvix services.
|
||||
impl NixDaemonIO for TvixDaemon {}
|
||||
Loading…
Add table
Add a link
Reference in a new issue