diff --git a/snix/nix-compat/src/aterm/parser.rs b/snix/nix-compat/src/aterm/parser.rs index f0993aaa5..2bfba9e3e 100644 --- a/snix/nix-compat/src/aterm/parser.rs +++ b/snix/nix-compat/src/aterm/parser.rs @@ -4,8 +4,8 @@ //! [ATerm]: http://program-transformation.org/Tools/ATermFormat.html use bstr::BString; use nom::branch::alt; -use nom::bytes::complete::{escaped_transform, is_not}; -use nom::character::complete::char as nomchar; +use nom::bytes::streaming::{escaped_transform, is_not}; +use nom::character::streaming::char as nomchar; use nom::combinator::{map_res, opt, value}; use nom::multi::separated_list0; use nom::sequence::delimited; diff --git a/snix/nix-compat/src/derivation/mod.rs b/snix/nix-compat/src/derivation/mod.rs index fe92d9256..acc6c46e4 100644 --- a/snix/nix-compat/src/derivation/mod.rs +++ b/snix/nix-compat/src/derivation/mod.rs @@ -280,6 +280,67 @@ impl Derivation { } } +#[cfg(feature = "async")] +#[allow(dead_code)] +trait DerivationAsyncExt { + /// Parse an Derivation in ATerm serialization, and validate it passes + /// our set of validations, from a asynchronous buffered reader. + /// This is a streaming variant of [Derivation::from_aterm_bytes]. + async fn from_streaming_aterm_bytes(reader: R) -> Result>> + where + R: tokio::io::AsyncBufRead + Unpin + Send; +} + +#[cfg(feature = "async")] +impl DerivationAsyncExt for Derivation { + async fn from_streaming_aterm_bytes( + mut reader: R, + ) -> Result>> + where + R: tokio::io::AsyncBufRead + Unpin + Send, + { + use tokio::io::AsyncBufReadExt; + let mut buffer = Vec::new(); + loop { + let rest = reader.fill_buf().await.unwrap(); + let length = rest.len(); + + // We reached EOF, we can stop and return incompleteness. + if length == 0 { + return Err(ParserError::Incomplete); + } + + buffer.extend_from_slice(rest); + + // Parse the so-far internal buffer of reader. + match parser::parse_streaming(&buffer) { + (Err(parser::Error::Incomplete), _) => { + reader.consume(length); + continue; + } + (Ok(derivation), leftover) => { + // We cannot inline it in the next call because `reader` is mutably borrowed + // and has a relationship with the lifetime of `leftover`. + let leftover_length = leftover.len(); + + // Well, if we already had consumed the leftovers of the past fetch + // while believing we were just parsing incomplete ATerm, there's nothing + // we can do about it. The protocol is made this way. + if length >= leftover_length { + // We still have leftover, let's not consume it. + // It's not for us. + reader.consume(length - leftover_length); + } + return Ok(derivation); + } + (Err(e), _) => { + return Err(e.into()); + } + } + } + } +} + /// Calculate the name part of the store path of a derivation [Output]. /// /// It's the name, and (if it's the non-out output), the output name diff --git a/snix/nix-compat/src/derivation/parser.rs b/snix/nix-compat/src/derivation/parser.rs index 64037ce15..189ea54cf 100644 --- a/snix/nix-compat/src/derivation/parser.rs +++ b/snix/nix-compat/src/derivation/parser.rs @@ -3,9 +3,9 @@ //! //! [ATerm]: http://program-transformation.org/Tools/ATermFormat.html -use nom::bytes::complete::tag; -use nom::character::complete::char as nomchar; -use nom::combinator::{all_consuming, map_res}; +use nom::bytes::streaming::tag; +use nom::character::streaming::char as nomchar; +use nom::combinator::{all_consuming, consumed, map_res}; use nom::multi::{separated_list0, separated_list1}; use nom::sequence::{delimited, preceded, separated_pair, terminated}; use nom::Parser; @@ -57,6 +57,26 @@ pub(crate) fn parse(i: &[u8]) -> Result> { } } +/// This parses a derivation in streaming fashion. +/// If the parse is successful, it returns the leftover bytes which were not used for the parsing. +/// If the parse is unsuccessful, either it returns incomplete or an error with the input as +/// leftover. +#[allow(dead_code)] +pub fn parse_streaming(i: &[u8]) -> (Result>, &[u8]) { + match consumed(parse_derivation).parse(i) { + Ok((_, (rest, derivation))) => { + // invoke validate + if let Err(e) = derivation.validate(true).map_err(Error::Validation) { + return (Err(e), i); + } + + (Ok(derivation), rest) + } + Err(nom::Err::Incomplete(_)) => (Err(Error::Incomplete), i), + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => (Err(e.into()), i), + } +} + /// Consume a string containing the algo, and optionally a `r:` /// prefix, and a digest (bytes), return a [CAHash::Nar] or [CAHash::Flat]. fn from_algo_and_mode_and_digest>( @@ -465,6 +485,20 @@ mod tests { assert_eq!(*expected, parsed); } + #[rstest] + #[case::incomplete_empty(b"[")] + #[case::incomplete_simple(b"[(\"a\",\"1\")")] + #[case::incomplete_complicated_escape(b"[(\"a")] + #[case::incomplete_complicated_sep(b"[(\"a\",")] + #[case::incomplete_complicated_multi_escape(b"[(\"a\",\"")] + #[case::incomplete_complicated_multi_outer_sep(b"[(\"a\",\"b\"),")] + fn parse_kv_incomplete(#[case] input: &'static [u8]) { + assert!(matches!( + super::parse_kv(crate::aterm::parse_bytes_field)(input), + Err(nom::Err::Incomplete(_)) + )); + } + /// Ensures the kv parser complains about duplicate map keys #[test] fn parse_kv_fail_dup_keys() {