refactor(tvix/castore): remove name from Nodes

Nodes only have names if they're contained inside a Directory, or if
they're a root node and have something else possibly giving them a name
externally.

This removes all `name` fields in the three different Nodes, and instead
maintains it inside a BTreeMap inside the Directory.

It also removes the NamedNode trait (as nodes no longer have a
get_name()), as well as Node::rename(self, name), and all [Partial]Ord
implementations for Node (as they don't have names to use for sorting).

The `nodes()`, `directories()`, `files()` iterators inside a `Directory`
now return a tuple of Name and Node, as does the RootNodesProvider.

The different {Directory,File,Symlink}Node struct constructors got
simpler, and the {Directory,File}Node ones became infallible - as
there's no more possibility to represent invalid state.

The proto structs stayed the same - there's now from_name_and_node and
into_name_and_node to convert back and forth between the two `Node`
structs.

Some further cleanups:

The error types for Node validation were renamed. Everything related to
names is now in the DirectoryError (not yet happy about the naming).

There's some leftover cleanups to do:
 - There should be a from_(sorted_)iter and into_iter in Directory, so
   we can construct and deconstruct in one go.
   That should also enable us to implement conversions from and to the
   proto representation that moves, rather than clones.

 - The BuildRequest and PathInfo structs are still proto-based, so we
   still do a bunch of conversions back and forth there (and have some
   ugly expect there). There's not much point for error handling here,
   this will be moved to stricter types in a followup CL.

Change-Id: I7369a8e3a426f44419c349077cb4fcab2044ebb6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12205
Tested-by: BuildkiteCI
Reviewed-by: yuka <yuka@yuka.dev>
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: benjaminedwardwebb <benjaminedwardwebb@gmail.com>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2024-08-14 22:00:12 +03:00 committed by clbot
parent 04e9531e65
commit 49b173786c
46 changed files with 785 additions and 1002 deletions

View file

@ -68,15 +68,11 @@ async fn populate_blob_a(
root_nodes.insert(
BLOB_A_NAME.into(),
Node::File(
FileNode::new(
BLOB_A_NAME.into(),
fixtures::BLOB_A_DIGEST.clone(),
fixtures::BLOB_A.len() as u64,
false,
)
.unwrap(),
),
Node::File(FileNode::new(
fixtures::BLOB_A_DIGEST.clone(),
fixtures::BLOB_A.len() as u64,
false,
)),
);
}
@ -92,15 +88,11 @@ async fn populate_blob_b(
root_nodes.insert(
BLOB_B_NAME.into(),
Node::File(
FileNode::new(
BLOB_B_NAME.into(),
fixtures::BLOB_B_DIGEST.clone(),
fixtures::BLOB_B.len() as u64,
false,
)
.unwrap(),
),
Node::File(FileNode::new(
fixtures::BLOB_B_DIGEST.clone(),
fixtures::BLOB_B.len() as u64,
false,
)),
);
}
@ -120,22 +112,18 @@ async fn populate_blob_helloworld(
root_nodes.insert(
HELLOWORLD_BLOB_NAME.into(),
Node::File(
FileNode::new(
HELLOWORLD_BLOB_NAME.into(),
fixtures::HELLOWORLD_BLOB_DIGEST.clone(),
fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64,
true,
)
.unwrap(),
),
Node::File(FileNode::new(
fixtures::HELLOWORLD_BLOB_DIGEST.clone(),
fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64,
true,
)),
);
}
async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) {
root_nodes.insert(
SYMLINK_NAME.into(),
Node::Symlink(SymlinkNode::new(SYMLINK_NAME.into(), BLOB_A_NAME.into()).unwrap()),
Node::Symlink(SymlinkNode::new(BLOB_A_NAME.into()).unwrap()),
);
}
@ -144,9 +132,7 @@ async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) {
async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) {
root_nodes.insert(
SYMLINK_NAME2.into(),
Node::Symlink(
SymlinkNode::new(SYMLINK_NAME2.into(), "/nix/store/somewhereelse".into()).unwrap(),
),
Node::Symlink(SymlinkNode::new("/nix/store/somewhereelse".into()).unwrap()),
);
}
@ -170,14 +156,10 @@ async fn populate_directory_with_keep(
root_nodes.insert(
DIRECTORY_WITH_KEEP_NAME.into(),
Node::Directory(
DirectoryNode::new(
DIRECTORY_WITH_KEEP_NAME.into(),
fixtures::DIRECTORY_WITH_KEEP.digest(),
fixtures::DIRECTORY_WITH_KEEP.size(),
)
.unwrap(),
),
Node::Directory(DirectoryNode::new(
fixtures::DIRECTORY_WITH_KEEP.digest(),
fixtures::DIRECTORY_WITH_KEEP.size(),
)),
);
}
@ -186,14 +168,10 @@ async fn populate_directory_with_keep(
async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) {
root_nodes.insert(
DIRECTORY_WITH_KEEP_NAME.into(),
Node::Directory(
DirectoryNode::new(
DIRECTORY_WITH_KEEP_NAME.into(),
fixtures::DIRECTORY_WITH_KEEP.digest(),
fixtures::DIRECTORY_WITH_KEEP.size(),
)
.unwrap(),
),
Node::Directory(DirectoryNode::new(
fixtures::DIRECTORY_WITH_KEEP.digest(),
fixtures::DIRECTORY_WITH_KEEP.size(),
)),
);
}
@ -201,15 +179,11 @@ async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Byte
async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) {
root_nodes.insert(
BLOB_A_NAME.into(),
Node::File(
FileNode::new(
BLOB_A_NAME.into(),
fixtures::BLOB_A_DIGEST.clone(),
fixtures::BLOB_A.len() as u64,
false,
)
.unwrap(),
),
Node::File(FileNode::new(
fixtures::BLOB_A_DIGEST.clone(),
fixtures::BLOB_A.len() as u64,
false,
)),
);
}
@ -239,14 +213,10 @@ async fn populate_directory_complicated(
root_nodes.insert(
DIRECTORY_COMPLICATED_NAME.into(),
Node::Directory(
DirectoryNode::new(
DIRECTORY_COMPLICATED_NAME.into(),
fixtures::DIRECTORY_COMPLICATED.digest(),
fixtures::DIRECTORY_COMPLICATED.size(),
)
.unwrap(),
),
Node::Directory(DirectoryNode::new(
fixtures::DIRECTORY_COMPLICATED.digest(),
fixtures::DIRECTORY_COMPLICATED.size(),
)),
);
}

View file

@ -2,9 +2,7 @@
//! about inodes, which present tvix-castore nodes in a filesystem.
use std::time::Duration;
use bytes::Bytes;
use crate::{B3Digest, NamedNode, Node};
use crate::{B3Digest, Node};
#[derive(Clone, Debug)]
pub enum InodeData {
@ -19,24 +17,19 @@ pub enum InodeData {
/// lookup and did fetch the data.
#[derive(Clone, Debug)]
pub enum DirectoryInodeData {
Sparse(B3Digest, u64), // digest, size
Populated(B3Digest, Vec<(u64, Node)>), // [(child_inode, node)]
Sparse(B3Digest, u64), // digest, size
Populated(B3Digest, Vec<(u64, bytes::Bytes, Node)>), // [(child_inode, name, node)]
}
impl InodeData {
/// Constructs a new InodeData by consuming a [Node].
/// It splits off the original name, so it can be used later.
pub fn from_node(node: &Node) -> (Self, Bytes) {
pub fn from_node(node: &Node) -> Self {
match node {
Node::Directory(n) => (
Self::Directory(DirectoryInodeData::Sparse(n.digest().clone(), n.size())),
n.get_name().clone(),
),
Node::File(n) => (
Self::Regular(n.digest().clone(), n.size(), n.executable()),
n.get_name().clone(),
),
Node::Symlink(n) => (Self::Symlink(n.target().clone()), n.get_name().clone()),
Node::Directory(n) => {
Self::Directory(DirectoryInodeData::Sparse(n.digest().clone(), n.size()))
}
Node::File(n) => Self::Regular(n.digest().clone(), n.size(), n.executable()),
Node::Symlink(n) => Self::Symlink(n.target().clone()),
}
}

View file

@ -18,7 +18,7 @@ use self::{
use crate::{
blobservice::{BlobReader, BlobService},
directoryservice::DirectoryService,
{B3Digest, NamedNode, Node},
{B3Digest, Node},
};
use bstr::ByteVec;
use bytes::Bytes;
@ -103,7 +103,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
u64,
(
Span,
Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>,
Arc<Mutex<mpsc::Receiver<(usize, Result<(Bytes, Node), crate::Error>)>>>,
),
>,
>,
@ -165,8 +165,12 @@ where
/// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
/// in self.directory_service is performed, and self.inode_tracker is updated with the
/// [DirectoryInodeData::Populated].
#[allow(clippy::type_complexity)]
#[instrument(skip(self), err)]
fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> {
fn get_directory_children(
&self,
ino: u64,
) -> io::Result<(B3Digest, Vec<(u64, bytes::Bytes, Node)>)> {
let data = self.inode_tracker.read().get(ino).unwrap();
match *data {
// if it's populated already, return children.
@ -196,13 +200,13 @@ where
let children = {
let mut inode_tracker = self.inode_tracker.write();
let children: Vec<(u64, Node)> = directory
let children: Vec<(u64, Bytes, Node)> = directory
.nodes()
.map(|child_node| {
let (inode_data, _) = InodeData::from_node(child_node);
.map(|(child_name, child_node)| {
let inode_data = InodeData::from_node(child_node);
let child_ino = inode_tracker.put(inode_data);
(child_ino, child_node.clone())
(child_ino, child_name.to_owned(), child_node.clone())
})
.collect();
@ -263,12 +267,6 @@ where
Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
// The root node does exist
Ok(Some(root_node)) => {
// The name must match what's passed in the lookup, otherwise this is also a ENOENT.
if root_node.get_name() != name.to_bytes() {
debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
return Err(io::Error::from_raw_os_error(libc::ENOENT));
}
// Let's check if someone else beat us to updating the inode tracker and
// root_nodes map. This avoids locking inode_tracker for writing.
if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
@ -285,9 +283,9 @@ where
// insert the (sparse) inode data and register in
// self.root_nodes.
let (inode_data, name) = InodeData::from_node(&root_node);
let inode_data = InodeData::from_node(&root_node);
let ino = inode_tracker.put(inode_data.clone());
root_nodes.insert(name, ino);
root_nodes.insert(bytes::Bytes::copy_from_slice(name.to_bytes()), ino);
Ok((ino, Arc::new(inode_data)))
}
@ -362,7 +360,7 @@ where
// Search for that name in the list of children and return the FileAttrs.
// in the children, find the one with the desired name.
if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == name.to_bytes()) {
// lookup the child [InodeData] in [self.inode_tracker].
// We know the inodes for children have already been allocated.
let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap();
@ -398,8 +396,8 @@ where
self.tokio_handle.spawn(
async move {
let mut stream = root_nodes_provider.list().enumerate();
while let Some(node) = stream.next().await {
if tx.send(node).await.is_err() {
while let Some(e) = stream.next().await {
if tx.send(e).await.is_err() {
// If we get a send error, it means the sync code
// doesn't want any more entries.
break;
@ -461,12 +459,12 @@ where
.map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
while let Some((i, n)) = rx.blocking_recv() {
let root_node = n.map_err(|e| {
let (name, node) = n.map_err(|e| {
warn!("failed to retrieve root node: {}", e);
io::Error::from_raw_os_error(libc::EIO)
})?;
let (inode_data, name) = InodeData::from_node(&root_node);
let inode_data = InodeData::from_node(&node);
// obtain the inode, or allocate a new one.
let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@ -495,15 +493,17 @@ where
let (parent_digest, children) = self.get_directory_children(inode)?;
Span::current().record("directory.digest", parent_digest.to_string());
for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
let (inode_data, name) = InodeData::from_node(&child_node);
for (i, (ino, child_name, child_node)) in
children.into_iter().skip(offset as usize).enumerate()
{
let inode_data = InodeData::from_node(&child_node);
// the second parameter will become the "offset" parameter on the next call.
let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
ino,
offset: offset + (i as u64) + 1,
type_: inode_data.as_fuse_type(),
name: &name,
name: &child_name,
})?;
// If the buffer is full, add_entry will return `Ok(0)`.
if written == 0 {
@ -548,12 +548,12 @@ where
.map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
while let Some((i, n)) = rx.blocking_recv() {
let root_node = n.map_err(|e| {
let (name, node) = n.map_err(|e| {
warn!("failed to retrieve root node: {}", e);
io::Error::from_raw_os_error(libc::EPERM)
})?;
let (inode_data, name) = InodeData::from_node(&root_node);
let inode_data = InodeData::from_node(&node);
// obtain the inode, or allocate a new one.
let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@ -585,8 +585,8 @@ where
let (parent_digest, children) = self.get_directory_children(inode)?;
Span::current().record("directory.digest", parent_digest.to_string());
for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
let (inode_data, name) = InodeData::from_node(&child_node);
for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
let inode_data = InodeData::from_node(&child_node);
// the second parameter will become the "offset" parameter on the next call.
let written = add_entry(

View file

@ -12,9 +12,10 @@ pub trait RootNodes: Send + Sync {
/// directory of the filesystem.
async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>;
/// Lists all root CA nodes in the filesystem. An error can be returned
/// in case listing is not allowed
fn list(&self) -> BoxStream<Result<Node, Error>>;
/// Lists all root CA nodes in the filesystem, as a tuple of (base)name
/// and Node.
/// An error can be returned in case listing is not allowed.
fn list(&self) -> BoxStream<Result<(bytes::Bytes, Node), Error>>;
}
#[async_trait]
@ -28,9 +29,11 @@ where
Ok(self.as_ref().get(name).cloned())
}
fn list(&self) -> BoxStream<Result<Node, Error>> {
fn list(&self) -> BoxStream<Result<(bytes::Bytes, Node), Error>> {
Box::pin(tokio_stream::iter(
self.as_ref().iter().map(|(_, v)| Ok(v.clone())),
self.as_ref()
.iter()
.map(|(name, node)| Ok((name.to_owned(), node.to_owned()))),
))
}
}