refactor(tvix/castore): drop {Directory,File,Symlink}Node
Add a `SymlinkTarget` type to represent validated symlink targets. With this, no invalid states are representable, so we can make `Node` be just an enum of all three kind of types, and allow access to these fields directly. Change-Id: I20bdd480c8d5e64a827649f303c97023b7e390f2 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12216 Reviewed-by: benjaminedwardwebb <benjaminedwardwebb@gmail.com> Autosubmit: flokli <flokli@flokli.de> Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI
This commit is contained in:
parent
49b173786c
commit
8ea7d2b60e
27 changed files with 555 additions and 461 deletions
|
|
@ -10,7 +10,7 @@ use petgraph::{
|
|||
use tracing::instrument;
|
||||
|
||||
use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
|
||||
use crate::{B3Digest, Directory, DirectoryNode};
|
||||
use crate::{B3Digest, Directory, Node};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
|
|
@ -18,6 +18,8 @@ pub enum Error {
|
|||
ValidationError(String),
|
||||
}
|
||||
|
||||
type Edge = (B3Digest, u64);
|
||||
|
||||
/// This can be used to validate and/or re-order a Directory closure (DAG of
|
||||
/// connected Directories), and their insertion order.
|
||||
///
|
||||
|
|
@ -55,7 +57,7 @@ pub struct DirectoryGraph<O> {
|
|||
//
|
||||
// The option in the edge weight tracks the pending validation state of the respective edge, for example if
|
||||
// the child has not been added yet.
|
||||
graph: DiGraph<Option<Directory>, Option<DirectoryNode>>,
|
||||
graph: DiGraph<Option<Directory>, Option<Edge>>,
|
||||
|
||||
// A lookup table from directory digest to node index.
|
||||
digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
|
||||
|
|
@ -64,18 +66,18 @@ pub struct DirectoryGraph<O> {
|
|||
}
|
||||
|
||||
pub struct ValidatedDirectoryGraph {
|
||||
graph: DiGraph<Option<Directory>, Option<DirectoryNode>>,
|
||||
graph: DiGraph<Option<Directory>, Option<Edge>>,
|
||||
|
||||
root: Option<NodeIndex>,
|
||||
}
|
||||
|
||||
fn check_edge(dir: &DirectoryNode, dir_name: &[u8], child: &Directory) -> Result<(), Error> {
|
||||
fn check_edge(dir: &Edge, dir_name: &[u8], child: &Directory) -> Result<(), Error> {
|
||||
// Ensure the size specified in the child node matches our records.
|
||||
if dir.size() != child.size() {
|
||||
if dir.1 != child.size() {
|
||||
return Err(Error::ValidationError(format!(
|
||||
"'{}' has wrong size, specified {}, recorded {}",
|
||||
dir_name.as_bstr(),
|
||||
dir.size(),
|
||||
dir.1,
|
||||
child.size(),
|
||||
)));
|
||||
}
|
||||
|
|
@ -141,21 +143,23 @@ impl<O: OrderValidator> DirectoryGraph<O> {
|
|||
}
|
||||
|
||||
// set up edges to all child directories
|
||||
for (subdir_name, subdir_node) in directory.directories() {
|
||||
let child_ix = *self
|
||||
.digest_to_node_ix
|
||||
.entry(subdir_node.digest().clone())
|
||||
.or_insert_with(|| self.graph.add_node(None));
|
||||
for (name, node) in directory.nodes() {
|
||||
if let Node::Directory { digest, size } = node {
|
||||
let child_ix = *self
|
||||
.digest_to_node_ix
|
||||
.entry(digest.clone())
|
||||
.or_insert_with(|| self.graph.add_node(None));
|
||||
|
||||
let pending_edge_check = match &self.graph[child_ix] {
|
||||
Some(child) => {
|
||||
// child is already available, validate the edge now
|
||||
check_edge(subdir_node, subdir_name, child)?;
|
||||
None
|
||||
}
|
||||
None => Some(subdir_node.clone()), // pending validation
|
||||
};
|
||||
self.graph.add_edge(ix, child_ix, pending_edge_check);
|
||||
let pending_edge_check = match &self.graph[child_ix] {
|
||||
Some(child) => {
|
||||
// child is already available, validate the edge now
|
||||
check_edge(&(digest.to_owned(), *size), name, child)?;
|
||||
None
|
||||
}
|
||||
None => Some((digest.to_owned(), *size)), // pending validation
|
||||
};
|
||||
self.graph.add_edge(ix, child_ix, pending_edge_check);
|
||||
}
|
||||
}
|
||||
|
||||
// validate the edges from parents to this node
|
||||
|
|
@ -270,7 +274,7 @@ impl ValidatedDirectoryGraph {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
|
||||
use crate::{Directory, DirectoryNode, Node};
|
||||
use crate::{Directory, Node};
|
||||
use lazy_static::lazy_static;
|
||||
use rstest::rstest;
|
||||
|
||||
|
|
@ -281,10 +285,10 @@ mod tests {
|
|||
let mut dir = Directory::new();
|
||||
dir.add(
|
||||
"foo".into(),
|
||||
Node::Directory(DirectoryNode::new(
|
||||
DIRECTORY_A.digest(),
|
||||
DIRECTORY_A.size() + 42, // wrong!
|
||||
))).unwrap();
|
||||
Node::Directory{
|
||||
digest: DIRECTORY_A.digest(),
|
||||
size: DIRECTORY_A.size() + 42, // wrong!
|
||||
}).unwrap();
|
||||
dir
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ use super::{
|
|||
RootToLeavesValidator,
|
||||
};
|
||||
use crate::composition::{CompositionContext, ServiceBuilder};
|
||||
use crate::{proto, B3Digest, Error};
|
||||
use crate::{proto, B3Digest, Error, Node};
|
||||
|
||||
/// Stores directory closures in an object store.
|
||||
/// Notably, this makes use of the option to disallow accessing child directories except when
|
||||
|
|
@ -85,7 +85,11 @@ impl DirectoryService for ObjectStoreDirectoryService {
|
|||
|
||||
#[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
|
||||
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
|
||||
if directory.directories().next().is_some() {
|
||||
// Ensure the directory doesn't contain other directory children
|
||||
if directory
|
||||
.nodes()
|
||||
.any(|(_, e)| matches!(e, Node::Directory { .. }))
|
||||
{
|
||||
return Err(Error::InvalidRequest(
|
||||
"only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(),
|
||||
));
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ use std::collections::HashSet;
|
|||
use tracing::warn;
|
||||
|
||||
use super::Directory;
|
||||
use crate::B3Digest;
|
||||
use crate::{B3Digest, Node};
|
||||
|
||||
pub trait OrderValidator {
|
||||
/// Update the order validator's state with the directory
|
||||
|
|
@ -48,9 +48,11 @@ impl RootToLeavesValidator {
|
|||
self.expected_digests.insert(directory.digest());
|
||||
}
|
||||
|
||||
for (_, subdir_node) in directory.directories() {
|
||||
// Allow the children to appear next
|
||||
self.expected_digests.insert(subdir_node.digest().clone());
|
||||
// Allow the children to appear next
|
||||
for (_, node) in directory.nodes() {
|
||||
if let Node::Directory { digest, .. } = node {
|
||||
self.expected_digests.insert(digest.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -79,14 +81,20 @@ impl OrderValidator for LeavesToRootValidator {
|
|||
fn add_directory(&mut self, directory: &Directory) -> bool {
|
||||
let digest = directory.digest();
|
||||
|
||||
for (_, subdir_node) in directory.directories() {
|
||||
if !self.allowed_references.contains(subdir_node.digest()) {
|
||||
warn!(
|
||||
directory.digest = %digest,
|
||||
subdirectory.digest = %subdir_node.digest(),
|
||||
"unexpected directory reference"
|
||||
);
|
||||
return false;
|
||||
for (_, node) in directory.nodes() {
|
||||
if let Node::Directory {
|
||||
digest: subdir_node_digest,
|
||||
..
|
||||
} = node
|
||||
{
|
||||
if !self.allowed_references.contains(subdir_node_digest) {
|
||||
warn!(
|
||||
directory.digest = %digest,
|
||||
subdirectory.digest = %subdir_node_digest,
|
||||
"unexpected directory reference"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use rstest_reuse::{self, *};
|
|||
use super::DirectoryService;
|
||||
use crate::directoryservice;
|
||||
use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D};
|
||||
use crate::{Directory, DirectoryNode, Node};
|
||||
use crate::{Directory, Node};
|
||||
|
||||
mod utils;
|
||||
use self::utils::make_grpc_directory_service_client;
|
||||
|
|
@ -220,12 +220,13 @@ async fn upload_reject_wrong_size(directory_service: impl DirectoryService) {
|
|||
let mut dir = Directory::new();
|
||||
dir.add(
|
||||
"foo".into(),
|
||||
Node::Directory(DirectoryNode::new(
|
||||
DIRECTORY_A.digest(),
|
||||
DIRECTORY_A.size() + 42, // wrong!
|
||||
)),
|
||||
Node::Directory {
|
||||
digest: DIRECTORY_A.digest(),
|
||||
size: DIRECTORY_A.size() + 42, // wrong!
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
dir
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -15,26 +15,24 @@ where
|
|||
let mut parent_node = root_node;
|
||||
for component in path.as_ref().components() {
|
||||
match parent_node {
|
||||
Node::File(_) | Node::Symlink(_) => {
|
||||
Node::File { .. } | Node::Symlink { .. } => {
|
||||
// There's still some path left, but the parent node is no directory.
|
||||
// This means the path doesn't exist, as we can't reach it.
|
||||
return Ok(None);
|
||||
}
|
||||
Node::Directory(directory_node) => {
|
||||
Node::Directory { digest, .. } => {
|
||||
// fetch the linked node from the directory_service.
|
||||
let directory = directory_service
|
||||
.as_ref()
|
||||
.get(directory_node.digest())
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
// If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
|
||||
warn!("directory {} does not exist", directory_node.digest());
|
||||
let directory =
|
||||
directory_service
|
||||
.as_ref()
|
||||
.get(&digest)
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
// If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
|
||||
warn!("directory {} does not exist", digest);
|
||||
|
||||
Error::StorageError(format!(
|
||||
"directory {} does not exist",
|
||||
directory_node.digest()
|
||||
))
|
||||
})?;
|
||||
Error::StorageError(format!("directory {} does not exist", digest))
|
||||
})?;
|
||||
|
||||
// look for the component in the [Directory].
|
||||
if let Some((_child_name, child_node)) = directory
|
||||
|
|
@ -59,8 +57,8 @@ where
|
|||
mod tests {
|
||||
use crate::{
|
||||
directoryservice,
|
||||
fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
|
||||
DirectoryNode, Node, PathBuf,
|
||||
fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST},
|
||||
Node, PathBuf,
|
||||
};
|
||||
|
||||
use super::descend_to;
|
||||
|
|
@ -82,23 +80,23 @@ mod tests {
|
|||
handle.close().await.expect("must upload");
|
||||
|
||||
// construct the node for DIRECTORY_COMPLICATED
|
||||
let node_directory_complicated = Node::Directory(DirectoryNode::new(
|
||||
DIRECTORY_COMPLICATED.digest(),
|
||||
DIRECTORY_COMPLICATED.size(),
|
||||
));
|
||||
let node_directory_complicated = Node::Directory {
|
||||
digest: DIRECTORY_COMPLICATED.digest(),
|
||||
size: DIRECTORY_COMPLICATED.size(),
|
||||
};
|
||||
|
||||
// construct the node for DIRECTORY_COMPLICATED
|
||||
let node_directory_with_keep = Node::Directory(
|
||||
DIRECTORY_COMPLICATED
|
||||
.directories()
|
||||
.next()
|
||||
.unwrap()
|
||||
.1
|
||||
.clone(),
|
||||
);
|
||||
let node_directory_with_keep = Node::Directory {
|
||||
digest: DIRECTORY_WITH_KEEP.digest(),
|
||||
size: DIRECTORY_WITH_KEEP.size(),
|
||||
};
|
||||
|
||||
// construct the node for the .keep file
|
||||
let node_file_keep = Node::File(DIRECTORY_WITH_KEEP.files().next().unwrap().1.clone());
|
||||
let node_file_keep = Node::File {
|
||||
digest: EMPTY_BLOB_DIGEST.clone(),
|
||||
size: 0,
|
||||
executable: false,
|
||||
};
|
||||
|
||||
// traversal to an empty subpath should return the root node.
|
||||
{
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use super::Directory;
|
|||
use super::DirectoryService;
|
||||
use crate::B3Digest;
|
||||
use crate::Error;
|
||||
use crate::Node;
|
||||
use async_stream::try_stream;
|
||||
use futures::stream::BoxStream;
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
|
|
@ -57,15 +58,15 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
|
|||
// enqueue all child directory digests to the work queue, as
|
||||
// long as they're not part of the worklist or already sent.
|
||||
// This panics if the digest looks invalid, it's supposed to be checked first.
|
||||
for (_, child_directory_node) in current_directory.directories() {
|
||||
let child_digest = child_directory_node.digest();
|
||||
|
||||
if worklist_directory_digests.contains(child_digest)
|
||||
|| sent_directory_digests.contains(child_digest)
|
||||
{
|
||||
continue;
|
||||
for (_, child_directory_node) in current_directory.nodes() {
|
||||
if let Node::Directory{digest: child_digest, ..} = child_directory_node {
|
||||
if worklist_directory_digests.contains(child_digest)
|
||||
|| sent_directory_digests.contains(child_digest)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
worklist_directory_digests.push_back(child_digest.clone());
|
||||
}
|
||||
worklist_directory_digests.push_back(child_digest.clone());
|
||||
}
|
||||
|
||||
yield current_directory;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue