diff --git a/snix/castore/src/fs/mod.rs b/snix/castore/src/fs/mod.rs index 40a1148e1..2e5ac1bef 100644 --- a/snix/castore/src/fs/mod.rs +++ b/snix/castore/src/fs/mod.rs @@ -26,7 +26,7 @@ use fuse_backend_rs::abi::fuse_abi::{OpenOptions, stat64}; use fuse_backend_rs::api::filesystem::{ Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, }; -use futures::StreamExt; +use futures::{StreamExt, stream::BoxStream}; use parking_lot::RwLock; use std::sync::Mutex; use std::{ @@ -37,11 +37,8 @@ use std::{ time::Duration, }; use std::{ffi::CStr, io::Cursor}; -use tokio::{ - io::{AsyncReadExt, AsyncSeekExt}, - sync::mpsc, -}; -use tracing::{Instrument as _, Span, debug, error, instrument, warn}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tracing::{Span, debug, error, instrument, warn}; /// This implements a read-only FUSE filesystem for a snix-store /// with the passed [BlobService], [DirectoryService] and [RootNodes]. @@ -94,16 +91,19 @@ pub struct SnixStoreFs { // FUTUREWORK: have a generic container type for dir/file handles and handle // allocation. - /// Maps from the handle returned from an opendir to - /// This holds all opendir handles (for the root inode) - /// They point to the rx part of the channel producing the listing. + /// This holds all opendir handles (for the root inode), keyed by the handle + /// returned from the opendir call. + /// For each handle, we store an enumerated Result<(PathComponent, Node), crate::Error>. + /// The index is needed as we need to send offset information. #[allow(clippy::type_complexity)] dir_handles: RwLock< HashMap< u64, ( Span, - Arc)>>>, + Arc< + Mutex)>>, + >, ), >, >, @@ -123,7 +123,7 @@ impl SnixStoreFs where BS: BlobService, DS: DirectoryService, - RN: RootNodes + Clone + 'static, + RN: RootNodes, { pub fn new( blob_service: BS, @@ -289,9 +289,6 @@ where } } -/// Buffer size of the channel providing nodes in the mount root -const ROOT_NODES_BUFFER_SIZE: usize = 16; - const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.snix.castore.directory.digest"; const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.snix.castore.blob.digest"; @@ -300,7 +297,7 @@ impl fuse_backend_rs::api::filesystem::Layer for SnixStoreFs Self::Inode { ROOT_ID @@ -311,7 +308,7 @@ impl FileSystem for SnixStoreFs where BS: BlobService, DS: DirectoryService, - RN: RootNodes + Clone + 'static, + RN: RootNodes, { type Handle = u64; type Inode = u64; @@ -418,27 +415,9 @@ where return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } - let root_nodes_provider = self.root_nodes_provider.clone(); - let (tx, rx) = mpsc::channel(ROOT_NODES_BUFFER_SIZE); + let stream = self.root_nodes_provider.list().enumerate().boxed(); - // This task will run in the background immediately and will exit - // after the stream ends or if we no longer want any more entries. - self.tokio_handle.spawn( - async move { - let mut stream = root_nodes_provider.list().enumerate(); - while let Some(e) = stream.next().await { - if tx.send(e).await.is_err() { - // If we get a send error, it means the sync code - // doesn't want any more entries. - break; - } - } - } - // instrument the task with the current span, this is not done by default - .in_current_span(), - ); - - // Put the rx part into [self.dir_handles]. + // Put the stream into [self.dir_handles]. // TODO: this will overflow after 2**64 operations, // which is fine for now. // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 @@ -447,7 +426,7 @@ where self.dir_handles .write() - .insert(dh, (Span::current(), Arc::new(Mutex::new(rx)))); + .insert(dh, (Span::current(), Arc::new(Mutex::new(stream)))); return Ok((Some(dh), OpenOptions::NONSEEKABLE)); } @@ -480,20 +459,18 @@ where return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } - // get the handle from [self.dir_handles] - let (_span, rx) = match self.dir_handles.read().get(&handle) { - Some(rx) => rx.clone(), - None => { - warn!("dir handle {} unknown", handle); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - }; + // get the stream from [self.dir_handles] + let dir_handles = self.dir_handles.read(); + let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| { + warn!("dir handle {} unknown", handle); + io::Error::from_raw_os_error(libc::EIO) + })?; - let mut rx = rx + let mut stream = stream .lock() .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; - while let Some((i, n)) = rx.blocking_recv() { + while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) { let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EIO) @@ -569,20 +546,18 @@ where return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } - // get the handle from [self.dir_handles] - let (_span, rx) = match self.dir_handles.read().get(&handle) { - Some(rx) => rx.clone(), - None => { - warn!("dir handle {} unknown", handle); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - }; + // get the stream from [self.dir_handles] + let dir_handles = self.dir_handles.read(); + let (_span, stream) = dir_handles.get(&handle).ok_or_else(|| { + warn!("dir handle {} unknown", handle); + io::Error::from_raw_os_error(libc::EIO) + })?; - let mut rx = rx + let mut stream = stream .lock() .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; - while let Some((i, n)) = rx.blocking_recv() { + while let Some((i, n)) = self.tokio_handle.block_on(async { stream.next().await }) { let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EPERM) @@ -651,13 +626,12 @@ where handle: Self::Handle, ) -> io::Result<()> { if inode == ROOT_ID { - // drop the rx part of the channel. - match self.dir_handles.write().remove(&handle) { + // drop the stream. + if let Some(stream) = self.dir_handles.write().remove(&handle) { // drop it, which will close it. - Some(rx) => drop(rx), - None => { - warn!("dir handle not found"); - } + drop(stream) + } else { + warn!("dir handle not found"); } } diff --git a/snix/castore/src/fs/root_nodes.rs b/snix/castore/src/fs/root_nodes.rs index aef66893d..60d4d2651 100644 --- a/snix/castore/src/fs/root_nodes.rs +++ b/snix/castore/src/fs/root_nodes.rs @@ -2,13 +2,14 @@ use std::collections::BTreeMap; use crate::nodes::Directory; use crate::{Error, Node, path::PathComponent}; +use futures::StreamExt; use futures::stream::BoxStream; use tonic::async_trait; /// Provides an interface for looking up root nodes in snix-castore by given /// a lookup key (usually the basename), and optionally allow a listing. #[async_trait] -pub trait RootNodes: Send + Sync { +pub trait RootNodes { /// Looks up a root CA node based on the basename of the node in the root /// directory of the filesystem. async fn get_by_basename(&self, name: &PathComponent) -> Result, Error>; @@ -16,7 +17,7 @@ pub trait RootNodes: Send + Sync { /// Lists all root CA nodes in the filesystem, as a tuple of (base)name /// and Node. /// An error can be returned in case listing is not allowed. - fn list(&self) -> BoxStream>; + fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>>; } #[async_trait] @@ -30,12 +31,9 @@ where Ok(self.as_ref().get(name).cloned()) } - fn list(&self) -> BoxStream> { - Box::pin(tokio_stream::iter( - self.as_ref() - .iter() - .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))), - )) + fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> { + let data = self.as_ref().to_owned(); + futures::stream::iter(data.into_iter().map(Ok)).boxed() } } @@ -48,10 +46,8 @@ impl RootNodes for Directory { .map(|(_, node)| node.clone())) } - fn list(&self) -> BoxStream> { - Box::pin(tokio_stream::iter( - self.nodes() - .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))), - )) + fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> { + let data = self.to_owned(); + futures::stream::iter(data.into_nodes().map(Ok)).boxed() } } diff --git a/snix/castore/src/nodes/directory.rs b/snix/castore/src/nodes/directory.rs index 845f292e8..1b66d4df4 100644 --- a/snix/castore/src/nodes/directory.rs +++ b/snix/castore/src/nodes/directory.rs @@ -61,13 +61,13 @@ impl Directory { /// Allows iterating over all nodes (directories, files and symlinks) /// For each, it returns a tuple of its name and node. /// The elements are sorted by their names. - pub fn nodes(&self) -> impl Iterator + Send + Sync + '_ { + pub fn nodes(&self) -> impl Iterator + '_ { self.nodes.iter() } /// Dissolves a Directory into its individual names and nodes. /// The elements are sorted by their names. - pub fn into_nodes(self) -> impl Iterator + Send + Sync { + pub fn into_nodes(self) -> impl Iterator { self.nodes.into_iter() } diff --git a/snix/store/src/pathinfoservice/fs/mod.rs b/snix/store/src/pathinfoservice/fs/mod.rs index 9f4d1019b..90ec5d3e4 100644 --- a/snix/store/src/pathinfoservice/fs/mod.rs +++ b/snix/store/src/pathinfoservice/fs/mod.rs @@ -1,5 +1,5 @@ -use futures::StreamExt; use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; use nix_compat::store_path::StorePathRef; use snix_castore::fs::{RootNodes, SnixStoreFs}; use snix_castore::{Error, Node, PathComponent}; @@ -46,7 +46,7 @@ pub struct RootNodesWrapper(pub(crate) T); #[async_trait] impl RootNodes for RootNodesWrapper where - T: PathInfoService + Send + Sync, + T: PathInfoService, { async fn get_by_basename(&self, name: &PathComponent) -> Result, Error> { let Ok(store_path) = StorePathRef::from_bytes(name.as_ref()) else { @@ -60,18 +60,18 @@ where .map(|path_info| path_info.node)) } - fn list(&self) -> BoxStream> { - Box::pin(self.0.list().map(|result| { - result.map(|path_info| { - let basename = path_info.store_path.to_string(); - ( - basename - .as_str() - .try_into() - .expect("Snix bug: StorePath must be PathComponent"), - path_info.node, - ) + fn list(&self) -> BoxStream<'static, Result<(PathComponent, Node), Error>> { + self.0 + .list() + .map_ok(|path_info| { + let name = path_info + .store_path + .to_string() + .as_str() + .try_into() + .expect("Snix bug: StorePath must be PathComponent"); + (name, path_info.node) }) - })) + .boxed() } }