feat(tvix/castore/fs): implement opendir/releasedir
Similar to how we already handle opening files, implement opendir/ releasedir, and keep a map of dir_handles. They point to the rx side of a channel. This greatly improves performance listing the root of the filesystem when used inside tvix-store, as we don't need to re-request the listing (and skip to the desired position) all the time. Change-Id: I0d3ec4cb70a8792c5a1343439cf47d78d9cbb1d6 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11425 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI
This commit is contained in:
		
							parent
							
								
									3ed7eda79b
								
							
						
					
					
						commit
						2a47ad1d90
					
				
					 2 changed files with 142 additions and 66 deletions
				
			
		|  | @ -20,7 +20,7 @@ use crate::{ | |||
|     B3Digest, | ||||
| }; | ||||
| use bstr::ByteVec; | ||||
| use fuse_backend_rs::abi::fuse_abi::stat64; | ||||
| use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions}; | ||||
| use fuse_backend_rs::api::filesystem::{ | ||||
|     Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, | ||||
| }; | ||||
|  | @ -97,6 +97,17 @@ pub struct TvixStoreFs<BS, DS, RN> { | |||
|     /// This keeps track of inodes and data alongside them.
 | ||||
|     inode_tracker: RwLock<InodeTracker>, | ||||
| 
 | ||||
|     // FUTUREWORK: have a generic container type for dir/file handles and handle
 | ||||
|     // allocation.
 | ||||
|     /// Maps from the handle returned from an opendir to
 | ||||
|     /// This holds all opendir handles (for the root inode)
 | ||||
|     /// They point to the rx part of the channel producing the listing.
 | ||||
|     #[allow(clippy::type_complexity)] | ||||
|     dir_handles: | ||||
|         RwLock<HashMap<u64, Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>>>, | ||||
| 
 | ||||
|     next_dir_handle: AtomicU64, | ||||
| 
 | ||||
|     /// This holds all open file handles
 | ||||
|     #[allow(clippy::type_complexity)] | ||||
|     file_handles: RwLock<HashMap<u64, Arc<Mutex<Box<dyn BlobReader>>>>>, | ||||
|  | @ -130,6 +141,9 @@ where | |||
|             root_nodes: RwLock::new(HashMap::default()), | ||||
|             inode_tracker: RwLock::new(Default::default()), | ||||
| 
 | ||||
|             dir_handles: RwLock::new(Default::default()), | ||||
|             next_dir_handle: AtomicU64::new(1), | ||||
| 
 | ||||
|             file_handles: RwLock::new(Default::default()), | ||||
|             next_file_handle: AtomicU64::new(1), | ||||
|             tokio_handle: tokio::runtime::Handle::current(), | ||||
|  | @ -276,6 +290,9 @@ where | |||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Buffer size of the channel providing nodes in the mount root
 | ||||
| const ROOT_NODES_BUFFER_SIZE: usize = 16; | ||||
| 
 | ||||
| const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest"; | ||||
| const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest"; | ||||
| 
 | ||||
|  | @ -368,14 +385,63 @@ where | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // TODO: readdirplus?
 | ||||
|     #[tracing::instrument(skip_all, fields(rq.inode = inode))] | ||||
|     fn opendir( | ||||
|         &self, | ||||
|         _ctx: &Context, | ||||
|         inode: Self::Inode, | ||||
|         _flags: u32, | ||||
|     ) -> io::Result<(Option<Self::Handle>, OpenOptions)> { | ||||
|         // In case opendir on the root is called, we provide the handle, as re-entering that listing is expensive.
 | ||||
|         // For all other directory inodes we just let readdir take care of it.
 | ||||
|         if inode == ROOT_ID { | ||||
|             if !self.list_root { | ||||
|                 return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
 | ||||
|             } | ||||
| 
 | ||||
|     #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] | ||||
|             let root_nodes_provider = self.root_nodes_provider.clone(); | ||||
|             let (tx, rx) = mpsc::channel(ROOT_NODES_BUFFER_SIZE); | ||||
| 
 | ||||
|             // This task will run in the background immediately and will exit
 | ||||
|             // after the stream ends or if we no longer want any more entries.
 | ||||
|             self.tokio_handle.spawn(async move { | ||||
|                 let mut stream = root_nodes_provider.list().enumerate(); | ||||
|                 while let Some(node) = stream.next().await { | ||||
|                     if tx.send(node).await.is_err() { | ||||
|                         // If we get a send error, it means the sync code
 | ||||
|                         // doesn't want any more entries.
 | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
| 
 | ||||
|             // Put the rx part into [self.dir_handles].
 | ||||
|             // TODO: this will overflow after 2**64 operations,
 | ||||
|             // which is fine for now.
 | ||||
|             // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
 | ||||
|             // for the discussion on alternatives.
 | ||||
|             let dh = self.next_dir_handle.fetch_add(1, Ordering::SeqCst); | ||||
| 
 | ||||
|             debug!("add dir handle {}", dh); | ||||
|             self.dir_handles | ||||
|                 .write() | ||||
|                 .insert(dh, Arc::new(Mutex::new(rx))); | ||||
| 
 | ||||
|             return Ok(( | ||||
|                 Some(dh), | ||||
|                 fuse_backend_rs::api::filesystem::OpenOptions::empty(), // TODO: non-seekable
 | ||||
|             )); | ||||
|         } | ||||
| 
 | ||||
|         Ok((None, OpenOptions::empty())) | ||||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset))] | ||||
|     fn readdir( | ||||
|         &self, | ||||
|         _ctx: &Context, | ||||
|         inode: Self::Inode, | ||||
|         _handle: Self::Handle, | ||||
|         handle: Self::Handle, | ||||
|         _size: u32, | ||||
|         offset: u64, | ||||
|         add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>, | ||||
|  | @ -385,68 +451,61 @@ where | |||
|         if inode == ROOT_ID { | ||||
|             if !self.list_root { | ||||
|                 return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
 | ||||
|             } else { | ||||
|                 let root_nodes_provider = self.root_nodes_provider.clone(); | ||||
|                 let (tx, mut rx) = mpsc::channel(16); | ||||
|             } | ||||
| 
 | ||||
|                 // This task will run in the background immediately and will exit
 | ||||
|                 // after the stream ends or if we no longer want any more entries.
 | ||||
|                 self.tokio_handle.spawn(async move { | ||||
|                     let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate(); | ||||
|                     while let Some(node) = stream.next().await { | ||||
|                         if tx.send(node).await.is_err() { | ||||
|                             // If we get a send error, it means the sync code
 | ||||
|                             // doesn't want any more entries.
 | ||||
|                             break; | ||||
|                         } | ||||
|                     } | ||||
|             // get the handle from [self.dir_handles]
 | ||||
|             let rx = match self.dir_handles.read().get(&handle) { | ||||
|                 Some(rx) => rx.clone(), | ||||
|                 None => { | ||||
|                     warn!("dir handle {} unknown", handle); | ||||
|                     return Err(io::Error::from_raw_os_error(libc::EIO)); | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             let mut rx = rx | ||||
|                 .lock() | ||||
|                 .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; | ||||
| 
 | ||||
|             while let Some((i, n)) = rx.blocking_recv() { | ||||
|                 let root_node = n.map_err(|e| { | ||||
|                     warn!("failed to retrieve root node: {}", e); | ||||
|                     io::Error::from_raw_os_error(libc::EIO) | ||||
|                 })?; | ||||
| 
 | ||||
|                 let name = root_node.get_name(); | ||||
|                 let ty = match root_node { | ||||
|                     Node::Directory(_) => libc::S_IFDIR, | ||||
|                     Node::File(_) => libc::S_IFREG, | ||||
|                     Node::Symlink(_) => libc::S_IFLNK, | ||||
|                 }; | ||||
| 
 | ||||
|                 // obtain the inode, or allocate a new one.
 | ||||
|                 let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| { | ||||
|                     // insert the (sparse) inode data and register in
 | ||||
|                     // self.root_nodes.
 | ||||
|                     let ino = self.inode_tracker.write().put((&root_node).into()); | ||||
|                     self.root_nodes.write().insert(name.into(), ino); | ||||
|                     ino | ||||
|                 }); | ||||
| 
 | ||||
|                 while let Some((i, ref root_node)) = rx.blocking_recv() { | ||||
|                     let root_node = match root_node { | ||||
|                         Err(e) => { | ||||
|                             warn!("failed to retrieve pathinfo: {}", e); | ||||
|                             return Err(io::Error::from_raw_os_error(libc::EPERM)); | ||||
|                         } | ||||
|                         Ok(root_node) => root_node, | ||||
|                     }; | ||||
|                 #[cfg(target_os = "macos")] | ||||
|                 let ty = ty as u32; | ||||
| 
 | ||||
|                     let name = root_node.get_name(); | ||||
|                     // obtain the inode, or allocate a new one.
 | ||||
|                     let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| { | ||||
|                         // insert the (sparse) inode data and register in
 | ||||
|                         // self.root_nodes.
 | ||||
|                         let ino = self.inode_tracker.write().put(root_node.into()); | ||||
|                         self.root_nodes.write().insert(name.into(), ino); | ||||
|                         ino | ||||
|                     }); | ||||
| 
 | ||||
|                     let ty = match root_node { | ||||
|                         Node::Directory(_) => libc::S_IFDIR, | ||||
|                         Node::File(_) => libc::S_IFREG, | ||||
|                         Node::Symlink(_) => libc::S_IFLNK, | ||||
|                     }; | ||||
| 
 | ||||
|                     #[cfg(target_os = "macos")] | ||||
|                     let ty = ty as u32; | ||||
| 
 | ||||
|                     let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { | ||||
|                         ino, | ||||
|                         offset: offset + i as u64 + 1, | ||||
|                         type_: ty, | ||||
|                         name, | ||||
|                     })?; | ||||
|                     // If the buffer is full, add_entry will return `Ok(0)`.
 | ||||
|                     if written == 0 { | ||||
|                         break; | ||||
|                     } | ||||
|                 let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { | ||||
|                     ino, | ||||
|                     offset: offset + i as u64 + 1, | ||||
|                     type_: ty, | ||||
|                     name, | ||||
|                 })?; | ||||
|                 // If the buffer is full, add_entry will return `Ok(0)`.
 | ||||
|                 if written == 0 { | ||||
|                     break; | ||||
|                 } | ||||
| 
 | ||||
|                 return Ok(()); | ||||
|             } | ||||
|             return Ok(()); | ||||
|         } | ||||
| 
 | ||||
|         // lookup the children, or return an error if it's not a directory.
 | ||||
|         // Non root-node case: lookup the children, or return an error if it's not a directory.
 | ||||
|         let (parent_digest, children) = self.get_directory_children(inode)?; | ||||
| 
 | ||||
|         let span = info_span!("lookup", directory.digest = %parent_digest); | ||||
|  | @ -478,6 +537,29 @@ where | |||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
|     // TODO: readdirplus?
 | ||||
| 
 | ||||
|     #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle))] | ||||
|     fn releasedir( | ||||
|         &self, | ||||
|         _ctx: &Context, | ||||
|         inode: Self::Inode, | ||||
|         _flags: u32, | ||||
|         handle: Self::Handle, | ||||
|     ) -> io::Result<()> { | ||||
|         if inode == ROOT_ID { | ||||
|             // drop the rx part of the channel.
 | ||||
|             match self.dir_handles.write().remove(&handle) { | ||||
|                 // drop it, which will close it.
 | ||||
|                 Some(rx) => drop(rx), | ||||
|                 None => { | ||||
|                     debug!("dir handle not found"); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument(skip_all, fields(rq.inode = inode))] | ||||
|     fn open( | ||||
|  |  | |||
|  | @ -281,14 +281,8 @@ async fn root() { | |||
|     .expect("must succeed"); | ||||
| 
 | ||||
|     { | ||||
|         // read_dir succeeds, but getting the first element will fail.
 | ||||
|         let mut it = ReadDirStream::new(tokio::fs::read_dir(tmpdir).await.expect("must succeed")); | ||||
| 
 | ||||
|         let err = it | ||||
|             .next() | ||||
|             .await | ||||
|             .expect("must be some") | ||||
|             .expect_err("must be err"); | ||||
|         // read_dir fails (as opendir fails).
 | ||||
|         let err = tokio::fs::read_dir(tmpdir).await.expect_err("must fail"); | ||||
|         assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue