feat(tvix/castore): add RedbDirectoryService
This provides a DirectoryService implementation which uses redb (https://github.com/cberner/redb) as the database. It provides both in-memory and persistent on-filesystem implementations. Change-Id: Id8f7c812e2cf401cccd1c382b19907b17a6887bc Reviewed-on: https://cl.tvl.fyi/c/depot/+/12038 Tested-by: BuildkiteCI Autosubmit: Ilan Joselevich <personal@ilanjoselevich.com> Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
		
							parent
							
								
									87d4b00ff5
								
							
						
					
					
						commit
						41dc9ee6a2
					
				
					 6 changed files with 341 additions and 0 deletions
				
			
		|  | @ -172,6 +172,7 @@ depot.nix.readTree.drvTargets | |||
|   closure-nixos = (mkBootTest { | ||||
|     blobServiceAddr = "objectstore+file:///build/blobs"; | ||||
|     pathInfoServiceAddr = "redb:///build/pathinfo.redb"; | ||||
|     directoryServiceAddr = "redb:///build/directories.redb"; | ||||
|     path = testSystem; | ||||
|     isClosure = true; | ||||
|     vmCmdline = "init=${testSystem}/init panic=-1"; # reboot immediately on panic | ||||
|  |  | |||
|  | @ -26,6 +26,11 @@ impl From<B3Digest> for bytes::Bytes { | |||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<blake3::Hash> for B3Digest { | ||||
|     fn from(value: blake3::Hash) -> Self { | ||||
|         Self(Bytes::copy_from_slice(value.as_bytes())) | ||||
|     } | ||||
| } | ||||
| impl From<digest::Output<blake3::Hasher>> for B3Digest { | ||||
|     fn from(value: digest::Output<blake3::Hasher>) -> Self { | ||||
|         let v = Into::<[u8; B3_LEN]>::into(value); | ||||
|  | @ -67,6 +72,12 @@ impl From<&[u8; B3_LEN]> for B3Digest { | |||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<B3Digest> for [u8; B3_LEN] { | ||||
|     fn from(value: B3Digest) -> Self { | ||||
|         value.0.to_vec().try_into().unwrap() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Clone for B3Digest { | ||||
|     fn clone(&self) -> Self { | ||||
|         Self(self.0.to_owned()) | ||||
|  |  | |||
|  | @ -18,6 +18,11 @@ use super::DirectoryService; | |||
| /// - `sled:///absolute/path/to/somewhere`
 | ||||
| ///   Uses sled, using a path on the disk for persistency. Can be only opened
 | ||||
| ///   from one process at the same time.
 | ||||
| /// - `redb:`
 | ||||
| ///   Uses a in-memory redb implementation.
 | ||||
| /// - `redb:///absolute/path/to/somewhere`
 | ||||
| ///   Uses redb, using a path on the disk for persistency. Can be only opened
 | ||||
| ///   from one process at the same time.
 | ||||
| /// - `grpc+unix:///absolute/path/to/somewhere`
 | ||||
| ///   Connects to a local tvix-store gRPC service via Unix socket.
 | ||||
| /// - `grpc+http://host:port`, `grpc+https://host:port`
 | ||||
|  | @ -52,6 +57,8 @@ mod tests { | |||
|     lazy_static! { | ||||
|         static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); | ||||
|         static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap(); | ||||
|         static ref TMPDIR_REDB_1: TempDir = TempDir::new().unwrap(); | ||||
|         static ref TMPDIR_REDB_2: TempDir = TempDir::new().unwrap(); | ||||
|     } | ||||
| 
 | ||||
|     #[rstest] | ||||
|  | @ -75,6 +82,16 @@ mod tests { | |||
|     #[case::memory_invalid_root_path("memory:///", false)] | ||||
|     /// This sets a memory url path to "/foo", which is invalid.
 | ||||
|     #[case::memory_invalid_root_path_foo("memory:///foo", false)] | ||||
|     /// This configures redb in temporary mode.
 | ||||
|     #[case::redb_valid_temporary("redb://", true)] | ||||
|     /// This configures redb with /, which should fail.
 | ||||
|     #[case::redb_invalid_root("redb:///", false)] | ||||
|     /// This configures redb with a host, not path, which should fail.
 | ||||
|     #[case::redb_invalid_host("redb://foo.example", false)] | ||||
|     /// This configures redb with a valid path, which should succeed.
 | ||||
|     #[case::redb_valid_path(&format!("redb://{}", &TMPDIR_REDB_1.path().join("foo").to_str().unwrap()), true)] | ||||
|     /// This configures redb with a host, and a valid path path, which should fail.
 | ||||
|     #[case::redb_invalid_host_with_valid_path(&format!("redb://foo.example{}", &TMPDIR_REDB_2.path().join("bar").to_str().unwrap()), false)] | ||||
|     /// Correct scheme to connect to a unix socket.
 | ||||
|     #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)] | ||||
|     /// Correct scheme for unix socket, but setting a host too, which is invalid.
 | ||||
|  |  | |||
|  | @ -10,6 +10,7 @@ mod grpc; | |||
| mod memory; | ||||
| mod object_store; | ||||
| mod order_validator; | ||||
| mod redb; | ||||
| mod simple_putter; | ||||
| mod sled; | ||||
| #[cfg(test)] | ||||
|  | @ -24,6 +25,7 @@ pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig}; | |||
| pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig}; | ||||
| pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectoryServiceConfig}; | ||||
| pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; | ||||
| pub use self::redb::{RedbDirectoryService, RedbDirectoryServiceConfig}; | ||||
| pub use self::simple_putter::SimplePutter; | ||||
| pub use self::sled::{SledDirectoryService, SledDirectoryServiceConfig}; | ||||
| pub use self::traverse::descend_to; | ||||
|  | @ -137,6 +139,7 @@ pub(crate) fn register_directory_services(reg: &mut Registry) { | |||
|     reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache"); | ||||
|     reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc"); | ||||
|     reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::SledDirectoryServiceConfig>("sled"); | ||||
|     reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::RedbDirectoryServiceConfig>("redb"); | ||||
|     #[cfg(feature = "cloud")] | ||||
|     { | ||||
|         reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable"); | ||||
|  |  | |||
							
								
								
									
										308
									
								
								tvix/castore/src/directoryservice/redb.rs
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										308
									
								
								tvix/castore/src/directoryservice/redb.rs
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,308 @@ | |||
| use futures::stream::BoxStream; | ||||
| use prost::Message; | ||||
| use redb::{Database, TableDefinition}; | ||||
| use std::{path::PathBuf, sync::Arc}; | ||||
| use tonic::async_trait; | ||||
| use tracing::{instrument, warn}; | ||||
| 
 | ||||
| use crate::{ | ||||
|     composition::{CompositionContext, ServiceBuilder}, | ||||
|     digests, proto, B3Digest, Error, | ||||
| }; | ||||
| 
 | ||||
| use super::{ | ||||
|     traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, | ||||
| }; | ||||
| 
 | ||||
| const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = | ||||
|     TableDefinition::new("directory"); | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct RedbDirectoryService { | ||||
|     // We wrap the db in an Arc to be able to move it into spawn_blocking,
 | ||||
|     // as discussed in https://github.com/cberner/redb/issues/789
 | ||||
|     db: Arc<Database>, | ||||
| } | ||||
| 
 | ||||
| impl RedbDirectoryService { | ||||
|     /// Constructs a new instance using the specified filesystem path for
 | ||||
|     /// storage.
 | ||||
|     pub async fn new(path: PathBuf) -> Result<Self, Error> { | ||||
|         if path == PathBuf::from("/") { | ||||
|             return Err(Error::StorageError( | ||||
|                 "cowardly refusing to open / with redb".to_string(), | ||||
|             )); | ||||
|         } | ||||
| 
 | ||||
|         let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> { | ||||
|             let db = redb::Database::create(path)?; | ||||
|             create_schema(&db)?; | ||||
|             Ok(db) | ||||
|         }) | ||||
|         .await??; | ||||
| 
 | ||||
|         Ok(Self { db: Arc::new(db) }) | ||||
|     } | ||||
| 
 | ||||
|     /// Constructs a new instance using the in-memory backend.
 | ||||
|     pub fn new_temporary() -> Result<Self, Error> { | ||||
|         let db = | ||||
|             redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?; | ||||
| 
 | ||||
|         create_schema(&db)?; | ||||
| 
 | ||||
|         Ok(Self { db: Arc::new(db) }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Ensures all tables are present.
 | ||||
| /// Opens a write transaction and calls open_table on DIRECTORY_TABLE, which will
 | ||||
| /// create it if not present.
 | ||||
| fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { | ||||
|     let txn = db.begin_write()?; | ||||
|     txn.open_table(DIRECTORY_TABLE)?; | ||||
|     txn.commit()?; | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| #[async_trait] | ||||
| impl DirectoryService for RedbDirectoryService { | ||||
|     #[instrument(skip(self, digest), fields(directory.digest = %digest))] | ||||
|     async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { | ||||
|         let db = self.db.clone(); | ||||
| 
 | ||||
|         // Retrieves the protobuf-encoded Directory for the corresponding digest.
 | ||||
|         let db_get_resp = tokio::task::spawn_blocking({ | ||||
|             let digest_as_array: [u8; digests::B3_LEN] = digest.to_owned().into(); | ||||
|             move || -> Result<_, redb::Error> { | ||||
|                 let txn = db.begin_read()?; | ||||
|                 let table = txn.open_table(DIRECTORY_TABLE)?; | ||||
|                 Ok(table.get(digest_as_array)?) | ||||
|             } | ||||
|         }) | ||||
|         .await? | ||||
|         .map_err(|e| { | ||||
|             warn!(err=%e, "failed to retrieve Directory"); | ||||
|             Error::StorageError("failed to retrieve Directory".to_string()) | ||||
|         })?; | ||||
| 
 | ||||
|         // The Directory was not found, return None.
 | ||||
|         let directory_data = match db_get_resp { | ||||
|             None => return Ok(None), | ||||
|             Some(d) => d, | ||||
|         }; | ||||
| 
 | ||||
|         // We check that the digest of the retrieved Directory matches the expected digest.
 | ||||
|         let actual_digest = blake3::hash(directory_data.value().as_slice()); | ||||
|         if actual_digest.as_bytes() != digest.as_slice() { | ||||
|             warn!(directory.actual_digest=%actual_digest, "requested Directory got the wrong digest"); | ||||
|             return Err(Error::StorageError( | ||||
|                 "requested Directory got the wrong digest".to_string(), | ||||
|             )); | ||||
|         } | ||||
| 
 | ||||
|         // Attempt to decode the retrieved protobuf-encoded Directory, returning a parsing error if
 | ||||
|         // the decoding failed.
 | ||||
|         let directory = match proto::Directory::decode(&*directory_data.value()) { | ||||
|             Ok(dir) => { | ||||
|                 // The returned Directory must be valid.
 | ||||
|                 if let Err(e) = dir.validate() { | ||||
|                     warn!(err=%e, "Directory failed validation"); | ||||
|                     return Err(Error::StorageError( | ||||
|                         "Directory failed validation".to_string(), | ||||
|                     )); | ||||
|                 } | ||||
|                 dir | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 warn!(err=%e, "failed to parse Directory"); | ||||
|                 return Err(Error::StorageError("failed to parse Directory".to_string())); | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         Ok(Some(directory)) | ||||
|     } | ||||
| 
 | ||||
|     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] | ||||
|     async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { | ||||
|         tokio::task::spawn_blocking({ | ||||
|             let db = self.db.clone(); | ||||
|             move || { | ||||
|                 let digest = directory.digest(); | ||||
| 
 | ||||
|                 // Validate the directory.
 | ||||
|                 if let Err(e) = directory.validate() { | ||||
|                     warn!(err=%e, "Directory failed validation"); | ||||
|                     return Err(Error::StorageError( | ||||
|                         "Directory failed validation".to_string(), | ||||
|                     )); | ||||
|                 } | ||||
| 
 | ||||
|                 // Store the directory in the table.
 | ||||
|                 let txn = db.begin_write()?; | ||||
|                 { | ||||
|                     let mut table = txn.open_table(DIRECTORY_TABLE)?; | ||||
|                     let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); | ||||
|                     table.insert(digest_as_array, directory.encode_to_vec())?; | ||||
|                 } | ||||
|                 txn.commit()?; | ||||
| 
 | ||||
|                 Ok(digest) | ||||
|             } | ||||
|         }) | ||||
|         .await? | ||||
|     } | ||||
| 
 | ||||
|     #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] | ||||
|     fn get_recursive( | ||||
|         &self, | ||||
|         root_directory_digest: &B3Digest, | ||||
|     ) -> BoxStream<'static, Result<proto::Directory, Error>> { | ||||
|         // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
 | ||||
|         // redb transaction to avoid constantly closing and opening new transactions for the
 | ||||
|         // database.
 | ||||
|         traverse_directory(self.clone(), root_directory_digest) | ||||
|     } | ||||
| 
 | ||||
|     #[instrument(skip_all)] | ||||
|     fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> { | ||||
|         Box::new(RedbDirectoryPutter { | ||||
|             db: self.db.clone(), | ||||
|             directory_validator: Some(Default::default()), | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub struct RedbDirectoryPutter { | ||||
|     db: Arc<Database>, | ||||
| 
 | ||||
|     /// The directories (inside the directory validator) that we insert later,
 | ||||
|     /// or None, if they were already inserted.
 | ||||
|     directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, | ||||
| } | ||||
| 
 | ||||
| #[async_trait] | ||||
| impl DirectoryPutter for RedbDirectoryPutter { | ||||
|     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] | ||||
|     async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { | ||||
|         match self.directory_validator { | ||||
|             None => return Err(Error::StorageError("already closed".to_string())), | ||||
|             Some(ref mut validator) => { | ||||
|                 validator | ||||
|                     .add(directory) | ||||
|                     .map_err(|e| Error::StorageError(e.to_string()))?; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     #[instrument(level = "trace", skip_all, ret, err)] | ||||
|     async fn close(&mut self) -> Result<B3Digest, Error> { | ||||
|         match self.directory_validator.take() { | ||||
|             None => Err(Error::StorageError("already closed".to_string())), | ||||
|             Some(validator) => { | ||||
|                 // Insert all directories as a batch.
 | ||||
|                 tokio::task::spawn_blocking({ | ||||
|                     let txn = self.db.begin_write()?; | ||||
|                     move || { | ||||
|                         // Retrieve the validated directories.
 | ||||
|                         let directories = validator | ||||
|                             .validate() | ||||
|                             .map_err(|e| Error::StorageError(e.to_string()))? | ||||
|                             .drain_leaves_to_root() | ||||
|                             .collect::<Vec<_>>(); | ||||
| 
 | ||||
|                         // Get the root digest, which is at the end (cf. insertion order)
 | ||||
|                         let root_digest = directories | ||||
|                             .last() | ||||
|                             .ok_or_else(|| Error::StorageError("got no directories".to_string()))? | ||||
|                             .digest(); | ||||
| 
 | ||||
|                         { | ||||
|                             let mut table = txn.open_table(DIRECTORY_TABLE)?; | ||||
| 
 | ||||
|                             // Looping over all the verified directories, queuing them up for a
 | ||||
|                             // batch insertion.
 | ||||
|                             for directory in directories { | ||||
|                                 let digest_as_array: [u8; digests::B3_LEN] = | ||||
|                                     directory.digest().into(); | ||||
|                                 table.insert(digest_as_array, directory.encode_to_vec())?; | ||||
|                             } | ||||
|                         } | ||||
| 
 | ||||
|                         txn.commit()?; | ||||
| 
 | ||||
|                         Ok(root_digest) | ||||
|                     } | ||||
|                 }) | ||||
|                 .await? | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(serde::Deserialize)] | ||||
| #[serde(deny_unknown_fields)] | ||||
| pub struct RedbDirectoryServiceConfig { | ||||
|     is_temporary: bool, | ||||
|     #[serde(default)] | ||||
|     /// required when is_temporary = false
 | ||||
|     path: Option<PathBuf>, | ||||
| } | ||||
| 
 | ||||
| impl TryFrom<url::Url> for RedbDirectoryServiceConfig { | ||||
|     type Error = Box<dyn std::error::Error + Send + Sync>; | ||||
|     fn try_from(url: url::Url) -> Result<Self, Self::Error> { | ||||
|         // redb doesn't support host, and a path can be provided (otherwise
 | ||||
|         // it'll live in memory only).
 | ||||
|         if url.has_host() { | ||||
|             return Err(Error::StorageError("no host allowed".to_string()).into()); | ||||
|         } | ||||
| 
 | ||||
|         Ok(if url.path().is_empty() { | ||||
|             RedbDirectoryServiceConfig { | ||||
|                 is_temporary: true, | ||||
|                 path: None, | ||||
|             } | ||||
|         } else { | ||||
|             RedbDirectoryServiceConfig { | ||||
|                 is_temporary: false, | ||||
|                 path: Some(url.path().into()), | ||||
|             } | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[async_trait] | ||||
| impl ServiceBuilder for RedbDirectoryServiceConfig { | ||||
|     type Output = dyn DirectoryService; | ||||
|     async fn build<'a>( | ||||
|         &'a self, | ||||
|         _instance_name: &str, | ||||
|         _context: &CompositionContext, | ||||
|     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { | ||||
|         match self { | ||||
|             RedbDirectoryServiceConfig { | ||||
|                 is_temporary: true, | ||||
|                 path: None, | ||||
|             } => Ok(Arc::new(RedbDirectoryService::new_temporary()?)), | ||||
|             RedbDirectoryServiceConfig { | ||||
|                 is_temporary: true, | ||||
|                 path: Some(_), | ||||
|             } => Err(Error::StorageError( | ||||
|                 "Temporary RedbDirectoryService can not have path".into(), | ||||
|             ) | ||||
|             .into()), | ||||
|             RedbDirectoryServiceConfig { | ||||
|                 is_temporary: false, | ||||
|                 path: None, | ||||
|             } => Err(Error::StorageError("RedbDirectoryService is missing path".into()).into()), | ||||
|             RedbDirectoryServiceConfig { | ||||
|                 is_temporary: false, | ||||
|                 path: Some(path), | ||||
|             } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | @ -26,6 +26,7 @@ use self::utils::make_grpc_directory_service_client; | |||
| #[case::grpc(make_grpc_directory_service_client().await)] | ||||
| #[case::memory(directoryservice::from_addr("memory://").await.unwrap())] | ||||
| #[case::sled(directoryservice::from_addr("sled://").await.unwrap())] | ||||
| #[case::redb(directoryservice::from_addr("redb://").await.unwrap())] | ||||
| #[case::objectstore(directoryservice::from_addr("objectstore+memory://").await.unwrap())] | ||||
| #[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] | ||||
| pub fn directory_services(#[case] directory_service: impl DirectoryService) {} | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue