feat(tvix/castore/refscan): share the scanner between readers
This changes the only actual state held by the ReferenceScanner (its per-candidate match flags) to use atomic bools, so .scan() no longer requires a mutable borrow. This allows passing an immutable borrow of a single reference scanner to multiple threads which might be ingesting blobs in parallel, each of which can either wrap it in a ReferenceReader or call .scan() on it directly. Change-Id: Id5c30bcebb06bf15eae8c4451d70eb806cab722e Reviewed-on: https://cl.tvl.fyi/c/depot/+/12528 Autosubmit: yuka <yuka@yuka.dev> Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
This commit is contained in:
		
							parent
							
								
									84f4ea5e7c
								
							
						
					
					
						commit
						d277bd9fbf
					
				
					 1 changed files with 30 additions and 39 deletions
				
			
		|  | @ -9,6 +9,7 @@ | ||||||
| use pin_project::pin_project; | use pin_project::pin_project; | ||||||
| use std::collections::BTreeSet; | use std::collections::BTreeSet; | ||||||
| use std::pin::Pin; | use std::pin::Pin; | ||||||
|  | use std::sync::atomic::{AtomicBool, Ordering}; | ||||||
| use std::sync::Arc; | use std::sync::Arc; | ||||||
| use std::task::{ready, Poll}; | use std::task::{ready, Poll}; | ||||||
| use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; | use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; | ||||||
|  | @ -74,7 +75,7 @@ where | ||||||
| /// of bytes patterns to scan for.
 | /// of bytes patterns to scan for.
 | ||||||
