feat(nix-compat/derivation/parser): init streaming variant
While processing an `AddToStore` operation, it is possible to receive a framed derivation as a response instead of a NAR. Due to how the wire protocol works, it is not easy to know whether more bytes still need to be pulled (i.e. the derivation is incomplete) or not (i.e. the derivation is incorrect). To solve this, a streaming parser propagates the incompleteness of the parse as a valid response, and the reader is asked for more bytes until it returns EOF. Change-Id: Icd2b311f03ad68010a9b48e883f6dbee2fbd2c3e Signed-off-by: Raito Bezarius <raito@lix.systems> Reviewed-on: https://cl.snix.dev/c/snix/+/30278 Reviewed-by: Florian Klink <flokli@flokli.de> Tested-by: besadii Autosubmit: Ryan Lahfa <masterancpp@gmail.com>
This commit is contained in:
		
							parent
							
								
									74492e9d6e
								
							
						
					
					
						commit
						beae2cd27f
					
				
					 3 changed files with 100 additions and 5 deletions
				
			
		|  | @ -4,8 +4,8 @@ | ||||||
| //! [ATerm]: http://program-transformation.org/Tools/ATermFormat.html
 | //! [ATerm]: http://program-transformation.org/Tools/ATermFormat.html
 | ||||||
| use bstr::BString; | use bstr::BString; | ||||||
| use nom::branch::alt; | use nom::branch::alt; | ||||||
| use nom::bytes::complete::{escaped_transform, is_not}; | use nom::bytes::streaming::{escaped_transform, is_not}; | ||||||
| use nom::character::complete::char as nomchar; | use nom::character::streaming::char as nomchar; | ||||||
| use nom::combinator::{map_res, opt, value}; | use nom::combinator::{map_res, opt, value}; | ||||||
| use nom::multi::separated_list0; | use nom::multi::separated_list0; | ||||||
| use nom::sequence::delimited; | use nom::sequence::delimited; | ||||||
|  |  | ||||||
|  | @ -280,6 +280,67 @@ impl Derivation { | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
/// Asynchronous parsing extension for [Derivation].
/// Only compiled when the `async` feature is enabled.
#[cfg(feature = "async")]
// NOTE(review): presumably nothing calls this trait yet, hence the allow —
// confirm and drop it once a caller exists.
#[allow(dead_code)]
trait DerivationAsyncExt {
    /// Parse a Derivation in ATerm serialization, and validate it passes
    /// our set of validations, from an asynchronous buffered reader.
    /// This is a streaming variant of [Derivation::from_aterm_bytes].
    async fn from_streaming_aterm_bytes<R>(reader: R) -> Result<Derivation, parser::Error<Vec<u8>>>
    where
        R: tokio::io::AsyncBufRead + Unpin + Send;
}
|  | 
 | ||||||
#[cfg(feature = "async")]
impl DerivationAsyncExt for Derivation {
    /// Reads from `reader` until a complete Derivation in ATerm serialization
    /// has been parsed, or until the reader reports EOF.
    ///
    /// Every chunk handed out by the reader is appended to an internal buffer,
    /// and that buffer is re-parsed from its start with
    /// [parser::parse_streaming] each time more bytes arrive.
    ///
    /// # Errors
    ///
    /// Returns [parser::Error::Incomplete] if the reader reaches EOF before a
    /// complete derivation could be parsed. Any other error reported by
    /// [parser::parse_streaming] is converted with `into()` and returned.
    ///
    /// # Panics
    ///
    /// Panics if reading from `reader` fails.
    async fn from_streaming_aterm_bytes<R>(
        mut reader: R,
    ) -> Result<Derivation, parser::Error<Vec<u8>>>
    where
        R: tokio::io::AsyncBufRead + Unpin + Send,
    {
        use tokio::io::AsyncBufReadExt;

        // Everything received so far; grows until the parse stops being incomplete.
        let mut buffer = Vec::new();
        loop {
            // NOTE(review): the return type has no visible variant to carry an
            // I/O error, so a failing read still panics here.
            let rest = reader
                .fill_buf()
                .await
                .expect("failed to read from the underlying reader");
            let length = rest.len();

            // We reached EOF, we can stop and return incompleteness.
            if length == 0 {
                return Err(parser::Error::Incomplete);
            }

            buffer.extend_from_slice(rest);

            // Parse everything accumulated so far.
            match parser::parse_streaming(&buffer) {
                (Err(parser::Error::Incomplete), _) => {
                    // The whole chunk now lives in `buffer`; release it from
                    // the reader and wait for more bytes.
                    reader.consume(length);
                    continue;
                }
                (Ok(derivation), leftover) => {
                    // `leftover` borrows from `buffer`; only its length is
                    // needed to decide how much of the last chunk was ours.
                    let leftover_length = leftover.len();

                    // Well, if we already had consumed the leftovers of the past fetch
                    // while believing we were just parsing incomplete ATerm, there's nothing
                    // we can do about it. The protocol is made this way.
                    if length >= leftover_length {
                        // We still have leftover, let's not consume it.
                        // It's not for us.
                        reader.consume(length - leftover_length);
                    }
                    return Ok(derivation);
                }
                (Err(e), _) => {
                    return Err(e.into());
                }
            }
        }
    }
}
|  | 
 | ||||||
| /// Calculate the name part of the store path of a derivation [Output].
 | /// Calculate the name part of the store path of a derivation [Output].
 | ||||||
| ///
 | ///
 | ||||||
| /// It's the name, and (if it's the non-out output), the output name
 | /// It's the name, and (if it's the non-out output), the output name
 | ||||||
|  |  | ||||||
|  | @ -3,9 +3,9 @@ | ||||||
| //!
 | //!
 | ||||||
| //! [ATerm]: http://program-transformation.org/Tools/ATermFormat.html
 | //! [ATerm]: http://program-transformation.org/Tools/ATermFormat.html
 | ||||||
| 
 | 
 | ||||||
| use nom::bytes::complete::tag; | use nom::bytes::streaming::tag; | ||||||
| use nom::character::complete::char as nomchar; | use nom::character::streaming::char as nomchar; | ||||||
| use nom::combinator::{all_consuming, map_res}; | use nom::combinator::{all_consuming, consumed, map_res}; | ||||||
| use nom::multi::{separated_list0, separated_list1}; | use nom::multi::{separated_list0, separated_list1}; | ||||||
| use nom::sequence::{delimited, preceded, separated_pair, terminated}; | use nom::sequence::{delimited, preceded, separated_pair, terminated}; | ||||||
| use nom::Parser; | use nom::Parser; | ||||||
|  | @ -57,6 +57,26 @@ pub(crate) fn parse(i: &[u8]) -> Result<Derivation, Error<&[u8]>> { | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | /// This parses a derivation in streaming fashion.
 | ||||||
|  | /// If the parse is successful, it returns the leftover bytes which were not used for the parsing.
 | ||||||
|  | /// If the parse is unsuccessful, either it returns incomplete or an error with the input as
 | ||||||
|  | /// leftover.
 | ||||||
|  | #[allow(dead_code)] | ||||||
|  | pub fn parse_streaming(i: &[u8]) -> (Result<Derivation, Error<&[u8]>>, &[u8]) { | ||||||
|  |     match consumed(parse_derivation).parse(i) { | ||||||
|  |         Ok((_, (rest, derivation))) => { | ||||||
|  |             // invoke validate
 | ||||||
|  |             if let Err(e) = derivation.validate(true).map_err(Error::Validation) { | ||||||
|  |                 return (Err(e), i); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             (Ok(derivation), rest) | ||||||
|  |         } | ||||||
|  |         Err(nom::Err::Incomplete(_)) => (Err(Error::Incomplete), i), | ||||||
|  |         Err(nom::Err::Error(e) | nom::Err::Failure(e)) => (Err(e.into()), i), | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
| /// Consume a string containing the algo, and optionally a `r:`
 | /// Consume a string containing the algo, and optionally a `r:`
 | ||||||
| /// prefix, and a digest (bytes), return a [CAHash::Nar] or [CAHash::Flat].
 | /// prefix, and a digest (bytes), return a [CAHash::Nar] or [CAHash::Flat].
 | ||||||
| fn from_algo_and_mode_and_digest<B: AsRef<[u8]>>( | fn from_algo_and_mode_and_digest<B: AsRef<[u8]>>( | ||||||
|  | @ -465,6 +485,20 @@ mod tests { | ||||||
|         assert_eq!(*expected, parsed); |         assert_eq!(*expected, parsed); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     #[rstest] | ||||||
|  |     #[case::incomplete_empty(b"[")] | ||||||
|  |     #[case::incomplete_simple(b"[(\"a\",\"1\")")] | ||||||
|  |     #[case::incomplete_complicated_escape(b"[(\"a")] | ||||||
|  |     #[case::incomplete_complicated_sep(b"[(\"a\",")] | ||||||
|  |     #[case::incomplete_complicated_multi_escape(b"[(\"a\",\"")] | ||||||
|  |     #[case::incomplete_complicated_multi_outer_sep(b"[(\"a\",\"b\"),")] | ||||||
|  |     fn parse_kv_incomplete(#[case] input: &'static [u8]) { | ||||||
|  |         assert!(matches!( | ||||||
|  |             super::parse_kv(crate::aterm::parse_bytes_field)(input), | ||||||
|  |             Err(nom::Err::Incomplete(_)) | ||||||
|  |         )); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Ensures the kv parser complains about duplicate map keys
 |     /// Ensures the kv parser complains about duplicate map keys
 | ||||||
|     #[test] |     #[test] | ||||||
|     fn parse_kv_fail_dup_keys() { |     fn parse_kv_fail_dup_keys() { | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue