refactor(tvix/store/fs): mv sparse -> populated directories

Do this upgrade whenever someone is actually interested in the children
of a directory, but that directory doesn't contain a more detailed
listing. This is much more predictable, and removes a bunch of confusing
code from the inode tracker itself.

Change-Id: Ib3a13694d6d5d22887d2d04ae429592137f39cb4
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9982
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2023-11-07 21:54:47 +02:00 committed by clbot
parent a778b855d2
commit 9cd2e92065
3 changed files with 54 additions and 317 deletions

View file

@ -30,11 +30,12 @@ use tokio::{
sync::mpsc,
};
use tracing::{debug, info_span, instrument, warn};
use tvix_castore::proto as castorepb;
use tvix_castore::{
blobservice::{BlobReader, BlobService},
directoryservice::DirectoryService,
proto::{node::Node, NamedNode},
B3Digest, Error,
B3Digest,
};
use self::{
@ -146,50 +147,49 @@ impl TvixStoreFs {
ref parent_digest,
ref children,
)) => Ok((parent_digest.clone(), children.clone())),
// if it's sparse, fetch data using fetch_directory_inode_data and insert.
// if it's sparse, fetch data using directory_service, populate child nodes
// and update it in [self.inode_tracker].
InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
let directory_service = self.directory_service.clone();
let parent_digest = parent_digest.to_owned();
match self
let directory = self
.tokio_handle
.block_on(self.tokio_handle.spawn(async move {
match directory_service.get(&parent_digest).await? {
// If the Directory can't be found, this is a hole, bail out.
None => {
warn!(directory.digest=%parent_digest, "directory not found");
Err(Error::StorageError(format!(
"directory {} not found",
parent_digest
)))
}
Some(directory) => Ok(directory.into()),
}
.block_on(self.tokio_handle.spawn({
let directory_service = self.directory_service.clone();
let parent_digest = parent_digest.to_owned();
async move { directory_service.get(&parent_digest).await }
}))
.unwrap()
{
Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
Ok(
ref data @ InodeData::Directory(DirectoryInodeData::Populated(
ref parent_digest,
_,
)),
) => {
// update data in [self.inode_tracker] with populated variant.
// we need to round-trip via self.inode_tracker so
// inodes for children are populated.
self.inode_tracker.write().put(data.clone());
let children = match *self.inode_tracker.read().get(ino).unwrap() {
InodeData::Directory(DirectoryInodeData::Populated(
_,
ref children,
)) => children.to_owned(),
_ => unreachable!(),
};
Ok((parent_digest.clone(), children))
}
// we know fetch_directory_inode_data only returns InodeData::Directory(DirectoryInodeData::Populated(..))
Ok(_) => panic!("unexpected type"),
}
.unwrap()?
.ok_or_else(|| {
warn!(directory.digest=%parent_digest, "directory not found");
// If the Directory can't be found, this is a hole, bail out.
io::Error::from_raw_os_error(libc::EIO)
})?;
// Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)),
// allocating inodes for the children on the way.
let children = {
let mut inode_tracker = self.inode_tracker.write();
let children: Vec<(u64, castorepb::node::Node)> = directory
.nodes()
.map(|child_node| {
let child_ino = inode_tracker.put((&child_node).into());
(child_ino, child_node)
})
.collect();
// replace.
inode_tracker.replace(
ino,
Arc::new(InodeData::Directory(DirectoryInodeData::Populated(
parent_digest.clone(),
children.clone(),
))),
);
children
};
Ok((parent_digest.clone(), children))
}
// if the parent inode was not a directory, this doesn't make sense
InodeData::Regular(..) | InodeData::Symlink(_) => {