refactor(tvix/store): use Box<dyn DirectoryService>
Once we support configuring services at runtime, we don't know what DirectoryService we're using at compile time. This also means, we can't explicitly use the is_closed method from GRPCPutter, without making it part of the DirectoryPutter itself. Change-Id: Icd2a1ec4fc5649a6cd15c9cc7db4c2b473630431 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8727 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
This commit is contained in:
		
							parent
							
								
									6f85dbfc06
								
							
						
					
					
						commit
						7725eb53ad
					
				
					 18 changed files with 144 additions and 126 deletions
				
			
		| 
						 | 
				
			
			@ -73,12 +73,18 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b
 | 
			
		|||
 | 
			
		||||
    let blob_service = MemoryBlobService::default();
 | 
			
		||||
    let directory_service = MemoryDirectoryService::default();
 | 
			
		||||
    let path_info_service =
 | 
			
		||||
        MemoryPathInfoService::new(Box::new(blob_service.clone()), directory_service.clone());
 | 
			
		||||
    let path_info_service = MemoryPathInfoService::new(
 | 
			
		||||
        Box::new(blob_service.clone()),
 | 
			
		||||
        Box::new(directory_service.clone()),
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    eval.io_handle = Box::new(tvix_io::TvixIO::new(
 | 
			
		||||
        known_paths.clone(),
 | 
			
		||||
        tvix_store::TvixStoreIO::new(Box::new(blob_service), directory_service, path_info_service),
 | 
			
		||||
        tvix_store::TvixStoreIO::new(
 | 
			
		||||
            Box::new(blob_service),
 | 
			
		||||
            Box::new(directory_service),
 | 
			
		||||
            path_info_service,
 | 
			
		||||
        ),
 | 
			
		||||
    ));
 | 
			
		||||
 | 
			
		||||
    // bundle fetchurl.nix (used in nixpkgs) by resolving <nix> to
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -9,6 +9,7 @@ use tracing_subscriber::prelude::*;
 | 
			
		|||
use tvix_store::blobservice::BlobService;
 | 
			
		||||
use tvix_store::blobservice::GRPCBlobService;
 | 
			
		||||
use tvix_store::blobservice::SledBlobService;
 | 
			
		||||
use tvix_store::directoryservice::DirectoryService;
 | 
			
		||||
use tvix_store::directoryservice::GRPCDirectoryService;
 | 
			
		||||
use tvix_store::directoryservice::SledDirectoryService;
 | 
			
		||||
use tvix_store::pathinfoservice::GRPCPathInfoService;
 | 
			
		||||
| 
						 | 
				
			
			@ -103,10 +104,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 | 
			
		|||
            let boxed_blob_service: Box<dyn BlobService> = Box::new(blob_service.clone());
 | 
			
		||||
            let boxed_blob_service2: Box<dyn BlobService> = Box::new(blob_service.clone());
 | 
			
		||||
            let directory_service = SledDirectoryService::new("directories.sled".into())?;
 | 
			
		||||
            let boxed_directory_service = Box::new(directory_service.clone());
 | 
			
		||||
            let boxed_directory_service2: Box<dyn DirectoryService> = Box::new(directory_service);
 | 
			
		||||
            let path_info_service = SledPathInfoService::new(
 | 
			
		||||
                "pathinfo.sled".into(),
 | 
			
		||||
                boxed_blob_service,
 | 
			
		||||
                directory_service.clone(),
 | 
			
		||||
                boxed_directory_service,
 | 
			
		||||
            )?;
 | 
			
		||||
 | 
			
		||||
            let listen_address = listen_address
 | 
			
		||||
| 
						 | 
				
			
			@ -122,7 +125,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 | 
			
		|||
                    boxed_blob_service2,
 | 
			
		||||
                )))
 | 
			
		||||
                .add_service(DirectoryServiceServer::new(
 | 
			
		||||
                    GRPCDirectoryServiceWrapper::from(directory_service),
 | 
			
		||||
                    GRPCDirectoryServiceWrapper::from(boxed_directory_service2),
 | 
			
		||||
                ))
 | 
			
		||||
                .add_service(PathInfoServiceServer::new(
 | 
			
		||||
                    GRPCPathInfoServiceWrapper::from(path_info_service),
 | 
			
		||||
| 
						 | 
				
			
			@ -154,7 +157,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 | 
			
		|||
 | 
			
		||||
            let io = Arc::new(TvixStoreIO::new(
 | 
			
		||||
                Box::new(blob_service),
 | 
			
		||||
                directory_service,
 | 
			
		||||
                Box::new(directory_service),
 | 
			
		||||
                path_info_service,
 | 
			
		||||
            ));
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -46,8 +46,6 @@ impl GRPCDirectoryService {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl DirectoryService for GRPCDirectoryService {
 | 
			
		||||
    type DirectoriesIterator = StreamIterator;
 | 
			
		||||
 | 
			
		||||
    fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
 | 
			
		||||
        // Get a new handle to the gRPC client, and copy the digest.
 | 
			
		||||
        let mut grpc_client = self.grpc_client.clone();
 | 
			
		||||
| 
						 | 
				
			
			@ -113,7 +111,10 @@ impl DirectoryService for GRPCDirectoryService {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
 | 
			
		||||
    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
 | 
			
		||||
    fn get_recursive(
 | 
			
		||||
        &self,
 | 
			
		||||
        root_directory_digest: &B3Digest,
 | 
			
		||||
    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> {
 | 
			
		||||
        let mut grpc_client = self.grpc_client.clone();
 | 
			
		||||
 | 
			
		||||
        let root_directory_digest_as_vec = root_directory_digest.to_vec();
 | 
			
		||||
| 
						 | 
				
			
			@ -132,17 +133,15 @@ impl DirectoryService for GRPCDirectoryService {
 | 
			
		|||
 | 
			
		||||
        let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
 | 
			
		||||
 | 
			
		||||
        StreamIterator::new(
 | 
			
		||||
        Box::new(StreamIterator::new(
 | 
			
		||||
            self.tokio_handle.clone(),
 | 
			
		||||
            root_directory_digest.clone(),
 | 
			
		||||
            stream,
 | 
			
		||||
        )
 | 
			
		||||
        ))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    type DirectoryPutter = GRPCPutter;
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip_all)]
 | 
			
		||||
    fn put_multiple_start(&self) -> Self::DirectoryPutter
 | 
			
		||||
    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
 | 
			
		||||
    where
 | 
			
		||||
        Self: Clone,
 | 
			
		||||
    {
 | 
			
		||||
| 
						 | 
				
			
			@ -160,7 +159,7 @@ impl DirectoryService for GRPCDirectoryService {
 | 
			
		|||
                Ok(s)
 | 
			
		||||
            });
 | 
			
		||||
 | 
			
		||||
        GRPCPutter::new(self.tokio_handle.clone(), tx, task)
 | 
			
		||||
        Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -276,15 +275,6 @@ impl GRPCPutter {
 | 
			
		|||
            rq: Some((task, directory_sender)),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[allow(dead_code)]
 | 
			
		||||
    // allows checking if the tx part of the channel is closed.
 | 
			
		||||
    fn is_closed(&self) -> bool {
 | 
			
		||||
        match self.rq {
 | 
			
		||||
            None => true,
 | 
			
		||||
            Some((_, ref directory_sender)) => directory_sender.is_closed(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl DirectoryPutter for GRPCPutter {
 | 
			
		||||
| 
						 | 
				
			
			@ -329,6 +319,14 @@ impl DirectoryPutter for GRPCPutter {
 | 
			
		|||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // allows checking if the tx part of the channel is closed.
 | 
			
		||||
    fn is_closed(&self) -> bool {
 | 
			
		||||
        match self.rq {
 | 
			
		||||
            None => true,
 | 
			
		||||
            Some((_, ref directory_sender)) => directory_sender.is_closed(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
| 
						 | 
				
			
			@ -342,7 +340,7 @@ mod tests {
 | 
			
		|||
    use tonic::transport::{Endpoint, Server, Uri};
 | 
			
		||||
 | 
			
		||||
    use crate::{
 | 
			
		||||
        directoryservice::{DirectoryPutter, DirectoryService},
 | 
			
		||||
        directoryservice::DirectoryService,
 | 
			
		||||
        proto,
 | 
			
		||||
        proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper},
 | 
			
		||||
        tests::{
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,7 +4,7 @@ use std::sync::{Arc, RwLock};
 | 
			
		|||
use tracing::{instrument, warn};
 | 
			
		||||
 | 
			
		||||
use super::utils::SimplePutter;
 | 
			
		||||
use super::{DirectoryService, DirectoryTraverser};
 | 
			
		||||
use super::{DirectoryPutter, DirectoryService, DirectoryTraverser};
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Default)]
 | 
			
		||||
pub struct MemoryDirectoryService {
 | 
			
		||||
| 
						 | 
				
			
			@ -12,8 +12,6 @@ pub struct MemoryDirectoryService {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl DirectoryService for MemoryDirectoryService {
 | 
			
		||||
    type DirectoriesIterator = DirectoryTraverser<Self>;
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
 | 
			
		||||
    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
 | 
			
		||||
        let db = self.db.read()?;
 | 
			
		||||
| 
						 | 
				
			
			@ -68,17 +66,21 @@ impl DirectoryService for MemoryDirectoryService {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
 | 
			
		||||
    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
 | 
			
		||||
        DirectoryTraverser::with(self.clone(), root_directory_digest)
 | 
			
		||||
    fn get_recursive(
 | 
			
		||||
        &self,
 | 
			
		||||
        root_directory_digest: &B3Digest,
 | 
			
		||||
    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> {
 | 
			
		||||
        Box::new(DirectoryTraverser::with(
 | 
			
		||||
            self.clone(),
 | 
			
		||||
            root_directory_digest,
 | 
			
		||||
        ))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    type DirectoryPutter = SimplePutter<Self>;
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip_all)]
 | 
			
		||||
    fn put_multiple_start(&self) -> Self::DirectoryPutter
 | 
			
		||||
    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
 | 
			
		||||
    where
 | 
			
		||||
        Self: Clone,
 | 
			
		||||
    {
 | 
			
		||||
        SimplePutter::new(self.clone())
 | 
			
		||||
        Box::new(SimplePutter::new(self.clone()))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,10 +14,7 @@ pub use self::utils::DirectoryTraverser;
 | 
			
		|||
/// The base trait all Directory services need to implement.
 | 
			
		||||
/// This is a simple get and put of [crate::proto::Directory], returning their
 | 
			
		||||
/// digest.
 | 
			
		||||
pub trait DirectoryService {
 | 
			
		||||
    type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send;
 | 
			
		||||
    type DirectoryPutter: DirectoryPutter;
 | 
			
		||||
 | 
			
		||||
pub trait DirectoryService: Send + Sync {
 | 
			
		||||
    /// Get looks up a single Directory message by its digest.
 | 
			
		||||
    /// In case the directory is not found, Ok(None) is returned.
 | 
			
		||||
    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
 | 
			
		||||
| 
						 | 
				
			
			@ -29,11 +26,14 @@ pub trait DirectoryService {
 | 
			
		|||
    /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`,
 | 
			
		||||
    /// and we'd be able to add a default implementation for it here, but
 | 
			
		||||
    /// we can't have that yet.
 | 
			
		||||
    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator;
 | 
			
		||||
    fn get_recursive(
 | 
			
		||||
        &self,
 | 
			
		||||
        root_directory_digest: &B3Digest,
 | 
			
		||||
    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send>;
 | 
			
		||||
 | 
			
		||||
    /// Allows persisting a closure of [proto::Directory], which is a graph of
 | 
			
		||||
    /// connected Directory messages.
 | 
			
		||||
    fn put_multiple_start(&self) -> Self::DirectoryPutter;
 | 
			
		||||
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Provides a handle to put a closure of connected [proto::Directory] elements.
 | 
			
		||||
| 
						 | 
				
			
			@ -51,4 +51,8 @@ pub trait DirectoryPutter {
 | 
			
		|||
 | 
			
		||||
    /// Close the stream, and wait for any errors.
 | 
			
		||||
    fn close(&mut self) -> Result<B3Digest, Error>;
 | 
			
		||||
 | 
			
		||||
    /// Return whether the stream is closed or not.
 | 
			
		||||
    /// Used from some [DirectoryService] implementations only.
 | 
			
		||||
    fn is_closed(&self) -> bool;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,4 @@
 | 
			
		|||
use crate::directoryservice::DirectoryPutter;
 | 
			
		||||
use crate::proto::Directory;
 | 
			
		||||
use crate::{proto, B3Digest, Error};
 | 
			
		||||
use prost::Message;
 | 
			
		||||
| 
						 | 
				
			
			@ -29,8 +30,6 @@ impl SledDirectoryService {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl DirectoryService for SledDirectoryService {
 | 
			
		||||
    type DirectoriesIterator = DirectoryTraverser<Self>;
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
 | 
			
		||||
    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
 | 
			
		||||
        match self.db.get(digest.to_vec()) {
 | 
			
		||||
| 
						 | 
				
			
			@ -91,17 +90,22 @@ impl DirectoryService for SledDirectoryService {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
 | 
			
		||||
    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
 | 
			
		||||
        DirectoryTraverser::with(self.clone(), root_directory_digest)
 | 
			
		||||
    fn get_recursive(
 | 
			
		||||
        &self,
 | 
			
		||||
        root_directory_digest: &B3Digest,
 | 
			
		||||
    ) -> Box<(dyn Iterator<Item = Result<proto::Directory, Error>> + std::marker::Send + 'static)>
 | 
			
		||||
    {
 | 
			
		||||
        Box::new(DirectoryTraverser::with(
 | 
			
		||||
            self.clone(),
 | 
			
		||||
            root_directory_digest,
 | 
			
		||||
        ))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    type DirectoryPutter = SimplePutter<Self>;
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip_all)]
 | 
			
		||||
    fn put_multiple_start(&self) -> Self::DirectoryPutter
 | 
			
		||||
    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
 | 
			
		||||
    where
 | 
			
		||||
        Self: Clone,
 | 
			
		||||
    {
 | 
			
		||||
        SimplePutter::new(self.clone())
 | 
			
		||||
        Box::new(SimplePutter::new(self.clone()))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,8 +10,8 @@ use tracing::{instrument, warn};
 | 
			
		|||
/// TODO: the name of this function (and mod) is a bit bad, because it doesn't
 | 
			
		||||
/// clearly distinguish it from the BFS traversers.
 | 
			
		||||
#[instrument(skip(directory_service))]
 | 
			
		||||
pub fn traverse_to<DS: DirectoryService>(
 | 
			
		||||
    directory_service: &DS,
 | 
			
		||||
pub fn traverse_to(
 | 
			
		||||
    directory_service: &Box<dyn DirectoryService>,
 | 
			
		||||
    node: crate::proto::node::Node,
 | 
			
		||||
    path: &std::path::Path,
 | 
			
		||||
) -> Result<Option<crate::proto::node::Node>, Error> {
 | 
			
		||||
| 
						 | 
				
			
			@ -82,13 +82,9 @@ pub fn traverse_to<DS: DirectoryService>(
 | 
			
		|||
mod tests {
 | 
			
		||||
    use std::path::PathBuf;
 | 
			
		||||
 | 
			
		||||
    use crate::{
 | 
			
		||||
        directoryservice::DirectoryPutter,
 | 
			
		||||
        directoryservice::DirectoryService,
 | 
			
		||||
        tests::{
 | 
			
		||||
    use crate::tests::{
 | 
			
		||||
        fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
 | 
			
		||||
        utils::gen_directory_service,
 | 
			
		||||
        },
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    use super::traverse_to;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -107,12 +107,14 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
 | 
			
		|||
pub struct SimplePutter<DS: DirectoryService> {
 | 
			
		||||
    directory_service: DS,
 | 
			
		||||
    last_directory_digest: Option<B3Digest>,
 | 
			
		||||
    closed: bool,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<DS: DirectoryService> SimplePutter<DS> {
 | 
			
		||||
    pub fn new(directory_service: DS) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            directory_service,
 | 
			
		||||
            closed: false,
 | 
			
		||||
            last_directory_digest: None,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -120,6 +122,10 @@ impl<DS: DirectoryService> SimplePutter<DS> {
 | 
			
		|||
 | 
			
		||||
impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
 | 
			
		||||
    fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
 | 
			
		||||
        if self.closed {
 | 
			
		||||
            return Err(Error::StorageError("already closed".to_string()));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let digest = self.directory_service.put(directory)?;
 | 
			
		||||
 | 
			
		||||
        // track the last directory digest
 | 
			
		||||
| 
						 | 
				
			
			@ -130,11 +136,22 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
 | 
			
		|||
 | 
			
		||||
    /// We need to be mutable here, as that's the signature of the trait.
 | 
			
		||||
    fn close(&mut self) -> Result<B3Digest, Error> {
 | 
			
		||||
        if self.closed {
 | 
			
		||||
            return Err(Error::StorageError("already closed".to_string()));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        match &self.last_directory_digest {
 | 
			
		||||
            Some(last_digest) => Ok(last_digest.clone()),
 | 
			
		||||
            Some(last_digest) => {
 | 
			
		||||
                self.closed = true;
 | 
			
		||||
                Ok(last_digest.clone())
 | 
			
		||||
            }
 | 
			
		||||
            None => Err(Error::InvalidRequest(
 | 
			
		||||
                "no directories sent, can't show root digest".to_string(),
 | 
			
		||||
            )),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn is_closed(&self) -> bool {
 | 
			
		||||
        self.closed
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -56,9 +56,9 @@ impl From<super::Error> for Error {
 | 
			
		|||
//
 | 
			
		||||
// It assumes the caller adds returned nodes to the directories it assembles.
 | 
			
		||||
#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
 | 
			
		||||
fn process_entry<DP: DirectoryPutter>(
 | 
			
		||||
fn process_entry(
 | 
			
		||||
    blob_service: &Box<dyn BlobService>,
 | 
			
		||||
    directory_putter: &mut DP,
 | 
			
		||||
    directory_putter: &mut Box<dyn DirectoryPutter>,
 | 
			
		||||
    entry: &walkdir::DirEntry,
 | 
			
		||||
    maybe_directory: Option<proto::Directory>,
 | 
			
		||||
) -> Result<proto::node::Node, Error> {
 | 
			
		||||
| 
						 | 
				
			
			@ -145,9 +145,9 @@ fn process_entry<DP: DirectoryPutter>(
 | 
			
		|||
/// possibly register it somewhere (and potentially rename it based on some
 | 
			
		||||
/// naming scheme.
 | 
			
		||||
#[instrument(skip(blob_service, directory_service), fields(path=?p))]
 | 
			
		||||
pub fn ingest_path<DS: DirectoryService, P: AsRef<Path> + Debug>(
 | 
			
		||||
pub fn ingest_path<P: AsRef<Path> + Debug>(
 | 
			
		||||
    blob_service: &Box<dyn BlobService>,
 | 
			
		||||
    directory_service: &DS,
 | 
			
		||||
    directory_service: &Box<dyn DirectoryService>,
 | 
			
		||||
    p: P,
 | 
			
		||||
) -> Result<proto::node::Node, Error> {
 | 
			
		||||
    // Probe if the path points to a symlink. If it does, we process it manually,
 | 
			
		||||
| 
						 | 
				
			
			@ -174,6 +174,7 @@ pub fn ingest_path<DS: DirectoryService, P: AsRef<Path> + Debug>(
 | 
			
		|||
 | 
			
		||||
    let mut directories: HashMap<PathBuf, proto::Directory> = HashMap::default();
 | 
			
		||||
 | 
			
		||||
    // TODO: pass this one instead?
 | 
			
		||||
    let mut directory_putter = directory_service.put_multiple_start();
 | 
			
		||||
 | 
			
		||||
    for entry in WalkDir::new(p)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,10 +13,10 @@ use tracing::warn;
 | 
			
		|||
 | 
			
		||||
/// Invoke [render_nar], and return the size and sha256 digest of the produced
 | 
			
		||||
/// NAR output.
 | 
			
		||||
pub fn calculate_size_and_sha256<DS: DirectoryService + Clone>(
 | 
			
		||||
pub fn calculate_size_and_sha256(
 | 
			
		||||
    root_node: &proto::node::Node,
 | 
			
		||||
    blob_service: &Box<dyn BlobService>,
 | 
			
		||||
    directory_service: DS,
 | 
			
		||||
    directory_service: &Box<dyn DirectoryService>,
 | 
			
		||||
) -> Result<(u64, [u8; 32]), RenderError> {
 | 
			
		||||
    let h = Sha256::new();
 | 
			
		||||
    let mut cw = CountWrite::from(h);
 | 
			
		||||
| 
						 | 
				
			
			@ -30,11 +30,11 @@ pub fn calculate_size_and_sha256<DS: DirectoryService + Clone>(
 | 
			
		|||
/// and uses the passed blob_service and directory_service to
 | 
			
		||||
/// perform the necessary lookups as it traverses the structure.
 | 
			
		||||
/// The contents in NAR serialization are writen to the passed [std::io::Write].
 | 
			
		||||
pub fn write_nar<W: std::io::Write, DS: DirectoryService + Clone>(
 | 
			
		||||
pub fn write_nar<W: std::io::Write>(
 | 
			
		||||
    w: &mut W,
 | 
			
		||||
    proto_root_node: &proto::node::Node,
 | 
			
		||||
    blob_service: &Box<dyn BlobService>,
 | 
			
		||||
    directory_service: DS,
 | 
			
		||||
    directory_service: &Box<dyn DirectoryService>,
 | 
			
		||||
) -> Result<(), RenderError> {
 | 
			
		||||
    // Initialize NAR writer
 | 
			
		||||
    let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?;
 | 
			
		||||
| 
						 | 
				
			
			@ -49,11 +49,11 @@ pub fn write_nar<W: std::io::Write, DS: DirectoryService + Clone>(
 | 
			
		|||
 | 
			
		||||
/// Process an intermediate node in the structure.
 | 
			
		||||
/// This consumes the node.
 | 
			
		||||
fn walk_node<DS: DirectoryService + Clone>(
 | 
			
		||||
fn walk_node(
 | 
			
		||||
    nar_node: nar::writer::Node,
 | 
			
		||||
    proto_node: &proto::node::Node,
 | 
			
		||||
    blob_service: &Box<dyn BlobService>,
 | 
			
		||||
    directory_service: DS,
 | 
			
		||||
    directory_service: &Box<dyn DirectoryService>,
 | 
			
		||||
) -> Result<(), RenderError> {
 | 
			
		||||
    match proto_node {
 | 
			
		||||
        proto::node::Node::Symlink(proto_symlink_node) => {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,15 +8,18 @@ use std::{
 | 
			
		|||
    sync::{Arc, RwLock},
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
pub struct MemoryPathInfoService<DS: DirectoryService> {
 | 
			
		||||
pub struct MemoryPathInfoService {
 | 
			
		||||
    db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>,
 | 
			
		||||
 | 
			
		||||
    blob_service: Box<dyn BlobService>,
 | 
			
		||||
    directory_service: DS,
 | 
			
		||||
    directory_service: Box<dyn DirectoryService>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<DS: DirectoryService> MemoryPathInfoService<DS> {
 | 
			
		||||
    pub fn new(blob_service: Box<dyn BlobService>, directory_service: DS) -> Self {
 | 
			
		||||
impl MemoryPathInfoService {
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        blob_service: Box<dyn BlobService>,
 | 
			
		||||
        directory_service: Box<dyn DirectoryService>,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            db: Default::default(),
 | 
			
		||||
            blob_service,
 | 
			
		||||
| 
						 | 
				
			
			@ -25,7 +28,7 @@ impl<DS: DirectoryService> MemoryPathInfoService<DS> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<DS: DirectoryService + Clone> PathInfoService for MemoryPathInfoService<DS> {
 | 
			
		||||
impl PathInfoService for MemoryPathInfoService {
 | 
			
		||||
    fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> {
 | 
			
		||||
        let db = self.db.read().unwrap();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -55,11 +58,7 @@ impl<DS: DirectoryService + Clone> PathInfoService for MemoryPathInfoService<DS>
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> {
 | 
			
		||||
        calculate_size_and_sha256(
 | 
			
		||||
            root_node,
 | 
			
		||||
            &self.blob_service,
 | 
			
		||||
            self.directory_service.clone(),
 | 
			
		||||
        )
 | 
			
		||||
        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
 | 
			
		||||
            .map_err(|e| Error::StorageError(e.to_string()))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,18 +11,18 @@ use tracing::warn;
 | 
			
		|||
///
 | 
			
		||||
/// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
 | 
			
		||||
/// as that's currently the only request type available.
 | 
			
		||||
pub struct SledPathInfoService<DS: DirectoryService> {
 | 
			
		||||
pub struct SledPathInfoService {
 | 
			
		||||
    db: sled::Db,
 | 
			
		||||
 | 
			
		||||
    blob_service: Box<dyn BlobService>,
 | 
			
		||||
    directory_service: DS,
 | 
			
		||||
    directory_service: Box<dyn DirectoryService>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<DS: DirectoryService> SledPathInfoService<DS> {
 | 
			
		||||
impl SledPathInfoService {
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        p: PathBuf,
 | 
			
		||||
        blob_service: Box<dyn BlobService>,
 | 
			
		||||
        directory_service: DS,
 | 
			
		||||
        directory_service: Box<dyn DirectoryService>,
 | 
			
		||||
    ) -> Result<Self, sled::Error> {
 | 
			
		||||
        let config = sled::Config::default().use_compression(true).path(p);
 | 
			
		||||
        let db = config.open()?;
 | 
			
		||||
| 
						 | 
				
			
			@ -36,7 +36,7 @@ impl<DS: DirectoryService> SledPathInfoService<DS> {
 | 
			
		|||
 | 
			
		||||
    pub fn new_temporary(
 | 
			
		||||
        blob_service: Box<dyn BlobService>,
 | 
			
		||||
        directory_service: DS,
 | 
			
		||||
        directory_service: Box<dyn DirectoryService>,
 | 
			
		||||
    ) -> Result<Self, sled::Error> {
 | 
			
		||||
        let config = sled::Config::default().temporary(true);
 | 
			
		||||
        let db = config.open()?;
 | 
			
		||||
| 
						 | 
				
			
			@ -49,7 +49,7 @@ impl<DS: DirectoryService> SledPathInfoService<DS> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<DS: DirectoryService + Clone> PathInfoService for SledPathInfoService<DS> {
 | 
			
		||||
impl PathInfoService for SledPathInfoService {
 | 
			
		||||
    fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> {
 | 
			
		||||
        match self.db.get(digest) {
 | 
			
		||||
            Ok(None) => Ok(None),
 | 
			
		||||
| 
						 | 
				
			
			@ -95,11 +95,7 @@ impl<DS: DirectoryService + Clone> PathInfoService for SledPathInfoService<DS> {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> {
 | 
			
		||||
        calculate_size_and_sha256(
 | 
			
		||||
            root_node,
 | 
			
		||||
            &self.blob_service,
 | 
			
		||||
            self.directory_service.clone(),
 | 
			
		||||
        )
 | 
			
		||||
        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
 | 
			
		||||
            .map_err(|e| Error::StorageError(e.to_string()))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,27 +1,26 @@
 | 
			
		|||
use crate::proto;
 | 
			
		||||
use crate::{directoryservice::DirectoryService, B3Digest};
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::{sync::mpsc::channel, task};
 | 
			
		||||
use tokio_stream::wrappers::ReceiverStream;
 | 
			
		||||
use tonic::{async_trait, Request, Response, Status, Streaming};
 | 
			
		||||
use tracing::{debug, instrument, warn};
 | 
			
		||||
 | 
			
		||||
pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> {
 | 
			
		||||
    directory_service: C,
 | 
			
		||||
pub struct GRPCDirectoryServiceWrapper {
 | 
			
		||||
    directory_service: Arc<Box<dyn DirectoryService>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<DS: DirectoryService> From<DS> for GRPCDirectoryServiceWrapper<DS> {
 | 
			
		||||
    fn from(value: DS) -> Self {
 | 
			
		||||
impl From<Box<dyn DirectoryService>> for GRPCDirectoryServiceWrapper {
 | 
			
		||||
    fn from(value: Box<dyn DirectoryService>) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            directory_service: value,
 | 
			
		||||
            directory_service: Arc::new(value),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[async_trait]
 | 
			
		||||
impl<DS: DirectoryService + Send + Sync + Clone + 'static>
 | 
			
		||||
    proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<DS>
 | 
			
		||||
{
 | 
			
		||||
impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper {
 | 
			
		||||
    type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>;
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip(self))]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,3 @@
 | 
			
		|||
use crate::directoryservice::DirectoryService;
 | 
			
		||||
use crate::proto::directory_service_server::DirectoryService as GRPCDirectoryService;
 | 
			
		||||
use crate::proto::get_directory_request::ByWhat;
 | 
			
		||||
use crate::proto::{Directory, DirectoryNode, SymlinkNode};
 | 
			
		||||
| 
						 | 
				
			
			@ -8,8 +7,7 @@ use crate::tests::utils::gen_directory_service;
 | 
			
		|||
use tokio_stream::StreamExt;
 | 
			
		||||
use tonic::Status;
 | 
			
		||||
 | 
			
		||||
fn gen_grpc_service(
 | 
			
		||||
) -> GRPCDirectoryServiceWrapper<impl DirectoryService + Send + Sync + Clone + 'static> {
 | 
			
		||||
fn gen_grpc_service() -> GRPCDirectoryServiceWrapper {
 | 
			
		||||
    let directory_service = gen_directory_service();
 | 
			
		||||
    GRPCDirectoryServiceWrapper::from(directory_service)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -29,17 +29,17 @@ use crate::{
 | 
			
		|||
/// This is to both cover cases of syntactically valid store paths, that exist
 | 
			
		||||
/// on the filesystem (still managed by Nix), as well as being able to read
 | 
			
		||||
/// files outside store paths.
 | 
			
		||||
pub struct TvixStoreIO<DS: DirectoryService, PS: PathInfoService> {
 | 
			
		||||
pub struct TvixStoreIO<PS: PathInfoService> {
 | 
			
		||||
    blob_service: Box<dyn BlobService>,
 | 
			
		||||
    directory_service: DS,
 | 
			
		||||
    directory_service: Box<dyn DirectoryService>,
 | 
			
		||||
    path_info_service: PS,
 | 
			
		||||
    std_io: StdIO,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<DS: DirectoryService + Clone, PS: PathInfoService> TvixStoreIO<DS, PS> {
 | 
			
		||||
impl<PS: PathInfoService> TvixStoreIO<PS> {
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        blob_service: Box<dyn BlobService>,
 | 
			
		||||
        directory_service: DS,
 | 
			
		||||
        directory_service: Box<dyn DirectoryService>,
 | 
			
		||||
        path_info_service: PS,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
| 
						 | 
				
			
			@ -104,11 +104,8 @@ impl<DS: DirectoryService + Clone, PS: PathInfoService> TvixStoreIO<DS, PS> {
 | 
			
		|||
            .expect("error during import_path");
 | 
			
		||||
 | 
			
		||||
        // Render the NAR
 | 
			
		||||
        let (nar_size, nar_sha256) = calculate_size_and_sha256(
 | 
			
		||||
            &root_node,
 | 
			
		||||
            &self.blob_service,
 | 
			
		||||
            self.directory_service.clone(),
 | 
			
		||||
        )
 | 
			
		||||
        let (nar_size, nar_sha256) =
 | 
			
		||||
            calculate_size_and_sha256(&root_node, &self.blob_service, &self.directory_service)
 | 
			
		||||
                .expect("error during nar calculation"); // TODO: handle error
 | 
			
		||||
 | 
			
		||||
        // For given NAR sha256 digest and name, return the new [StorePath] this would have.
 | 
			
		||||
| 
						 | 
				
			
			@ -175,7 +172,7 @@ fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> S
 | 
			
		|||
    build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<DS: DirectoryService + Clone, PS: PathInfoService> EvalIO for TvixStoreIO<DS, PS> {
 | 
			
		||||
impl<PS: PathInfoService> EvalIO for TvixStoreIO<PS> {
 | 
			
		||||
    #[instrument(skip(self), ret, err)]
 | 
			
		||||
    fn path_exists(&self, path: &Path) -> Result<bool, io::Error> {
 | 
			
		||||
        if let Ok((store_path, sub_path)) =
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,5 +1,4 @@
 | 
			
		|||
use super::utils::{gen_blob_service, gen_directory_service};
 | 
			
		||||
use crate::directoryservice::DirectoryService;
 | 
			
		||||
use crate::import::ingest_path;
 | 
			
		||||
use crate::proto;
 | 
			
		||||
use crate::tests::fixtures::DIRECTORY_COMPLICATED;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,3 @@
 | 
			
		|||
use crate::directoryservice::DirectoryService;
 | 
			
		||||
use crate::nar::calculate_size_and_sha256;
 | 
			
		||||
use crate::nar::write_nar;
 | 
			
		||||
use crate::proto::DirectoryNode;
 | 
			
		||||
| 
						 | 
				
			
			@ -21,7 +20,7 @@ fn single_symlink() {
 | 
			
		|||
        }),
 | 
			
		||||
        // don't put anything in the stores, as we don't actually do any requests.
 | 
			
		||||
        &gen_blob_service(),
 | 
			
		||||
        gen_directory_service(),
 | 
			
		||||
        &gen_directory_service(),
 | 
			
		||||
    )
 | 
			
		||||
    .expect("must succeed");
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -43,7 +42,7 @@ fn single_file_missing_blob() {
 | 
			
		|||
        }),
 | 
			
		||||
        // the blobservice is empty intentionally, to provoke the error.
 | 
			
		||||
        &gen_blob_service(),
 | 
			
		||||
        gen_directory_service(),
 | 
			
		||||
        &gen_directory_service(),
 | 
			
		||||
    )
 | 
			
		||||
    .expect_err("must fail");
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -83,7 +82,7 @@ fn single_file_wrong_blob_size() {
 | 
			
		|||
                executable: false,
 | 
			
		||||
            }),
 | 
			
		||||
            &blob_service,
 | 
			
		||||
            gen_directory_service(),
 | 
			
		||||
            &gen_directory_service(),
 | 
			
		||||
        )
 | 
			
		||||
        .expect_err("must fail");
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -108,7 +107,7 @@ fn single_file_wrong_blob_size() {
 | 
			
		|||
                executable: false,
 | 
			
		||||
            }),
 | 
			
		||||
            &blob_service,
 | 
			
		||||
            gen_directory_service(),
 | 
			
		||||
            &gen_directory_service(),
 | 
			
		||||
        )
 | 
			
		||||
        .expect_err("must fail");
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -145,7 +144,7 @@ fn single_file() {
 | 
			
		|||
            executable: false,
 | 
			
		||||
        }),
 | 
			
		||||
        &blob_service,
 | 
			
		||||
        gen_directory_service(),
 | 
			
		||||
        &gen_directory_service(),
 | 
			
		||||
    )
 | 
			
		||||
    .expect("must succeed");
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -182,7 +181,7 @@ fn test_complicated() {
 | 
			
		|||
            size: DIRECTORY_COMPLICATED.size(),
 | 
			
		||||
        }),
 | 
			
		||||
        &blob_service,
 | 
			
		||||
        directory_service.clone(),
 | 
			
		||||
        &directory_service,
 | 
			
		||||
    )
 | 
			
		||||
    .expect("must succeed");
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -196,7 +195,7 @@ fn test_complicated() {
 | 
			
		|||
            size: DIRECTORY_COMPLICATED.size(),
 | 
			
		||||
        }),
 | 
			
		||||
        &blob_service,
 | 
			
		||||
        directory_service,
 | 
			
		||||
        &directory_service,
 | 
			
		||||
    )
 | 
			
		||||
    .expect("must succeed");
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,13 +8,13 @@ pub fn gen_blob_service() -> Box<dyn BlobService> {
 | 
			
		|||
    Box::new(MemoryBlobService::default())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static {
 | 
			
		||||
    MemoryDirectoryService::default()
 | 
			
		||||
pub fn gen_directory_service() -> Box<dyn DirectoryService> {
 | 
			
		||||
    Box::new(MemoryDirectoryService::default())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn gen_pathinfo_service<DS: DirectoryService + Clone>(
 | 
			
		||||
pub fn gen_pathinfo_service(
 | 
			
		||||
    blob_service: Box<dyn BlobService>,
 | 
			
		||||
    directory_service: DS,
 | 
			
		||||
    directory_service: Box<dyn DirectoryService>,
 | 
			
		||||
) -> impl PathInfoService {
 | 
			
		||||
    MemoryPathInfoService::new(blob_service, directory_service)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue