From 80b520603461f999ce3b73b9e1e7af16ed8e409d Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Mon, 2 Jun 2025 17:53:33 +0200 Subject: [PATCH] refactor(castore/fs): use streams for dir handles This changes RootNodes::list to return a BoxStream<'static, _>, and then drops all the mpsc sender / receiver complexity we were having. There's also no need to worry about channel buffer sizes - all current RootNodes implementations are immediately ready to yield new elements in the stream. Assuming there are new implementations that do take some time, we can deal with buffer sizes on the producer side, which might know its own batch sizes better. RootNodes now doesn't need to implement Clone/Send anymore, and can have non-static lifetimes. As long as its list method returns a BoxStream<'static>, we're fine with all that. On a first look, this seems like we now need to do more cloning upfront for the BTreeMap and Directory RootNodes impls. However, we already had to clone the entire thing at `self.root_nodes_provider.clone()`, and then did it again for each element. Now we get an owned version of the data whenever a list() call happens, and then just move owned things around. 
Change-Id: I85fbca0e1171014ae85eeb03b3d58e6176ef4e2d Reviewed-on: https://cl.snix.dev/c/snix/+/30549 Autosubmit: Florian Klink Reviewed-by: Connor Brewster Tested-by: besadii --- snix/castore/src/fs/mod.rs | 100 +++++++++-------------- snix/castore/src/fs/root_nodes.rs | 22 ++--- snix/castore/src/nodes/directory.rs | 4 +- snix/store/src/pathinfoservice/fs/mod.rs | 28 +++---- 4 files changed, 62 insertions(+), 92 deletions(-) diff --git a/snix/castore/src/fs/mod.rs b/snix/castore/src/fs/mod.rs index 40a1148e1..2e5ac1bef 100644 --- a/snix/castore/src/fs/mod.rs +++ b/snix/castore/src/fs/mod.rs @@ -26,7 +26,7 @@ use fuse_backend_rs::abi::fuse_abi::{OpenOptions, stat64}; use fuse_backend_rs::api::filesystem::{ Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, }; -use futures::StreamExt; +use futures::{StreamExt, stream::BoxStream}; use parking_lot::RwLock; use std::sync::Mutex; use std::{ @@ -37,11 +37,8 @@ use std::{ time::Duration, }; use std::{ffi::CStr, io::Cursor}; -use tokio::{ - io::{AsyncReadExt, AsyncSeekExt}, - sync::mpsc, -}; -use tracing::{Instrument as _, Span, debug, error, instrument, warn}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tracing::{Span, debug, error, instrument, warn}; /// This implements a read-only FUSE filesystem for a snix-store /// with the passed [BlobService], [DirectoryService] and [RootNodes]. @@ -94,16 +91,19 @@ pub struct SnixStoreFs { // FUTUREWORK: have a generic container type for dir/file handles and handle // allocation. - /// Maps from the handle returned from an opendir to - /// This holds all opendir handles (for the root inode) - /// They point to the rx part of the channel producing the listing. + /// This holds all opendir handles (for the root inode), keyed by the handle + /// returned from the opendir call. + /// For each handle, we store an enumerated Result<(PathComponent, Node), crate::Error>. + /// The index is needed as we need to send offset information. 
#[allow(clippy::type_complexity)] dir_handles: RwLock< HashMap< u64, ( Span, - Arc)>>>, + Arc< + Mutex)>>, + >, ), >, >, @@ -123,7 +123,7 @@ impl SnixStoreFs where BS: BlobService, DS: DirectoryService, - RN: RootNodes + Clone + 'static, + RN: RootNodes, { pub fn new( blob_service: BS, @@ -289,9 +289,6 @@ where } } -/// Buffer size of the channel providing nodes in the mount root -const ROOT_NODES_BUFFER_SIZE: usize = 16; - const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.snix.castore.directory.digest"; const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.snix.castore.blob.digest"; @@ -300,7 +297,7 @@ impl fuse_backend_rs::api::filesystem::Layer for SnixStoreFs Self::Inode { ROOT_ID @@ -311,7 +308,7 @@ impl FileSystem for SnixStoreFs where BS: BlobService, DS: DirectoryService, - RN: RootNodes + Clone + 'static, + RN: RootNodes, { type Handle = u64; type Inode = u64; @@ -418,27 +415,9 @@ where return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } - let root_nodes_provider = self.root_nodes_provider.clone(); - let (tx, rx) = mpsc::channel(ROOT_NODES_BUFFER_SIZE); + let stream = self.root_nodes_provider.list().enumerate().boxed(); - // This task will run in the background immediately and will exit - // after the stream ends or if we no longer want any more entries. - self.tokio_handle.spawn( - async move { - let mut stream = root_nodes_provider.list().enumerate(); - while let Some(e) = stream.next().await { - if tx.send(e).await.is_err() { - // If we get a send error, it means the sync code - // doesn't want any more entries. - break; - } - } - } - // instrument the task with the current span, this is not done by default - .in_current_span(), - ); - - // Put the rx part into [self.dir_handles]. + // Put the stream into [self.dir_handles]. // TODO: this will overflow after 2**64 operations, // which is fine for now. 
// See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 @@ -447,7 +426,7 @@ where self.dir_handles .write() - .insert(dh, (Span::current(), Arc::new(Mutex::new(rx)))); + .insert(dh, (Span::current(), Arc::new(Mutex::new(stream)))); return Ok((Some(dh), OpenOptions::NONSEEKABLE)); } @@ -480,20 +459,18 @@ where return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } - // get the handle from [self.dir_handles] - let (_span, rx) = match self.dir_handles.read().get(&handle) { - Some(rx) => rx.clone(), - None => { - warn!("dir handle {} unknown", handle); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - }; + // get the stream from [self.dir_handles] + let dir_handles = self.dir_handles.read(); + let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| { + warn!("dir handle {} unknown", handle); + io::Error::from_raw_os_error(libc::EIO) + })?; - let mut rx = rx + let mut stream = stream .lock() .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; - while let Some((i, n)) = rx.blocking_recv() { + while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) { let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EIO) @@ -569,20 +546,18 @@ where return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } - // get the handle from [self.dir_handles] - let (_span, rx) = match self.dir_handles.read().get(&handle) { - Some(rx) => rx.clone(), - None => { - warn!("dir handle {} unknown", handle); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - }; + // get the stream from [self.dir_handles] + let dir_handles = self.dir_handles.read(); + let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| { + warn!("dir handle {} unknown", handle); + io::Error::from_raw_os_error(libc::EIO) + })?; - let mut rx = rx + let mut stream = stream .lock() .map_err(|_| crate::Error::StorageError("mutex 
poisoned".into()))?; - while let Some((i, n)) = rx.blocking_recv() { + while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) { let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EPERM) @@ -651,13 +626,12 @@ where handle: Self::Handle, ) -> io::Result<()> { if inode == ROOT_ID { - // drop the rx part of the channel. - match self.dir_handles.write().remove(&handle) { + // drop the stream. + if let Some(stream) = self.dir_handles.write().remove(&handle) { // drop it, which will close it. - Some(rx) => drop(rx), - None => { - warn!("dir handle not found"); - } + drop(stream) + } else { + warn!("dir handle not found"); } } diff --git a/snix/castore/src/fs/root_nodes.rs b/snix/castore/src/fs/root_nodes.rs index aef66893d..60d4d2651 100644 --- a/snix/castore/src/fs/root_nodes.rs +++ b/snix/castore/src/fs/root_nodes.rs @@ -2,13 +2,14 @@ use std::collections::BTreeMap; use crate::nodes::Directory; use crate::{Error, Node, path::PathComponent}; +use futures::StreamExt; use futures::stream::BoxStream; use tonic::async_trait; /// Provides an interface for looking up root nodes in snix-castore by given /// a lookup key (usually the basename), and optionally allow a listing. #[async_trait] -pub trait RootNodes: Send + Sync { +pub trait RootNodes { /// Looks up a root CA node based on the basename of the node in the root /// directory of the filesystem. async fn get_by_basename(&self, name: &PathComponent) -> Result, Error>; @@ -16,7 +17,7 @@ pub trait RootNodes: Send + Sync { /// Lists all root CA nodes in the filesystem, as a tuple of (base)name /// and Node. /// An error can be returned in case listing is not allowed. 
- fn list(&self) -> BoxStream>; + fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>>; } #[async_trait] @@ -30,12 +31,9 @@ where Ok(self.as_ref().get(name).cloned()) } - fn list(&self) -> BoxStream> { - Box::pin(tokio_stream::iter( - self.as_ref() - .iter() - .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))), - )) + fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> { + let data = self.as_ref().to_owned(); + futures::stream::iter(data.into_iter().map(Ok)).boxed() } } @@ -48,10 +46,8 @@ impl RootNodes for Directory { .map(|(_, node)| node.clone())) } - fn list(&self) -> BoxStream> { - Box::pin(tokio_stream::iter( - self.nodes() - .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))), - )) + fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> { + let data = self.to_owned(); + futures::stream::iter(data.into_nodes().map(Ok)).boxed() } } diff --git a/snix/castore/src/nodes/directory.rs b/snix/castore/src/nodes/directory.rs index 845f292e8..1b66d4df4 100644 --- a/snix/castore/src/nodes/directory.rs +++ b/snix/castore/src/nodes/directory.rs @@ -61,13 +61,13 @@ impl Directory { /// Allows iterating over all nodes (directories, files and symlinks) /// For each, it returns a tuple of its name and node. /// The elements are sorted by their names. - pub fn nodes(&self) -> impl Iterator + Send + Sync + '_ { + pub fn nodes(&self) -> impl Iterator + '_ { self.nodes.iter() } /// Dissolves a Directory into its individual names and nodes. /// The elements are sorted by their names. 
- pub fn into_nodes(self) -> impl Iterator + Send + Sync { + pub fn into_nodes(self) -> impl Iterator { self.nodes.into_iter() } diff --git a/snix/store/src/pathinfoservice/fs/mod.rs b/snix/store/src/pathinfoservice/fs/mod.rs index 9f4d1019b..90ec5d3e4 100644 --- a/snix/store/src/pathinfoservice/fs/mod.rs +++ b/snix/store/src/pathinfoservice/fs/mod.rs @@ -1,5 +1,5 @@ -use futures::StreamExt; use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; use nix_compat::store_path::StorePathRef; use snix_castore::fs::{RootNodes, SnixStoreFs}; use snix_castore::{Error, Node, PathComponent}; @@ -46,7 +46,7 @@ pub struct RootNodesWrapper(pub(crate) T); #[async_trait] impl RootNodes for RootNodesWrapper where - T: PathInfoService + Send + Sync, + T: PathInfoService, { async fn get_by_basename(&self, name: &PathComponent) -> Result, Error> { let Ok(store_path) = StorePathRef::from_bytes(name.as_ref()) else { @@ -60,18 +60,18 @@ where .map(|path_info| path_info.node)) } - fn list(&self) -> BoxStream> { - Box::pin(self.0.list().map(|result| { - result.map(|path_info| { - let basename = path_info.store_path.to_string(); - ( - basename - .as_str() - .try_into() - .expect("Snix bug: StorePath must be PathComponent"), - path_info.node, - ) + fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> { + self.0 + .list() + .map_ok(|path_info| { + let name = path_info + .store_path + .to_string() + .as_str() + .try_into() + .expect("Snix bug: StorePath must be PathComponent"); + (name, path_info.node) }) - })) + .boxed() } }