refactor(tvix/store/pathinfoservice): make more generic
We don't need Arcs in most of the cases, we're fine with some container. Change-Id: Ic4f8acb5b9d93e2b0923bb607463fb91e9d0e4fe Reviewed-on: https://cl.tvl.fyi/c/depot/+/10606 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
		
							parent
							
								
									7d51193f7d
								
							
						
					
					
						commit
						b59df53774
					
				
					 6 changed files with 51 additions and 54 deletions
				
			
		| 
						 | 
					@ -159,8 +159,8 @@ mod tests {
 | 
				
			||||||
            let router = server.add_service(
 | 
					            let router = server.add_service(
 | 
				
			||||||
                crate::proto::path_info_service_server::PathInfoServiceServer::new(
 | 
					                crate::proto::path_info_service_server::PathInfoServiceServer::new(
 | 
				
			||||||
                    GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new(
 | 
					                    GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new(
 | 
				
			||||||
                        gen_blob_service().into(),
 | 
					                        gen_blob_service(),
 | 
				
			||||||
                        gen_directory_service().into(),
 | 
					                        gen_directory_service(),
 | 
				
			||||||
                    ))
 | 
					                    ))
 | 
				
			||||||
                        as Box<dyn PathInfoService>),
 | 
					                        as Box<dyn PathInfoService>),
 | 
				
			||||||
                ),
 | 
					                ),
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -11,18 +11,15 @@ use tvix_castore::proto as castorepb;
 | 
				
			||||||