| pub struct ReferenceScanner<P> { | pub struct ReferenceScanner<P> { | ||||||
|     pattern: ReferencePattern<P>, |     pattern: ReferencePattern<P>, | ||||||
|     matches: Vec<bool>, |     matches: Vec<AtomicBool>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<P: AsRef<[u8]>> ReferenceScanner<P> { | impl<P: AsRef<[u8]>> ReferenceScanner<P> { | ||||||
|  | @ -82,20 +83,23 @@ impl<P: AsRef<[u8]>> ReferenceScanner<P> { | ||||||
|     /// candidate bytes patterns.
 |     /// candidate bytes patterns.
 | ||||||
|     pub fn new<IP: Into<ReferencePattern<P>>>(pattern: IP) -> Self { |     pub fn new<IP: Into<ReferencePattern<P>>>(pattern: IP) -> Self { | ||||||
|         let pattern = pattern.into(); |         let pattern = pattern.into(); | ||||||
|         let matches = vec![false; pattern.candidates().len()]; |         let mut matches = Vec::new(); | ||||||
|  |         for _ in 0..pattern.candidates().len() { | ||||||
|  |             matches.push(AtomicBool::new(false)); | ||||||
|  |         } | ||||||
|         ReferenceScanner { pattern, matches } |         ReferenceScanner { pattern, matches } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /// Scan the given buffer for all non-overlapping matches and collect them
 |     /// Scan the given buffer for all non-overlapping matches and collect them
 | ||||||
|     /// in the scanner.
 |     /// in the scanner.
 | ||||||
|     pub fn scan<S: AsRef<[u8]>>(&mut self, haystack: S) { |     pub fn scan<S: AsRef<[u8]>>(&self, haystack: S) { | ||||||
|         if haystack.as_ref().len() < self.pattern.longest_candidate() { |         if haystack.as_ref().len() < self.pattern.longest_candidate() { | ||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         if let Some(searcher) = &self.pattern.inner.searcher { |         if let Some(searcher) = &self.pattern.inner.searcher { | ||||||
|             for m in searcher.find(haystack) { |             for m in searcher.find(haystack) { | ||||||
|                 self.matches[m.pat_idx] = true; |                 self.matches[m.pat_idx].store(true, Ordering::Release); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  | @ -104,14 +108,17 @@ impl<P: AsRef<[u8]>> ReferenceScanner<P> { | ||||||
|         &self.pattern |         &self.pattern | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub fn matches(&self) -> &[bool] { |     pub fn matches(&self) -> Vec<bool> { | ||||||
|         &self.matches |         self.matches | ||||||
|  |             .iter() | ||||||
|  |             .map(|m| m.load(Ordering::Acquire)) | ||||||
|  |             .collect() | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub fn candidate_matches(&self) -> impl Iterator<Item = &P> { |     pub fn candidate_matches(&self) -> impl Iterator<Item = &P> { | ||||||
|         let candidates = self.pattern.candidates(); |         let candidates = self.pattern.candidates(); | ||||||
|         self.matches.iter().enumerate().filter_map(|(idx, found)| { |         self.matches.iter().enumerate().filter_map(|(idx, found)| { | ||||||
|             if *found { |             if found.load(Ordering::Acquire) { | ||||||
|                 Some(&candidates[idx]) |                 Some(&candidates[idx]) | ||||||
|             } else { |             } else { | ||||||
|                 None |                 None | ||||||
|  | @ -130,52 +137,35 @@ impl<P: Clone + Ord + AsRef<[u8]>> ReferenceScanner<P> { | ||||||
| const DEFAULT_BUF_SIZE: usize = 8 * 1024; | const DEFAULT_BUF_SIZE: usize = 8 * 1024; | ||||||
| 
 | 
 | ||||||
| #[pin_project] | #[pin_project] | ||||||
| pub struct ReferenceReader<P, R> { | pub struct ReferenceReader<'a, P, R> { | ||||||
|     scanner: ReferenceScanner<P>, |     scanner: &'a ReferenceScanner<P>, | ||||||
|     buffer: Vec<u8>, |     buffer: Vec<u8>, | ||||||
|     consumed: usize, |     consumed: usize, | ||||||
|     #[pin] |     #[pin] | ||||||
|     reader: R, |     reader: R, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<P, R> ReferenceReader<P, R> | impl<'a, P, R> ReferenceReader<'a, P, R> | ||||||
| where | where | ||||||
|     P: AsRef<[u8]>, |     P: AsRef<[u8]>, | ||||||
| { | { | ||||||
|     pub fn new(pattern: ReferencePattern<P>, reader: R) -> ReferenceReader<P, R> { |     pub fn new(scanner: &'a ReferenceScanner<P>, reader: R) -> Self { | ||||||
|         Self::with_capacity(DEFAULT_BUF_SIZE, pattern, reader) |         Self::with_capacity(DEFAULT_BUF_SIZE, scanner, reader) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub fn with_capacity( |     pub fn with_capacity(capacity: usize, scanner: &'a ReferenceScanner<P>, reader: R) -> Self { | ||||||
|         capacity: usize, |  | ||||||
|         pattern: ReferencePattern<P>, |  | ||||||
|         reader: R, |  | ||||||
|     ) -> ReferenceReader<P, R> { |  | ||||||
|         // If capacity is not at least as long as longest_candidate we can't do a scan
 |         // If capacity is not at least as long as longest_candidate we can't do a scan
 | ||||||
|         let capacity = capacity.max(pattern.longest_candidate()); |         let capacity = capacity.max(scanner.pattern().longest_candidate()); | ||||||
|         ReferenceReader { |         ReferenceReader { | ||||||
|             scanner: ReferenceScanner::new(pattern), |             scanner, | ||||||
|             buffer: Vec::with_capacity(capacity), |             buffer: Vec::with_capacity(capacity), | ||||||
|             consumed: 0, |             consumed: 0, | ||||||
|             reader, |             reader, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 |  | ||||||
|     pub fn scanner(&self) -> &ReferenceScanner<P> { |  | ||||||
|         &self.scanner |  | ||||||
|     } |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<P, R> ReferenceReader<P, R> | impl<'a, P, R> AsyncRead for ReferenceReader<'a, P, R> | ||||||
| where |  | ||||||
|     P: Clone + Ord + AsRef<[u8]>, |  | ||||||
| { |  | ||||||
|     pub fn finalise(self) -> BTreeSet<P> { |  | ||||||
|         self.scanner.finalise() |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| impl<P, R> AsyncRead for ReferenceReader<P, R> |  | ||||||
| where | where | ||||||
|     R: AsyncRead, |     R: AsyncRead, | ||||||
|     P: AsRef<[u8]>, |     P: AsRef<[u8]>, | ||||||
|  | @ -193,7 +183,7 @@ where | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<P, R> AsyncBufRead for ReferenceReader<P, R> | impl<'a, P, R> AsyncBufRead for ReferenceReader<'a, P, R> | ||||||
| where | where | ||||||
|     R: AsyncRead, |     R: AsyncRead, | ||||||
|     P: AsRef<[u8]>, |     P: AsRef<[u8]>, | ||||||
|  | @ -257,7 +247,7 @@ mod tests { | ||||||
| 
 | 
 | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_no_patterns() { |     fn test_no_patterns() { | ||||||
|         let mut scanner: ReferenceScanner<String> = ReferenceScanner::new(vec![]); |         let scanner: ReferenceScanner<String> = ReferenceScanner::new(vec![]); | ||||||
| 
 | 
 | ||||||
|         scanner.scan(HELLO_DRV); |         scanner.scan(HELLO_DRV); | ||||||
| 
 | 
 | ||||||
|  | @ -268,7 +258,7 @@ mod tests { | ||||||
| 
 | 
 | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_single_match() { |     fn test_single_match() { | ||||||
|         let mut scanner = ReferenceScanner::new(vec![ |         let scanner = ReferenceScanner::new(vec![ | ||||||
|             "/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16".to_string(), |             "/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16".to_string(), | ||||||
|         ]); |         ]); | ||||||
|         scanner.scan(HELLO_DRV); |         scanner.scan(HELLO_DRV); | ||||||
|  | @ -290,7 +280,7 @@ mod tests { | ||||||
|             "/nix/store/fn7zvafq26f0c8b17brs7s95s10ibfzs-emacs-28.2.drv".to_string(), |             "/nix/store/fn7zvafq26f0c8b17brs7s95s10ibfzs-emacs-28.2.drv".to_string(), | ||||||
|         ]; |         ]; | ||||||
| 
 | 
 | ||||||
|         let mut scanner = ReferenceScanner::new(candidates.clone()); |         let scanner = ReferenceScanner::new(candidates.clone()); | ||||||
|         scanner.scan(HELLO_DRV); |         scanner.scan(HELLO_DRV); | ||||||
| 
 | 
 | ||||||
|         let result = scanner.finalise(); |         let result = scanner.finalise(); | ||||||
|  | @ -317,17 +307,18 @@ mod tests { | ||||||
|             "fn7zvafq26f0c8b17brs7s95s10ibfzs", |             "fn7zvafq26f0c8b17brs7s95s10ibfzs", | ||||||
|         ]; |         ]; | ||||||
|         let pattern = ReferencePattern::new(candidates.clone()); |         let pattern = ReferencePattern::new(candidates.clone()); | ||||||
|  |         let scanner = ReferenceScanner::new(pattern); | ||||||
|         let mut mock = Builder::new(); |         let mut mock = Builder::new(); | ||||||
|         for c in HELLO_DRV.as_bytes().chunks(chunk_size) { |         for c in HELLO_DRV.as_bytes().chunks(chunk_size) { | ||||||
|             mock.read(c); |             mock.read(c); | ||||||
|         } |         } | ||||||
|         let mock = mock.build(); |         let mock = mock.build(); | ||||||
|         let mut reader = ReferenceReader::with_capacity(capacity, pattern, mock); |         let mut reader = ReferenceReader::with_capacity(capacity, &scanner, mock); | ||||||
|         let mut s = String::new(); |         let mut s = String::new(); | ||||||
|         reader.read_to_string(&mut s).await.unwrap(); |         reader.read_to_string(&mut s).await.unwrap(); | ||||||
|         assert_eq!(s, HELLO_DRV); |         assert_eq!(s, HELLO_DRV); | ||||||
| 
 | 
 | ||||||
|         let result = reader.finalise(); |         let result = scanner.finalise(); | ||||||
|         assert_eq!(result.len(), 3); |         assert_eq!(result.len(), 3); | ||||||
| 
 | 
 | ||||||
|         for c in candidates[..3].iter() { |         for c in candidates[..3].iter() { | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue