refactor(castore/fs): use streams for dir handles
This changes RootNodes::list to return a BoxStream<'static, _>, and then drops all the mpsc sender / receiver complexity we were having. There's also no need to worry about channel buffer sizes - all current RootNodes implementations are immediately ready to yield new elements in the stream. Assuming there are new implementations that do take some time, we can deal with buffer sizes on the producer side, which might know its own batch sizes better. RootNodes now doesn't need to implement Clone/Send anymore, and can have non-static lifetimes. As long as its list method returns a BoxStream<'static>, we're fine with all that. On a first look, this seems like we now need to do more cloning upfront for the BTreeMap and Directory RootNodes impls. However, we already had to clone the entire thing at `self.root_nodes_provider.clone()`, and then did it again for each element. Now we get an owned version of the data whenever a list() call happens, and then just move owned things around. Change-Id: I85fbca0e1171014ae85eeb03b3d58e6176ef4e2d Reviewed-on: https://cl.snix.dev/c/snix/+/30549 Autosubmit: Florian Klink <flokli@flokli.de> Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: besadii
This commit is contained in:
parent
0f9c5f0354
commit
80b5206034
4 changed files with 62 additions and 92 deletions
|
|
@ -26,7 +26,7 @@ use fuse_backend_rs::abi::fuse_abi::{OpenOptions, stat64};
|
||||||
use fuse_backend_rs::api::filesystem::{
|
use fuse_backend_rs::api::filesystem::{
|
||||||
Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
|
Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
|
||||||
};
|
};
|
||||||
use futures::StreamExt;
|
use futures::{StreamExt, stream::BoxStream};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::{
|
use std::{
|
||||||
|
|
@ -37,11 +37,8 @@ use std::{
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use std::{ffi::CStr, io::Cursor};
|
use std::{ffi::CStr, io::Cursor};
|
||||||
use tokio::{
|
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||||
io::{AsyncReadExt, AsyncSeekExt},
|
use tracing::{Span, debug, error, instrument, warn};
|
||||||
sync::mpsc,
|
|
||||||
};
|
|
||||||
use tracing::{Instrument as _, Span, debug, error, instrument, warn};
|
|
||||||
|
|
||||||
/// This implements a read-only FUSE filesystem for a snix-store
|
/// This implements a read-only FUSE filesystem for a snix-store
|
||||||
/// with the passed [BlobService], [DirectoryService] and [RootNodes].
|
/// with the passed [BlobService], [DirectoryService] and [RootNodes].
|
||||||
|
|
@ -94,16 +91,19 @@ pub struct SnixStoreFs<BS, DS, RN> {
|
||||||
|
|
||||||
// FUTUREWORK: have a generic container type for dir/file handles and handle
|
// FUTUREWORK: have a generic container type for dir/file handles and handle
|
||||||
// allocation.
|
// allocation.
|
||||||
/// Maps from the handle returned from an opendir to
|
/// This holds all opendir handles (for the root inode), keyed by the handle
|
||||||
/// This holds all opendir handles (for the root inode)
|
/// returned from the opendir call.
|
||||||
/// They point to the rx part of the channel producing the listing.
|
/// For each handle, we store an enumerated Result<(PathComponent, Node), crate::Error>.
|
||||||
|
/// The index is needed as we need to send offset information.
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
dir_handles: RwLock<
|
dir_handles: RwLock<
|
||||||
HashMap<
|
HashMap<
|
||||||
u64,
|
u64,
|
||||||
(
|
(
|
||||||
Span,
|
Span,
|
||||||
Arc<Mutex<mpsc::Receiver<(usize, Result<(PathComponent, Node), crate::Error>)>>>,
|
Arc<
|
||||||
|
Mutex<BoxStream<'static, (usize, Result<(PathComponent, Node), crate::Error>)>>,
|
||||||
|
>,
|
||||||
),
|
),
|
||||||
>,
|
>,
|
||||||
>,
|
>,
|
||||||
|
|
@ -123,7 +123,7 @@ impl<BS, DS, RN> SnixStoreFs<BS, DS, RN>
|
||||||
where
|
where
|
||||||
BS: BlobService,
|
BS: BlobService,
|
||||||
DS: DirectoryService,
|
DS: DirectoryService,
|
||||||
RN: RootNodes + Clone + 'static,
|
RN: RootNodes,
|
||||||
{
|
{
|
||||||
pub fn new(
|
pub fn new(
|
||||||
blob_service: BS,
|
blob_service: BS,
|
||||||
|
|
@ -289,9 +289,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Buffer size of the channel providing nodes in the mount root
|
|
||||||
const ROOT_NODES_BUFFER_SIZE: usize = 16;
|
|
||||||
|
|
||||||
const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.snix.castore.directory.digest";
|
const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.snix.castore.directory.digest";
|
||||||
const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.snix.castore.blob.digest";
|
const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.snix.castore.blob.digest";
|
||||||
|
|
||||||
|
|
@ -300,7 +297,7 @@ impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for SnixStoreFs<BS, DS,
|
||||||
where
|
where
|
||||||
BS: BlobService,
|
BS: BlobService,
|
||||||
DS: DirectoryService,
|
DS: DirectoryService,
|
||||||
RN: RootNodes + Clone + 'static,
|
RN: RootNodes,
|
||||||
{
|
{
|
||||||
fn root_inode(&self) -> Self::Inode {
|
fn root_inode(&self) -> Self::Inode {
|
||||||
ROOT_ID
|
ROOT_ID
|
||||||
|
|
@ -311,7 +308,7 @@ impl<BS, DS, RN> FileSystem for SnixStoreFs<BS, DS, RN>
|
||||||
where
|
where
|
||||||
BS: BlobService,
|
BS: BlobService,
|
||||||
DS: DirectoryService,
|
DS: DirectoryService,
|
||||||
RN: RootNodes + Clone + 'static,
|
RN: RootNodes,
|
||||||
{
|
{
|
||||||
type Handle = u64;
|
type Handle = u64;
|
||||||
type Inode = u64;
|
type Inode = u64;
|
||||||
|
|
@ -418,27 +415,9 @@ where
|
||||||
return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
|
return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
|
||||||
}
|
}
|
||||||
|
|
||||||
let root_nodes_provider = self.root_nodes_provider.clone();
|
let stream = self.root_nodes_provider.list().enumerate().boxed();
|
||||||
let (tx, rx) = mpsc::channel(ROOT_NODES_BUFFER_SIZE);
|
|
||||||
|
|
||||||
// This task will run in the background immediately and will exit
|
// Put the stream into [self.dir_handles].
|
||||||
// after the stream ends or if we no longer want any more entries.
|
|
||||||
self.tokio_handle.spawn(
|
|
||||||
async move {
|
|
||||||
let mut stream = root_nodes_provider.list().enumerate();
|
|
||||||
while let Some(e) = stream.next().await {
|
|
||||||
if tx.send(e).await.is_err() {
|
|
||||||
// If we get a send error, it means the sync code
|
|
||||||
// doesn't want any more entries.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// instrument the task with the current span, this is not done by default
|
|
||||||
.in_current_span(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Put the rx part into [self.dir_handles].
|
|
||||||
// TODO: this will overflow after 2**64 operations,
|
// TODO: this will overflow after 2**64 operations,
|
||||||
// which is fine for now.
|
// which is fine for now.
|
||||||
// See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
|
// See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
|
||||||
|
|
@ -447,7 +426,7 @@ where
|
||||||
|
|
||||||
self.dir_handles
|
self.dir_handles
|
||||||
.write()
|
.write()
|
||||||
.insert(dh, (Span::current(), Arc::new(Mutex::new(rx))));
|
.insert(dh, (Span::current(), Arc::new(Mutex::new(stream))));
|
||||||
|
|
||||||
return Ok((Some(dh), OpenOptions::NONSEEKABLE));
|
return Ok((Some(dh), OpenOptions::NONSEEKABLE));
|
||||||
}
|
}
|
||||||
|
|
@ -480,20 +459,18 @@ where
|
||||||
return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
|
return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the handle from [self.dir_handles]
|
// get the stream from [self.dir_handles]
|
||||||
let (_span, rx) = match self.dir_handles.read().get(&handle) {
|
let dir_handles = self.dir_handles.read();
|
||||||
Some(rx) => rx.clone(),
|
let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| {
|
||||||
None => {
|
warn!("dir handle {} unknown", handle);
|
||||||
warn!("dir handle {} unknown", handle);
|
io::Error::from_raw_os_error(libc::EIO)
|
||||||
return Err(io::Error::from_raw_os_error(libc::EIO));
|
})?;
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut rx = rx
|
let mut stream = stream
|
||||||
.lock()
|
.lock()
|
||||||
.map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
|
.map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
|
||||||
|
|
||||||
while let Some((i, n)) = rx.blocking_recv() {
|
while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) {
|
||||||
let (name, node) = n.map_err(|e| {
|
let (name, node) = n.map_err(|e| {
|
||||||
warn!("failed to retrieve root node: {}", e);
|
warn!("failed to retrieve root node: {}", e);
|
||||||
io::Error::from_raw_os_error(libc::EIO)
|
io::Error::from_raw_os_error(libc::EIO)
|
||||||
|
|
@ -569,20 +546,18 @@ where
|
||||||
return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
|
return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the handle from [self.dir_handles]
|
// get the stream from [self.dir_handles]
|
||||||
let (_span, rx) = match self.dir_handles.read().get(&handle) {
|
let dir_handles = self.dir_handles.read();
|
||||||
Some(rx) => rx.clone(),
|
let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| {
|
||||||
None => {
|
warn!("dir handle {} unknown", handle);
|
||||||
warn!("dir handle {} unknown", handle);
|
io::Error::from_raw_os_error(libc::EIO)
|
||||||
return Err(io::Error::from_raw_os_error(libc::EIO));
|
})?;
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut rx = rx
|
let mut stream = stream
|
||||||
.lock()
|
.lock()
|
||||||
.map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
|
.map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
|
||||||
|
|
||||||
while let Some((i, n)) = rx.blocking_recv() {
|
while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) {
|
||||||
let (name, node) = n.map_err(|e| {
|
let (name, node) = n.map_err(|e| {
|
||||||
warn!("failed to retrieve root node: {}", e);
|
warn!("failed to retrieve root node: {}", e);
|
||||||
io::Error::from_raw_os_error(libc::EPERM)
|
io::Error::from_raw_os_error(libc::EPERM)
|
||||||
|
|
@ -651,13 +626,12 @@ where
|
||||||
handle: Self::Handle,
|
handle: Self::Handle,
|
||||||
) -> io::Result<()> {
|
) -> io::Result<()> {
|
||||||
if inode == ROOT_ID {
|
if inode == ROOT_ID {
|
||||||
// drop the rx part of the channel.
|
// drop the stream.
|
||||||
match self.dir_handles.write().remove(&handle) {
|
if let Some(stream) = self.dir_handles.write().remove(&handle) {
|
||||||
// drop it, which will close it.
|
// drop it, which will close it.
|
||||||
Some(rx) => drop(rx),
|
drop(stream)
|
||||||
None => {
|
} else {
|
||||||
warn!("dir handle not found");
|
warn!("dir handle not found");
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,13 +2,14 @@ use std::collections::BTreeMap;
|
||||||
|
|
||||||
use crate::nodes::Directory;
|
use crate::nodes::Directory;
|
||||||
use crate::{Error, Node, path::PathComponent};
|
use crate::{Error, Node, path::PathComponent};
|
||||||
|
use futures::StreamExt;
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
|
|
||||||
/// Provides an interface for looking up root nodes in snix-castore by given
|
/// Provides an interface for looking up root nodes in snix-castore by given
|
||||||
/// a lookup key (usually the basename), and optionally allow a listing.
|
/// a lookup key (usually the basename), and optionally allow a listing.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait RootNodes: Send + Sync {
|
pub trait RootNodes {
|
||||||
/// Looks up a root CA node based on the basename of the node in the root
|
/// Looks up a root CA node based on the basename of the node in the root
|
||||||
/// directory of the filesystem.
|
/// directory of the filesystem.
|
||||||
async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error>;
|
async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error>;
|
||||||
|
|
@ -16,7 +17,7 @@ pub trait RootNodes: Send + Sync {
|
||||||
/// Lists all root CA nodes in the filesystem, as a tuple of (base)name
|
/// Lists all root CA nodes in the filesystem, as a tuple of (base)name
|
||||||
/// and Node.
|
/// and Node.
|
||||||
/// An error can be returned in case listing is not allowed.
|
/// An error can be returned in case listing is not allowed.
|
||||||
fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>>;
|
fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|
@ -30,12 +31,9 @@ where
|
||||||
Ok(self.as_ref().get(name).cloned())
|
Ok(self.as_ref().get(name).cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> {
|
fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> {
|
||||||
Box::pin(tokio_stream::iter(
|
let data = self.as_ref().to_owned();
|
||||||
self.as_ref()
|
futures::stream::iter(data.into_iter().map(Ok)).boxed()
|
||||||
.iter()
|
|
||||||
.map(|(name, node)| Ok((name.to_owned(), node.to_owned()))),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -48,10 +46,8 @@ impl RootNodes for Directory {
|
||||||
.map(|(_, node)| node.clone()))
|
.map(|(_, node)| node.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> {
|
fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> {
|
||||||
Box::pin(tokio_stream::iter(
|
let data = self.to_owned();
|
||||||
self.nodes()
|
futures::stream::iter(data.into_nodes().map(Ok)).boxed()
|
||||||
.map(|(name, node)| Ok((name.to_owned(), node.to_owned()))),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -61,13 +61,13 @@ impl Directory {
|
||||||
/// Allows iterating over all nodes (directories, files and symlinks)
|
/// Allows iterating over all nodes (directories, files and symlinks)
|
||||||
/// For each, it returns a tuple of its name and node.
|
/// For each, it returns a tuple of its name and node.
|
||||||
/// The elements are sorted by their names.
|
/// The elements are sorted by their names.
|
||||||
pub fn nodes(&self) -> impl Iterator<Item = (&PathComponent, &Node)> + Send + Sync + '_ {
|
pub fn nodes(&self) -> impl Iterator<Item = (&PathComponent, &Node)> + '_ {
|
||||||
self.nodes.iter()
|
self.nodes.iter()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Dissolves a Directory into its individual names and nodes.
|
/// Dissolves a Directory into its individual names and nodes.
|
||||||
/// The elements are sorted by their names.
|
/// The elements are sorted by their names.
|
||||||
pub fn into_nodes(self) -> impl Iterator<Item = (PathComponent, Node)> + Send + Sync {
|
pub fn into_nodes(self) -> impl Iterator<Item = (PathComponent, Node)> {
|
||||||
self.nodes.into_iter()
|
self.nodes.into_iter()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
use futures::StreamExt;
|
|
||||||
use futures::stream::BoxStream;
|
use futures::stream::BoxStream;
|
||||||
|
use futures::{StreamExt, TryStreamExt};
|
||||||
use nix_compat::store_path::StorePathRef;
|
use nix_compat::store_path::StorePathRef;
|
||||||
use snix_castore::fs::{RootNodes, SnixStoreFs};
|
use snix_castore::fs::{RootNodes, SnixStoreFs};
|
||||||
use snix_castore::{Error, Node, PathComponent};
|
use snix_castore::{Error, Node, PathComponent};
|
||||||
|
|
@ -46,7 +46,7 @@ pub struct RootNodesWrapper<T>(pub(crate) T);
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T> RootNodes for RootNodesWrapper<T>
|
impl<T> RootNodes for RootNodesWrapper<T>
|
||||||
where
|
where
|
||||||
T: PathInfoService + Send + Sync,
|
T: PathInfoService,
|
||||||
{
|
{
|
||||||
async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> {
|
async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> {
|
||||||
let Ok(store_path) = StorePathRef::from_bytes(name.as_ref()) else {
|
let Ok(store_path) = StorePathRef::from_bytes(name.as_ref()) else {
|
||||||
|
|
@ -60,18 +60,18 @@ where
|
||||||
.map(|path_info| path_info.node))
|
.map(|path_info| path_info.node))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> {
|
fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> {
|
||||||
Box::pin(self.0.list().map(|result| {
|
self.0
|
||||||
result.map(|path_info| {
|
.list()
|
||||||
let basename = path_info.store_path.to_string();
|
.map_ok(|path_info| {
|
||||||
(
|
let name = path_info
|
||||||
basename
|
.store_path
|
||||||
.as_str()
|
.to_string()
|
||||||
.try_into()
|
.as_str()
|
||||||
.expect("Snix bug: StorePath must be PathComponent"),
|
.try_into()
|
||||||
path_info.node,
|
.expect("Snix bug: StorePath must be PathComponent");
|
||||||
)
|
(name, path_info.node)
|
||||||
})
|
})
|
||||||
}))
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue