fix(castore/fs): enter the runtime context before blocking on tasks

fuse-backend-rs spawns multiple threads, but these don't automatically
inherit the tokio runtime context.

Spawning tasks from "the wrong thread" would then cause a panic, as seen
in https://git.snix.dev/snix/snix/issues/147.

However, cl/30575 didn't really fix this, it only removed one place
spawning tasks, without fixing the underlying issue.

PathInfoService / BlobService / DirectoryService are expected to spawn
tasks. We need to simply invoke `.enter()` from all worker threads, and
that's what this CL does.

An alternative would be to manually (re)-enter inside every function of
the FileSystem trait, but that'd be very messy.

A similar fix needs to end up in the virtiofs implementation, but we
don't have control over the (single) thread being spawned by
VhostUserDaemon there, so cannot just enter the runtime context there,
so virtiofs will stay broken for now.

Maybe it's time to re-architect this a bit - have our FileSystem impl
be little code and call to sync endpoints to do the actual work, which
is then handled by workers on another thread - but that's left for
another CL.

Change-Id: I58cdbd952f4ecc39bdc2f2fa69a788caa0cc78ba
Reviewed-on: https://cl.snix.dev/c/snix/+/30585
Tested-by: besadii
Reviewed-by: Vova Kryachko <v.kryachko@gmail.com>
This commit is contained in:
Florian Klink 2025-06-29 01:15:04 +03:00
parent 95fd048e00
commit ae2af10cf8

View file

@ -25,7 +25,9 @@ impl<FS> FuseServer<FS>
where where
FS: FileSystem + Sync + Send, FS: FileSystem + Sync + Send,
{ {
fn start(&mut self) -> io::Result<()> { fn start(&mut self, tokio_handle: tokio::runtime::Handle) -> io::Result<()> {
let _guard = tokio_handle.enter();
while let Some((reader, writer)) = self while let Some((reader, writer)) = self
.channel .channel
.get_request() .get_request()
@ -88,6 +90,9 @@ impl FuseDaemon {
.thread_name("fuse_server".to_string()) .thread_name("fuse_server".to_string())
.build(); .build();
// get a handle to the current tokio runtime
let runtime_handle = tokio::runtime::Handle::current();
for _ in 0..num_threads { for _ in 0..num_threads {
// for each thread requested, create and start a FuseServer accepting requests. // for each thread requested, create and start a FuseServer accepting requests.
let mut server = FuseServer { let mut server = FuseServer {
@ -97,8 +102,13 @@ impl FuseDaemon {
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
}; };
threads.execute(move || { // Start the FuseServer in each thread, and enter the tokio runtime context,
let _ = server.start(); // so we can block on tasks.
threads.execute({
let runtime_handle = runtime_handle.clone();
move || {
let _ = server.start(runtime_handle);
}
}); });
} }