From beae2cd27fefdec91fe08410efa7c473053162ae Mon Sep 17 00:00:00 2001
From: Raito Bezarius
Date: Mon, 24 Mar 2025 23:44:19 +0100
Subject: [PATCH] feat(nix-compat/derivation/parser): init streaming variant

While processing an `AddToStore` operation, it is possible to receive a
framed derivation as a response instead of a NAR. Due to how the wire
protocol works, it's not easy to tell whether we still need to pull more
bytes (i.e. the derivation is incomplete so far) or whether the input is
simply malformed (i.e. an incorrect derivation).

To solve this, the streaming parser propagates the incompleteness of the
parse as a valid result, and more bytes are requested from the reader
until it returns EOF.

Change-Id: Icd2b311f03ad68010a9b48e883f6dbee2fbd2c3e
Signed-off-by: Raito Bezarius
Reviewed-on: https://cl.snix.dev/c/snix/+/30278
Reviewed-by: Florian Klink
Tested-by: besadii
Autosubmit: Ryan Lahfa
---
 snix/nix-compat/src/aterm/parser.rs      |  4 +-
 snix/nix-compat/src/derivation/mod.rs    | 61 ++++++++++++++++++++++++
 snix/nix-compat/src/derivation/parser.rs | 40 ++++++++++++++--
 3 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/snix/nix-compat/src/aterm/parser.rs b/snix/nix-compat/src/aterm/parser.rs
index f0993aaa5..2bfba9e3e 100644
--- a/snix/nix-compat/src/aterm/parser.rs
+++ b/snix/nix-compat/src/aterm/parser.rs
@@ -4,8 +4,8 @@
 //! [ATerm]: http://program-transformation.org/Tools/ATermFormat.html
 use bstr::BString;
 use nom::branch::alt;
-use nom::bytes::complete::{escaped_transform, is_not};
-use nom::character::complete::char as nomchar;
+use nom::bytes::streaming::{escaped_transform, is_not};
+use nom::character::streaming::char as nomchar;
 use nom::combinator::{map_res, opt, value};
 use nom::multi::separated_list0;
 use nom::sequence::delimited;
diff --git a/snix/nix-compat/src/derivation/mod.rs b/snix/nix-compat/src/derivation/mod.rs
index fe92d9256..acc6c46e4 100644
--- a/snix/nix-compat/src/derivation/mod.rs
+++ b/snix/nix-compat/src/derivation/mod.rs
@@ -280,6 +280,67 @@ impl Derivation {
     }
 }
 
+#[cfg(feature = "async")]
+#[allow(dead_code)]
+trait DerivationAsyncExt {
+    /// Parse a Derivation in ATerm serialization, and validate it passes
+    /// our set of validations, from an asynchronous buffered reader.
+    /// This is a streaming variant of [Derivation::from_aterm_bytes].
+    async fn from_streaming_aterm_bytes<R>(reader: R) -> Result<Derivation, ParserError<Vec<u8>>>
+    where
+        R: tokio::io::AsyncBufRead + Unpin + Send;
+}
+
+#[cfg(feature = "async")]
+impl DerivationAsyncExt for Derivation {
+    async fn from_streaming_aterm_bytes<R>(
+        mut reader: R,
+    ) -> Result<Derivation, ParserError<Vec<u8>>>
+    where
+        R: tokio::io::AsyncBufRead + Unpin + Send,
+    {
+        use tokio::io::AsyncBufReadExt;
+        let mut buffer = Vec::new();
+        loop {
+            let rest = reader.fill_buf().await.unwrap();
+            let length = rest.len();
+
+            // We reached EOF, we can stop and return incompleteness.
+            if length == 0 {
+                return Err(ParserError::Incomplete);
+            }
+
+            buffer.extend_from_slice(rest);
+
+            // Parse the internal buffer accumulated from the reader so far.
+            match parser::parse_streaming(&buffer) {
+                (Err(parser::Error::Incomplete), _) => {
+                    reader.consume(length);
+                    continue;
+                }
+                (Ok(derivation), leftover) => {
+                    // We cannot inline it in the next call because `reader` is mutably borrowed
+                    // and has a relationship with the lifetime of `leftover`.
+                    let leftover_length = leftover.len();
+
+                    // If we already consumed the leftovers of a past fetch while
+                    // believing we were still parsing an incomplete ATerm, there's
+                    // nothing we can do about it. The protocol is made this way.
+                    if length >= leftover_length {
+                        // We still have leftover, let's not consume it.
+                        // It's not for us.
+                        reader.consume(length - leftover_length);
+                    }
+                    return Ok(derivation);
+                }
+                (Err(e), _) => {
+                    return Err(e.into());
+                }
+            }
+        }
+    }
+}
+
 /// Calculate the name part of the store path of a derivation [Output].
 ///
 /// It's the name, and (if it's the non-out output), the output name
diff --git a/snix/nix-compat/src/derivation/parser.rs b/snix/nix-compat/src/derivation/parser.rs
index 64037ce15..189ea54cf 100644
--- a/snix/nix-compat/src/derivation/parser.rs
+++ b/snix/nix-compat/src/derivation/parser.rs
@@ -3,9 +3,9 @@
 //!
 //! [ATerm]: http://program-transformation.org/Tools/ATermFormat.html
 
-use nom::bytes::complete::tag;
-use nom::character::complete::char as nomchar;
-use nom::combinator::{all_consuming, map_res};
+use nom::bytes::streaming::tag;
+use nom::character::streaming::char as nomchar;
+use nom::combinator::{all_consuming, consumed, map_res};
 use nom::multi::{separated_list0, separated_list1};
 use nom::sequence::{delimited, preceded, separated_pair, terminated};
 use nom::Parser;
@@ -57,6 +57,26 @@ pub(crate) fn parse(i: &[u8]) -> Result<Derivation, Error<&[u8]>> {
     }
 }
 
+/// This parses a derivation in streaming fashion.
+/// If the parse is successful, it returns the leftover bytes which were not used for the parsing.
+/// If the parse is unsuccessful, it either returns Incomplete or an error, with the whole input
+/// as leftover.
+#[allow(dead_code)]
+pub fn parse_streaming(i: &[u8]) -> (Result<Derivation, Error<&[u8]>>, &[u8]) {
+    match consumed(parse_derivation).parse(i) {
+        Ok((_, (rest, derivation))) => {
+            // invoke validate
+            if let Err(e) = derivation.validate(true).map_err(Error::Validation) {
+                return (Err(e), i);
+            }
+
+            (Ok(derivation), rest)
+        }
+        Err(nom::Err::Incomplete(_)) => (Err(Error::Incomplete), i),
+        Err(nom::Err::Error(e) | nom::Err::Failure(e)) => (Err(e.into()), i),
+    }
+}
+
 /// Consume a string containing the algo, and optionally a `r:`
 /// prefix, and a digest (bytes), return a [CAHash::Nar] or [CAHash::Flat].
 fn from_algo_and_mode_and_digest<B: AsRef<[u8]>>(
@@ -465,6 +485,20 @@ mod tests {
         assert_eq!(*expected, parsed);
     }
 
+    #[rstest]
+    #[case::incomplete_empty(b"[")]
+    #[case::incomplete_simple(b"[(\"a\",\"1\")")]
+    #[case::incomplete_complicated_escape(b"[(\"a")]
+    #[case::incomplete_complicated_sep(b"[(\"a\",")]
+    #[case::incomplete_complicated_multi_escape(b"[(\"a\",\"")]
+    #[case::incomplete_complicated_multi_outer_sep(b"[(\"a\",\"b\"),")]
+    fn parse_kv_incomplete(#[case] input: &'static [u8]) {
+        assert!(matches!(
+            super::parse_kv(crate::aterm::parse_bytes_field)(input),
+            Err(nom::Err::Incomplete(_))
+        ));
+    }
+
     /// Ensures the kv parser complains about duplicate map keys
     #[test]
     fn parse_kv_fail_dup_keys() {
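
A quick usage sketch of the parse_streaming contract, not part of the change itself: the test name and the truncated literal below are illustrative, written as if the test sat next to the existing ones in derivation/parser.rs. A cut-off derivation reports Incomplete and hands the whole input back as leftover, so a caller can keep appending bytes to the same buffer and simply retry.

#[test]
fn parse_streaming_incomplete_returns_input() {
    // A derivation ATerm cut off right after the opening of its outputs list.
    let partial: &[u8] = b"Derive([";
    let (result, leftover) = super::parse_streaming(partial);
    // Incomplete is a valid answer here, not a hard parse failure...
    assert!(matches!(result, Err(super::Error::Incomplete)));
    // ...and the full input comes back as leftover: nothing was consumed yet.
    assert_eq!(leftover, partial);
}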
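
And a sketch of how a caller might drive the asynchronous entry point, assuming it sits in derivation/mod.rs where the DerivationAsyncExt trait is in scope; the helper name read_drv_from_framed and the framed reader are hypothetical, not part of this change.

#[cfg(feature = "async")]
// Wrap whatever AsyncRead delivers the framed AddToStore payload in a BufReader
// and let from_streaming_aterm_bytes keep pulling bytes until the ATerm is
// complete, invalid, or cut short at EOF (surfaced as ParserError::Incomplete).
async fn read_drv_from_framed<R>(framed: R) -> Result<Derivation, ParserError<Vec<u8>>>
where
    R: tokio::io::AsyncRead + Unpin + Send,
{
    let reader = tokio::io::BufReader::new(framed);
    Derivation::from_streaming_aterm_bytes(reader).await
}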