refactor(tvix/store): move upload_chunk out of blobwriter
This is useful not only in blobwriter contexts.

Change-Id: I4c584b5264ff7b4bb3b1a9671affc39e18bf4ccf
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8245
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
parent 05b2f1ccb4
commit 3506d3bd0e

3 changed files with 35 additions and 27 deletions
--- a/tvix/store/src/blobwriter.rs
+++ b/tvix/store/src/blobwriter.rs
@@ -1,7 +1,7 @@
-use crate::chunkservice::ChunkService;
+use crate::chunkservice::{upload_chunk, ChunkService};
 use crate::{proto, Error};
 use rayon::prelude::*;
-use tracing::{debug, instrument};
+use tracing::instrument;
 
 pub struct BlobWriter<'a, CS: ChunkService> {
     chunk_service: &'a CS,
@@ -14,31 +14,6 @@ pub struct BlobWriter<'a, CS: ChunkService> {
     buf: Vec<u8>,
 }
 
-// upload a chunk to the chunk service, and return its digest (or an error) when done.
-#[instrument(skip_all)]
-fn upload_chunk<CS: ChunkService>(
-    chunk_service: &CS,
-    chunk_data: Vec<u8>,
-) -> Result<Vec<u8>, Error> {
-    let mut hasher = blake3::Hasher::new();
-    // TODO: benchmark this number and factor it out
-    if chunk_data.len() >= 128 * 1024 {
-        hasher.update_rayon(&chunk_data);
-    } else {
-        hasher.update(&chunk_data);
-    }
-    let digest = hasher.finalize();
-
-    if chunk_service.has(digest.as_bytes())? {
-        debug!("already has chunk, skipping");
-    }
-    let digest_resp = chunk_service.put(chunk_data)?;
-
-    assert_eq!(digest_resp, digest.as_bytes());
-
-    Ok(digest.as_bytes().to_vec())
-}
-
 impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
     pub fn new(chunk_service: &'a CS) -> Self {
         Self {
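Because the helper keeps its name and signature and is now brought into scope through the new import, call sites inside BlobWriter keep compiling without changes. The sketch below is illustrative only: `flush_buf` is a made-up method name and the real write path of BlobWriter is not part of this diff; it only shows that the call shape stays the same after the move.

// Hypothetical call site; `flush_buf` does not exist in the real file.
impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
    fn flush_buf(&mut self) -> Result<Vec<u8>, Error> {
        // Take the buffered data out of the writer...
        let chunk_data = std::mem::take(&mut self.buf);
        // ...and hand it to the free function now imported from chunkservice.
        upload_chunk(self.chunk_service, chunk_data)
    }
}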
--- a/tvix/store/src/chunkservice/mod.rs
+++ b/tvix/store/src/chunkservice/mod.rs
@@ -1,3 +1,5 @@
+mod util;
+
 pub mod memory;
 pub mod sled;
 
@@ -5,6 +7,7 @@ use crate::Error;
 
 pub use self::memory::MemoryChunkService;
 pub use self::sled::SledChunkService;
+pub use self::util::upload_chunk;
 
 /// The base trait all ChunkService services need to implement.
 /// It allows checking for the existence, download and upload of chunks.
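For context, the ChunkService trait that upload_chunk is generic over only appears here through its doc comment. The sketch below is inferred from how upload_chunk uses it (`has` returning a bool, `put` returning the stored chunk's digest) and from the doc comment's mention of downloads; the exact signatures, in particular `get`, are assumptions rather than a copy of the real trait definition.

use crate::Error;

// Inferred shape, not the verbatim trait from tvix/store.
pub trait ChunkService {
    // Check whether a chunk with this (BLAKE3) digest already exists.
    fn has(&self, digest: &[u8]) -> Result<bool, Error>;

    // Download the chunk contents for a digest, if present (assumed signature).
    fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error>;

    // Upload chunk data, returning the digest it was stored under.
    fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error>;
}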
--- /dev/null
+++ b/tvix/store/src/chunkservice/util.rs
@@ -0,0 +1,30 @@
+use tracing::{debug, instrument};
+
+use crate::Error;
+
+use super::ChunkService;
+
+// upload a chunk to the chunk service, and return its digest (or an error) when done.
+#[instrument(skip_all, err)]
+pub fn upload_chunk<CS: ChunkService>(
+    chunk_service: &CS,
+    chunk_data: Vec<u8>,
+) -> Result<Vec<u8>, Error> {
+    let mut hasher = blake3::Hasher::new();
+    // TODO: benchmark this number and factor it out
+    if chunk_data.len() >= 128 * 1024 {
+        hasher.update_rayon(&chunk_data);
+    } else {
+        hasher.update(&chunk_data);
+    }
+    let digest = hasher.finalize();
+
+    if chunk_service.has(digest.as_bytes())? {
+        debug!("already has chunk, skipping");
+    }
+    let digest_resp = chunk_service.put(chunk_data)?;
+
+    assert_eq!(digest_resp, digest.as_bytes());
+
+    Ok(digest.as_bytes().to_vec())
+}
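With the re-export in place, code outside the blob writer can upload chunks directly. A minimal usage sketch, assuming the crate is consumed as `tvix_store`, that `Error` is public at the crate root, and that MemoryChunkService can be built via Default (the real constructor may differ):

use tvix_store::chunkservice::{upload_chunk, ChunkService, MemoryChunkService};
use tvix_store::Error;

fn store_blob_as_single_chunk(data: Vec<u8>) -> Result<Vec<u8>, Error> {
    // Assumption: MemoryChunkService implements Default; swap in the real
    // constructor if it does not.
    let chunk_service = MemoryChunkService::default();

    // Hashes the data with BLAKE3, uploads it via the chunk service,
    // and returns the digest bytes.
    let digest = upload_chunk(&chunk_service, data)?;

    // The digest can later be used to check for / fetch the chunk again.
    assert!(chunk_service.has(&digest)?);

    Ok(digest)
}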