use tvix_castore::Error;
 | 
					use tvix_castore::Error;
 | 
				
			||||||
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 | 
					use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct MemoryPathInfoService {
 | 
					pub struct MemoryPathInfoService<BS, DS> {
 | 
				
			||||||
    db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
 | 
					    db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    blob_service: Arc<dyn BlobService>,
 | 
					    blob_service: BS,
 | 
				
			||||||
    directory_service: Arc<dyn DirectoryService>,
 | 
					    directory_service: DS,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl MemoryPathInfoService {
 | 
					impl<BS, DS> MemoryPathInfoService<BS, DS> {
 | 
				
			||||||
    pub fn new(
 | 
					    pub fn new(blob_service: BS, directory_service: DS) -> Self {
 | 
				
			||||||
        blob_service: Arc<dyn BlobService>,
 | 
					 | 
				
			||||||
        directory_service: Arc<dyn DirectoryService>,
 | 
					 | 
				
			||||||
    ) -> Self {
 | 
					 | 
				
			||||||
        Self {
 | 
					        Self {
 | 
				
			||||||
            db: Default::default(),
 | 
					            db: Default::default(),
 | 
				
			||||||
            blob_service,
 | 
					            blob_service,
 | 
				
			||||||
| 
						 | 
					@ -32,7 +29,11 @@ impl MemoryPathInfoService {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[async_trait]
 | 
					#[async_trait]
 | 
				
			||||||
impl PathInfoService for MemoryPathInfoService {
 | 
					impl<BS, DS> PathInfoService for MemoryPathInfoService<BS, DS>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    BS: AsRef<dyn BlobService> + Send + Sync,
 | 
				
			||||||
 | 
					    DS: AsRef<dyn DirectoryService> + Send + Sync,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
 | 
					    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
 | 
				
			||||||
        let db = self.db.read().unwrap();
 | 
					        let db = self.db.read().unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -65,13 +66,9 @@ impl PathInfoService for MemoryPathInfoService {
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        root_node: &castorepb::node::Node,
 | 
					        root_node: &castorepb::node::Node,
 | 
				
			||||||
    ) -> Result<(u64, [u8; 32]), Error> {
 | 
					    ) -> Result<(u64, [u8; 32]), Error> {
 | 
				
			||||||
        calculate_size_and_sha256(
 | 
					        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
 | 
				
			||||||
            root_node,
 | 
					            .await
 | 
				
			||||||
            self.blob_service.clone(),
 | 
					            .map_err(|e| Error::StorageError(e.to_string()))
 | 
				
			||||||
            self.directory_service.clone(),
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        .await
 | 
					 | 
				
			||||||
        .map_err(|e| Error::StorageError(e.to_string()))
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
 | 
					    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,7 +1,6 @@
 | 
				
			||||||
use std::{
 | 
					use std::{
 | 
				
			||||||
    io::{self, BufRead, Read, Write},
 | 
					    io::{self, BufRead, Read, Write},
 | 
				
			||||||
    pin::Pin,
 | 
					    pin::Pin,
 | 
				
			||||||
    sync::Arc,
 | 
					 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use data_encoding::BASE64;
 | 
					use data_encoding::BASE64;
 | 
				
			||||||
| 
						 | 
					@ -38,24 +37,20 @@ use super::PathInfoService;
 | 
				
			||||||
/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not
 | 
					/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not
 | 
				
			||||||
/// implemented and return an error if called.
 | 
					/// implemented and return an error if called.
 | 
				
			||||||
/// TODO: what about reading from nix-cache-info?
 | 
					/// TODO: what about reading from nix-cache-info?
 | 
				
			||||||
pub struct NixHTTPPathInfoService {
 | 
					pub struct NixHTTPPathInfoService<BS, DS> {
 | 
				
			||||||
    base_url: url::Url,
 | 
					    base_url: url::Url,
 | 
				
			||||||
    http_client: reqwest::Client,
 | 
					    http_client: reqwest::Client,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    blob_service: Arc<dyn BlobService>,
 | 
					    blob_service: BS,
 | 
				
			||||||
    directory_service: Arc<dyn DirectoryService>,
 | 
					    directory_service: DS,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /// An optional list of [narinfo::PubKey].
 | 
					    /// An optional list of [narinfo::PubKey].
 | 
				
			||||||
    /// If set, the .narinfo files received need to have correct signature by at least one of these.
 | 
					    /// If set, the .narinfo files received need to have correct signature by at least one of these.
 | 
				
			||||||
    public_keys: Option<Vec<narinfo::PubKey>>,
 | 
					    public_keys: Option<Vec<narinfo::PubKey>>,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl NixHTTPPathInfoService {
 | 
					impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
 | 
				
			||||||
    pub fn new(
 | 
					    pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self {
 | 
				
			||||||
        base_url: url::Url,
 | 
					 | 
				
			||||||
        blob_service: Arc<dyn BlobService>,
 | 
					 | 
				
			||||||
        directory_service: Arc<dyn DirectoryService>,
 | 
					 | 
				
			||||||
    ) -> Self {
 | 
					 | 
				
			||||||
        Self {
 | 
					        Self {
 | 
				
			||||||
            base_url,
 | 
					            base_url,
 | 
				
			||||||
            http_client: reqwest::Client::new(),
 | 
					            http_client: reqwest::Client::new(),
 | 
				
			||||||
| 
						 | 
					@ -73,7 +68,11 @@ impl NixHTTPPathInfoService {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[async_trait]
 | 
					#[async_trait]
 | 
				
			||||||
impl PathInfoService for NixHTTPPathInfoService {
 | 
					impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
 | 
				
			||||||
 | 
					    DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
    #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))]
 | 
					    #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))]
 | 
				
			||||||
    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
 | 
					    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
 | 
				
			||||||
        let narinfo_url = self
 | 
					        let narinfo_url = self
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -3,7 +3,7 @@ use crate::nar::calculate_size_and_sha256;
 | 
				
			||||||
use crate::proto::PathInfo;
 | 
					use crate::proto::PathInfo;
 | 
				
			||||||
use futures::{stream::iter, Stream};
 | 
					use futures::{stream::iter, Stream};
 | 
				
			||||||
use prost::Message;
 | 
					use prost::Message;
 | 
				
			||||||
use std::{path::Path, pin::Pin, sync::Arc};
 | 
					use std::{path::Path, pin::Pin};
 | 
				
			||||||
use tonic::async_trait;
 | 
					use tonic::async_trait;
 | 
				
			||||||
use tracing::warn;
 | 
					use tracing::warn;
 | 
				
			||||||
use tvix_castore::proto as castorepb;
 | 
					use tvix_castore::proto as castorepb;
 | 
				
			||||||
| 
						 | 
					@ -13,18 +13,18 @@ use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService,
 | 
				
			||||||
///
 | 
					///
 | 
				
			||||||
/// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
 | 
					/// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
 | 
				
			||||||
/// as that's currently the only request type available.
 | 
					/// as that's currently the only request type available.
 | 
				
			||||||
pub struct SledPathInfoService {
 | 
					pub struct SledPathInfoService<BS, DS> {
 | 
				
			||||||
    db: sled::Db,
 | 
					    db: sled::Db,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    blob_service: Arc<dyn BlobService>,
 | 
					    blob_service: BS,
 | 
				
			||||||
    directory_service: Arc<dyn DirectoryService>,
 | 
					    directory_service: DS,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl SledPathInfoService {
 | 
					impl<BS, DS> SledPathInfoService<BS, DS> {
 | 
				
			||||||
    pub fn new<P: AsRef<Path>>(
 | 
					    pub fn new<P: AsRef<Path>>(
 | 
				
			||||||
        p: P,
 | 
					        p: P,
 | 
				
			||||||
        blob_service: Arc<dyn BlobService>,
 | 
					        blob_service: BS,
 | 
				
			||||||
        directory_service: Arc<dyn DirectoryService>,
 | 
					        directory_service: DS,
 | 
				
			||||||
    ) -> Result<Self, sled::Error> {
 | 
					    ) -> Result<Self, sled::Error> {
 | 
				
			||||||
        let config = sled::Config::default()
 | 
					        let config = sled::Config::default()
 | 
				
			||||||
            .use_compression(false) // is a required parameter
 | 
					            .use_compression(false) // is a required parameter
 | 
				
			||||||
| 
						 | 
					@ -38,10 +38,7 @@ impl SledPathInfoService {
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn new_temporary(
 | 
					    pub fn new_temporary(blob_service: BS, directory_service: DS) -> Result<Self, sled::Error> {
 | 
				
			||||||
        blob_service: Arc<dyn BlobService>,
 | 
					 | 
				
			||||||
        directory_service: Arc<dyn DirectoryService>,
 | 
					 | 
				
			||||||
    ) -> Result<Self, sled::Error> {
 | 
					 | 
				
			||||||
        let config = sled::Config::default().temporary(true);
 | 
					        let config = sled::Config::default().temporary(true);
 | 
				
			||||||
        let db = config.open()?;
 | 
					        let db = config.open()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -54,7 +51,11 @@ impl SledPathInfoService {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[async_trait]
 | 
					#[async_trait]
 | 
				
			||||||
impl PathInfoService for SledPathInfoService {
 | 
					impl<BS, DS> PathInfoService for SledPathInfoService<BS, DS>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    BS: AsRef<dyn BlobService> + Send + Sync,
 | 
				
			||||||
 | 
					    DS: AsRef<dyn DirectoryService> + Send + Sync,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
 | 
					    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
 | 
				
			||||||
        match self.db.get(digest) {
 | 
					        match self.db.get(digest) {
 | 
				
			||||||
            Ok(None) => Ok(None),
 | 
					            Ok(None) => Ok(None),
 | 
				
			||||||
| 
						 | 
					@ -106,13 +107,9 @@ impl PathInfoService for SledPathInfoService {
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        root_node: &castorepb::node::Node,
 | 
					        root_node: &castorepb::node::Node,
 | 
				
			||||||
    ) -> Result<(u64, [u8; 32]), Error> {
 | 
					    ) -> Result<(u64, [u8; 32]), Error> {
 | 
				
			||||||
        calculate_size_and_sha256(
 | 
					        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
 | 
				
			||||||
            root_node,
 | 
					            .await
 | 
				
			||||||
            self.blob_service.clone(),
 | 
					            .map_err(|e| Error::StorageError(e.to_string()))
 | 
				
			||||||
            self.directory_service.clone(),
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        .await
 | 
					 | 
				
			||||||
        .map_err(|e| Error::StorageError(e.to_string()))
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
 | 
					    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -22,8 +22,8 @@ fn gen_grpc_service(
 | 
				
			||||||
    let blob_service = gen_blob_service();
 | 
					    let blob_service = gen_blob_service();
 | 
				
			||||||
    let directory_service = gen_directory_service();
 | 
					    let directory_service = gen_directory_service();
 | 
				
			||||||
    Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service(
 | 
					    Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service(
 | 
				
			||||||
        blob_service.into(),
 | 
					        blob_service,
 | 
				
			||||||
        directory_service.into(),
 | 
					        directory_service,
 | 
				
			||||||
    )))
 | 
					    )))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -4,9 +4,13 @@ use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub use tvix_castore::utils::*;
 | 
					pub use tvix_castore::utils::*;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub fn gen_pathinfo_service(
 | 
					pub fn gen_pathinfo_service<BS, DS>(
 | 
				
			||||||
    blob_service: Arc<dyn BlobService>,
 | 
					    blob_service: BS,
 | 
				
			||||||
    directory_service: Arc<dyn DirectoryService>,
 | 
					    directory_service: DS,
 | 
				
			||||||
) -> Arc<dyn PathInfoService> {
 | 
					) -> Arc<dyn PathInfoService>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    BS: AsRef<dyn BlobService> + Send + Sync + 'static,
 | 
				
			||||||
 | 
					    DS: AsRef<dyn DirectoryService> + Send + Sync + 'static,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
    Arc::new(MemoryPathInfoService::new(blob_service, directory_service))
 | 
					    Arc::new(MemoryPathInfoService::new(blob_service, directory_service))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue