snix/tvix/castore/src/directoryservice/memory.rs
Yureka 3ca0b53840 refactor(tvix/castore): use Directory struct separate from proto one
This uses our own data type to deal with Directories in the castore model.

It makes some undesired states unrepresentable, removing the need for conversions and checking in various places:

 - In the protobuf, blake3 digests could have a wrong length, as proto doesn't know fixed-size fields. We now use `B3Digest`, which makes cloning cheaper, and removes the need to do size-checking everywhere.
 - In the protobuf, we had three different lists for `files`, `symlinks` and `directories`. This was mostly a protobuf size optimization, but made interacting with them a bit awkward. This has now been replaced with a list of enums, and convenience iterators to get various nodes, and add new ones.

Change-Id: I7b92691bb06d77ff3f58a5ccea94a22c16f84f04
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12057
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
2024-08-13 12:17:01 +00:00

101 lines
3.3 KiB
Rust

use crate::{B3Digest, Error};
use futures::stream::BoxStream;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::async_trait;
use tracing::{instrument, warn};
use super::utils::traverse_directory;
use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::proto;
/// A [`DirectoryService`] implementation that keeps directories in a
/// `HashMap` guarded by an async `RwLock`.
///
/// The map lives behind an `Arc`, so clones of this struct share the same
/// underlying storage. `Default` yields an empty store.
#[derive(Clone, Default)]
pub struct MemoryDirectoryService {
/// Stored directories, keyed by their digest. Values are kept as protobuf
/// `Directory` messages and converted when read back.
db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>,
}
#[async_trait]
impl DirectoryService for MemoryDirectoryService {
#[instrument(skip(self, digest), fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
let db = self.db.read().await;
match db.get(digest) {
// The directory was not found, return
None => Ok(None),
// The directory was found, try to parse the data as Directory message
Some(directory) => {
// Validate the retrieved Directory indeed has the
// digest we expect it to have, to detect corruptions.
let actual_digest = directory.digest();
if actual_digest != *digest {
return Err(Error::StorageError(format!(
"requested directory with digest {}, but got {}",
digest, actual_digest
)));
}
Ok(Some(directory.clone().try_into().map_err(|e| {
crate::Error::StorageError(format!("corrupted directory: {}", e))
})?))
}
}
}
#[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
let digest = directory.digest();
// store it
let mut db = self.db.write().await;
db.insert(digest.clone(), directory.into());
Ok(digest)
}
#[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<Directory, Error>> {
traverse_directory(self.clone(), root_directory_digest)
}
#[instrument(skip_all)]
fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
where
Self: Clone,
{
Box::new(SimplePutter::new(self.clone()))
}
}
/// Configuration used to build a [`MemoryDirectoryService`].
///
/// The struct has no fields; with `deny_unknown_fields`, deserialization
/// rejects input that carries any field.
#[derive(serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct MemoryDirectoryServiceConfig {}
impl TryFrom<url::Url> for MemoryDirectoryServiceConfig {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn try_from(url: url::Url) -> Result<Self, Self::Error> {
// memory doesn't support host or path in the URL.
if url.has_host() || !url.path().is_empty() {
return Err(Error::StorageError("invalid url".to_string()).into());
}
Ok(MemoryDirectoryServiceConfig {})
}
}
#[async_trait]
impl ServiceBuilder for MemoryDirectoryServiceConfig {
    type Output = dyn DirectoryService;

    /// Builds a fresh, default-initialized [`MemoryDirectoryService`].
    ///
    /// Neither the instance name nor the composition context is used, and
    /// this implementation never returns an error.
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let service: Arc<dyn DirectoryService> = Arc::new(MemoryDirectoryService::default());
        Ok(service)
    }
}