refactor(tvix/castore/blobservice/grpc): remove fn pointer hack

It looks like the workaround isn't necessary anymore.

Change-Id: Ifbcef1d631b3f369cac3db25a2c793480043f697
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10583
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2024-01-09 14:37:15 +02:00 committed by clbot
parent 89882ff9b1
commit b1c556b7e1

View file

@ -7,7 +7,7 @@ use tokio::task::JoinHandle;
use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_util::{ use tokio_util::{
io::{CopyToBytes, SinkWriter}, io::{CopyToBytes, SinkWriter},
sync::{PollSendError, PollSender}, sync::PollSender,
}; };
use tonic::{async_trait, transport::Channel, Code, Status}; use tonic::{async_trait, transport::Channel, Code, Status};
use tracing::instrument; use tracing::instrument;
@ -79,10 +79,8 @@ impl BlobService for GRPCBlobService {
} }
/// Returns a BlobWriter, that'll internally wrap each write in a /// Returns a BlobWriter, that'll internally wrap each write in a
// [proto::BlobChunk], which is send to the gRPC server. /// [proto::BlobChunk], which is send to the gRPC server.
async fn open_write(&self) -> Box<dyn BlobWriter> { async fn open_write(&self) -> Box<dyn BlobWriter> {
let mut grpc_client = self.grpc_client.clone();
// set up an mpsc channel passing around Bytes. // set up an mpsc channel passing around Bytes.
let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
@ -90,20 +88,15 @@ impl BlobService for GRPCBlobService {
// [proto::BlobChunk], and a [ReceiverStream] is constructed. // [proto::BlobChunk], and a [ReceiverStream] is constructed.
let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x });
// That receiver stream is used as a stream in the gRPC BlobService.put rpc call. // spawn the gRPC put request, which will read from blobchunk_stream.
let task: JoinHandle<Result<_, Status>> = let task = tokio::spawn({
tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); let mut grpc_client = self.grpc_client.clone();
async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) }
});
// The tx part of the channel is converted to a sink of byte chunks. // The tx part of the channel is converted to a sink of byte chunks.
// We need to make this a function pointer, not a closure.
fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error {
io::Error::from(io::ErrorKind::BrokenPipe)
}
let sink = PollSender::new(tx) let sink = PollSender::new(tx)
.sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error); .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e));
// We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item"
// … which is turned into an [tokio::io::AsyncWrite]. // … which is turned into an [tokio::io::AsyncWrite].
let writer = SinkWriter::new(CopyToBytes::new(sink)); let writer = SinkWriter::new(CopyToBytes::new(sink));