feat(tvix/nar-bridge): implement range requests for NARs

With an implementation of AsyncRead + AsyncSeek, axum-range can answer
range requests.

We only use the seekable reader if a range has been requested, as it
uses more memory than the linear (non-seekable) variant.

Change-Id: I0072b0a09b328f3e932f14567a2caa3a49abcbf7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12509
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Reviewed-by: yuka <yuka@yuka.dev>
This commit is contained in:
Florian Klink 2024-09-25 22:05:04 +02:00 committed by clbot
parent 16a3b90125
commit 2e4a373a04
5 changed files with 350 additions and 19 deletions

View file

@ -1,7 +1,9 @@
use axum::body::Body;
use axum::extract::Query;
use axum::http::StatusCode;
use axum::response::Response;
use axum::{body::Body, response::IntoResponse};
use axum_extra::{headers::Range, TypedHeader};
use axum_range::{KnownSize, Ranged};
use bytes::Bytes;
use data_encoding::BASE64URL_NOPAD;
use futures::TryStreamExt;
@ -22,6 +24,7 @@ pub(crate) struct GetNARParams {
#[instrument(skip(blob_service, directory_service))]
pub async fn get(
ranges: Option<TypedHeader<Range>>,
axum::extract::Path(root_node_enc): axum::extract::Path<String>,
axum::extract::Query(GetNARParams { nar_size }): Query<GetNARParams>,
axum::extract::State(AppState {
@ -29,7 +32,7 @@ pub async fn get(
directory_service,
..
}): axum::extract::State<AppState>,
) -> Result<Response, StatusCode> {
) -> Result<impl axum::response::IntoResponse, StatusCode> {
use prost::Message;
// b64decode the root node passed *by the user*
let root_node_proto = BASE64URL_NOPAD
@ -62,24 +65,52 @@ pub async fn get(
return Err(StatusCode::BAD_REQUEST);
}
let (w, r) = tokio::io::duplex(1024 * 8);
Ok((
// headers
[
("cache-control", "max-age=31536000, immutable"),
("content-type", nix_http::MIME_TYPE_NAR),
],
// If this is a range request, construct a seekable NAR reader
if let Some(TypedHeader(ranges)) = ranges {
let r =
tvix_store::nar::seekable::Reader::new(root_node, blob_service, directory_service)
.await
.map_err(|e| {
warn!(err=%e, "failed to construct seekable nar reader");
StatusCode::INTERNAL_SERVER_ERROR
})?;
// spawn a task rendering the NAR to the client
tokio::spawn(async move {
if let Err(e) =
tvix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
{
warn!(err=%e, "failed to write out NAR");
}
});
// ensure the user-supplied nar size was correct, no point returning data otherwise.
if r.stream_len() != nar_size {
warn!(
actual_nar_size = r.stream_len(),
supplied_nar_size = nar_size,
"wrong nar size supplied"
);
return Err(StatusCode::BAD_REQUEST);
}
Ranged::new(Some(ranges), KnownSize::sized(r, nar_size)).into_response()
} else {
// use the non-seekable codepath if there's no range(s) requested,
// as it uses less memory.
let (w, r) = tokio::io::duplex(1024 * 8);
Ok(Response::builder()
.status(StatusCode::OK)
.header("cache-control", "max-age=31536000, immutable")
.header("content-length", nar_size)
.header("content-type", nix_http::MIME_TYPE_NAR)
.body(Body::from_stream(ReaderStream::new(r)))
.unwrap())
// spawn a task rendering the NAR to the client.
tokio::spawn(async move {
if let Err(e) =
tvix_store::nar::write_nar(w, &root_node, blob_service, directory_service).await
{
warn!(err=%e, "failed to write out NAR");
}
});
Response::builder()
.header("content-length", nar_size)
.body(Body::from_stream(ReaderStream::new(r)))
.unwrap()
},
))
}
#[instrument(skip(blob_service, directory_service, request))]