chore(tvix/store): Use BoxStream type alias

The BoxStream type alias is a more concise and easier to read than
the full `Pin<Box<dyn Stream<Item = ...> + Send + ...>>` type.

Change-Id: I5b7bccfd066ded5557e01f7895f4cf5c4a33bd44
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10677
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Autosubmit: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Connor Brewster 2024-01-20 16:25:39 -06:00 committed by clbot
parent 56ba7a72d8
commit 4e341fb5d9
15 changed files with 44 additions and 67 deletions

View file

@ -1,11 +1,10 @@
use std::collections::HashSet;
use std::pin::Pin;
use super::{DirectoryPutter, DirectoryService};
use crate::proto::{self, get_directory_request::ByWhat};
use crate::{B3Digest, Error};
use async_stream::try_stream;
use futures::Stream;
use futures::stream::BoxStream;
use tokio::spawn;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
@ -106,7 +105,7 @@ impl DirectoryService for GRPCDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
) -> BoxStream<Result<proto::Directory, Error>> {
let mut grpc_client = self.grpc_client.clone();
let root_directory_digest = root_directory_digest.clone();

View file

@ -1,7 +1,6 @@
use crate::{proto, B3Digest, Error};
use futures::Stream;
use futures::stream::BoxStream;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use tonic::async_trait;
use tracing::{instrument, warn};
@ -73,7 +72,7 @@ impl DirectoryService for MemoryDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
) -> BoxStream<Result<proto::Directory, Error>> {
traverse_directory(self.clone(), root_directory_digest)
}

View file

@ -1,6 +1,5 @@
use crate::{proto, B3Digest, Error};
use futures::Stream;
use std::pin::Pin;
use futures::stream::BoxStream;
use tonic::async_trait;
mod from_addr;
@ -44,7 +43,7 @@ pub trait DirectoryService: Send + Sync {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>;
) -> BoxStream<Result<proto::Directory, Error>>;
/// Allows persisting a closure of [proto::Directory], which is a graph of
/// connected Directory messages.

View file

@ -1,10 +1,9 @@
use crate::directoryservice::DirectoryPutter;
use crate::proto::Directory;
use crate::{proto, B3Digest, Error};
use futures::Stream;
use futures::stream::BoxStream;
use prost::Message;
use std::path::Path;
use std::pin::Pin;
use tonic::async_trait;
use tracing::{instrument, warn};
@ -99,7 +98,7 @@ impl DirectoryService for SledDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> {
) -> BoxStream<Result<proto::Directory, Error>> {
traverse_directory(self.clone(), root_directory_digest)
}

View file

@ -4,19 +4,18 @@ use crate::proto;
use crate::B3Digest;
use crate::Error;
use async_stream::stream;
use futures::Stream;
use futures::stream::BoxStream;
use std::collections::{HashSet, VecDeque};
use std::pin::Pin;
use tonic::async_trait;
use tracing::warn;
/// Traverses a [proto::Directory] from the root to the children.
///
/// This is mostly BFS, but directories are only returned once.
pub fn traverse_directory<DS: DirectoryService + 'static>(
pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
directory_service: DS,
root_directory_digest: &B3Digest,
) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
) -> BoxStream<'a, Result<proto::Directory, Error>> {
// The list of all directories that still need to be traversed. The next
// element is picked from the front, new elements are enqueued at the
// back.

View file

@ -1,8 +1,8 @@
use std::{collections::BTreeMap, pin::Pin};
use std::collections::BTreeMap;
use crate::{proto::node::Node, Error};
use bytes::Bytes;
use futures::Stream;
use futures::stream::BoxStream;
use tonic::async_trait;
/// Provides an interface for looking up root nodes in tvix-castore by given
@ -15,7 +15,7 @@ pub trait RootNodes: Send + Sync {
/// Lists all root CA nodes in the filesystem. An error can be returned
/// in case listing is not allowed
fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send + '_>>;
fn list(&self) -> BoxStream<Result<Node, Error>>;
}
#[async_trait]
@ -29,7 +29,7 @@ where
Ok(self.as_ref().get(name).cloned())
}
fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send + '_>> {
fn list(&self) -> BoxStream<Result<Node, Error>> {
Box::pin(tokio_stream::iter(
self.as_ref().iter().map(|(_, v)| Ok(v.clone())),
))

View file

@ -1,11 +1,10 @@
use crate::blobservice::BlobService;
use core::pin::pin;
use futures::TryFutureExt;
use futures::{stream::BoxStream, TryFutureExt};
use std::{
collections::VecDeque,
io,
ops::{Deref, DerefMut},
pin::Pin,
};
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;
@ -86,8 +85,7 @@ where
T: Deref<Target = dyn BlobService> + Send + Sync + 'static,
{
// https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
type ReadStream =
Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
type ReadStream = BoxStream<'static, Result<super::BlobChunk, Status>>;
#[instrument(skip(self))]
async fn stat(