feat(nix-daemon): Implement more nix daemon operations.

In particular QueryPathFromHashPart, QueryValidPaths, QueryValidDerivers

Change-Id: Ie6ad83cec5ce9580044b85e201e4e23394f87075
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12762
Tested-by: BuildkiteCI
Reviewed-by: edef <edef@edef.eu>
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Vova Kryachko 2024-11-11 23:15:54 -05:00 committed by Vladimir Kryachko
parent 6aada91062
commit fa9c067dc9
16 changed files with 538 additions and 60 deletions

View file

@ -1,5 +1,6 @@
use std::{future::Future, sync::Arc};
use bytes::Bytes;
use tokio::{
io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
sync::Mutex,
@ -7,6 +8,7 @@ use tokio::{
use tracing::debug;
use super::{
types::QueryValidPaths,
worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST},
NixDaemonIO,
};
@ -114,7 +116,16 @@ where
loop {
let op_code = self.reader.read_number().await?;
match TryInto::<Operation>::try_into(op_code) {
// Note: please keep operations sorted in ascending order of their numerical op number.
Ok(operation) => match operation {
Operation::IsValidPath => {
let path: StorePath<String> = self.reader.read_value().await?;
self.handle(io.is_valid_path(&path)).await?
}
// Note this operation does not currently delegate to NixDaemonIO,
// The general idea is that we will pass relevant ClientSettings
// into individual NixDaemonIO method calls if the need arises.
// For now we just store the settings in the NixDaemon for future use.
Operation::SetOptions => {
self.client_settings = self.reader.read_value().await?;
self.handle(async { Ok(()) }).await?
@ -123,10 +134,17 @@ where
let path: StorePath<String> = self.reader.read_value().await?;
self.handle(io.query_path_info(&path)).await?
}
Operation::IsValidPath => {
Operation::QueryPathFromHashPart => {
let hash: Bytes = self.reader.read_value().await?;
self.handle(io.query_path_from_hash_part(&hash)).await?
}
Operation::QueryValidPaths => {
let query: QueryValidPaths = self.reader.read_value().await?;
self.handle(io.query_valid_paths(&query)).await?
}
Operation::QueryValidDerivers => {
let path: StorePath<String> = self.reader.read_value().await?;
self.handle(async { Ok(io.query_path_info(&path).await?.is_some()) })
.await?
self.handle(io.query_valid_derivers(&path)).await?
}
_ => {
return Err(std::io::Error::other(format!(
@ -202,6 +220,13 @@ mod tests {
) -> Result<Option<UnkeyedValidPathInfo>> {
Ok(None)
}
async fn query_path_from_hash_part(
&self,
_hash: &[u8],
) -> Result<Option<UnkeyedValidPathInfo>> {
Ok(None)
}
}
#[tokio::test]

View file

@ -2,7 +2,9 @@ pub mod worker_protocol;
use std::io::Result;
use types::UnkeyedValidPathInfo;
use futures::future::try_join_all;
use tracing::warn;
use types::{QueryValidPaths, UnkeyedValidPathInfo};
use crate::store_path::StorePath;
@ -10,9 +12,200 @@ pub mod handler;
pub mod types;
/// Represents all possible operations over the nix-daemon protocol.
pub trait NixDaemonIO {
pub trait NixDaemonIO: Sync {
fn is_valid_path(
&self,
path: &StorePath<String>,
) -> impl std::future::Future<Output = Result<bool>> + Send {
async move { Ok(self.query_path_info(path).await?.is_some()) }
}
fn query_path_info(
&self,
path: &StorePath<String>,
) -> impl std::future::Future<Output = Result<Option<UnkeyedValidPathInfo>>> + Send;
fn query_path_from_hash_part(
&self,
hash: &[u8],
) -> impl std::future::Future<Output = Result<Option<UnkeyedValidPathInfo>>> + Send;
fn query_valid_paths(
&self,
request: &QueryValidPaths,
) -> impl std::future::Future<Output = Result<Vec<UnkeyedValidPathInfo>>> + Send {
async move {
if request.substitute {
warn!("tvix does not yet support substitution, ignoring the 'substitute' flag...");
}
// Using try_join_all here to avoid returning partial results to the client.
// The only reason query_path_info can fail is due to transient IO errors,
// so we return such errors to the client as opposed to only returning paths
// that succeeded.
let result =
try_join_all(request.paths.iter().map(|path| self.query_path_info(path))).await?;
let result: Vec<UnkeyedValidPathInfo> = result.into_iter().flatten().collect();
Ok(result)
}
}
fn query_valid_derivers(
&self,
path: &StorePath<String>,
) -> impl std::future::Future<Output = Result<Vec<StorePath<String>>>> + Send {
async move {
let result = self.query_path_info(path).await?;
let result: Vec<_> = result.into_iter().filter_map(|info| info.deriver).collect();
Ok(result)
}
}
}
#[cfg(test)]
mod tests {
use crate::{nix_daemon::types::QueryValidPaths, store_path::StorePath};
use super::{types::UnkeyedValidPathInfo, NixDaemonIO};
// Very simple mock
// Unable to use mockall as it does not support unboxed async traits.
pub struct MockNixDaemonIO {
query_path_info_result: Option<UnkeyedValidPathInfo>,
}
impl NixDaemonIO for MockNixDaemonIO {
async fn query_path_info(
&self,
_path: &StorePath<String>,
) -> std::io::Result<Option<UnkeyedValidPathInfo>> {
Ok(self.query_path_info_result.clone())
}
async fn query_path_from_hash_part(
&self,
_hash: &[u8],
) -> std::io::Result<Option<UnkeyedValidPathInfo>> {
Ok(None)
}
}
#[tokio::test]
async fn test_is_valid_path_returns_true() {
let path =
StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
.unwrap();
let io = MockNixDaemonIO {
query_path_info_result: Some(UnkeyedValidPathInfo::default()),
};
let result = io
.is_valid_path(&path)
.await
.expect("expected to get a non-empty response");
assert!(result, "expected to get true");
}
#[tokio::test]
async fn test_is_valid_path_returns_false() {
let path =
StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
.unwrap();
let io = MockNixDaemonIO {
query_path_info_result: None,
};
let result = io
.is_valid_path(&path)
.await
.expect("expected to get a non-empty response");
assert!(!result, "expected to get false");
}
#[tokio::test]
async fn test_query_valid_paths_returns_empty_response() {
let path =
StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
.unwrap();
let io = MockNixDaemonIO {
query_path_info_result: None,
};
let result = io
.query_valid_paths(&QueryValidPaths {
paths: vec![path],
substitute: false,
})
.await
.expect("expected to get a non-empty response");
assert_eq!(result, vec![], "expected to get empty response");
}
#[tokio::test]
async fn test_query_valid_paths_returns_non_empty_response() {
let path =
StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
.unwrap();
let io = MockNixDaemonIO {
query_path_info_result: Some(UnkeyedValidPathInfo::default()),
};
let result = io
.query_valid_paths(&QueryValidPaths {
paths: vec![path],
substitute: false,
})
.await
.expect("expected to get a non-empty response");
assert_eq!(
result,
vec![UnkeyedValidPathInfo::default()],
"expected to get non empty response"
);
}
#[tokio::test]
async fn test_query_valid_derivers_returns_empty_response() {
let path =
StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
.unwrap();
let io = MockNixDaemonIO {
query_path_info_result: None,
};
let result = io
.query_valid_derivers(&path)
.await
.expect("expected to get a non-empty response");
assert_eq!(result, vec![], "expected to get empty response");
}
#[tokio::test]
async fn test_query_valid_derivers_returns_non_empty_response() {
let path =
StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
.unwrap();
let deriver = StorePath::<String>::from_bytes(
"z6r3bn5l51679pwkvh9nalp6c317z34m-hello.drv".as_bytes(),
)
.unwrap();
let io = MockNixDaemonIO {
query_path_info_result: Some(UnkeyedValidPathInfo {
deriver: Some(deriver.clone()),
nar_hash: "".to_owned(),
references: vec![],
registration_time: 0,
nar_size: 1,
ultimate: true,
signatures: vec![],
ca: None,
}),
};
let result = io
.query_valid_derivers(&path)
.await
.expect("expected to get a non-empty response");
assert_eq!(result, vec![deriver], "expected to get non empty response");
}
}

View file

@ -152,7 +152,7 @@ impl NixSerialize for Option<StorePath<String>> {
}
}
#[derive(NixSerialize, Debug)]
#[derive(NixSerialize, Debug, Clone, Default, PartialEq)]
pub struct UnkeyedValidPathInfo {
pub deriver: Option<StorePath<String>>,
pub nar_hash: String,
@ -164,5 +164,13 @@ pub struct UnkeyedValidPathInfo {
pub ca: Option<CAHash>,
}
#[cfg(test)]
mod tests {}
/// Request tupe for [super::worker_protocol::Operation::QueryValidPaths]
#[derive(NixDeserialize)]
pub struct QueryValidPaths {
// Paths to query
pub paths: Vec<StorePath<String>>,
// Whether to try and substitute the paths.
#[nix(version = "27..")]
pub substitute: bool,
}