This is specifically used for the fetcher code (only). Moving it to there for now. Change-Id: I1e1d0541b85340ef4ff3a4c6b3fa99b51853f539 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11532 Reviewed-by: edef <edef@edef.eu> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
222 lines
6.4 KiB
Rust
222 lines
6.4 KiB
Rust
#![allow(dead_code)] // TODO
|
|
|
|
use std::{
|
|
io, mem,
|
|
pin::Pin,
|
|
task::{Context, Poll},
|
|
};
|
|
|
|
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder};
|
|
use futures::ready;
|
|
use pin_project::pin_project;
|
|
use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf};
|
|
|
|
const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];
|
|
const BZIP2_MAGIC: [u8; 3] = *b"BZh";
|
|
const XZ_MAGIC: [u8; 6] = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00];
|
|
const BYTES_NEEDED: usize = 6;
|
|
|
|
#[derive(Debug, Clone, Copy)]
|
|
enum Algorithm {
|
|
Gzip,
|
|
Bzip2,
|
|
Xz,
|
|
}
|
|
|
|
impl Algorithm {
|
|
fn from_magic(magic: &[u8]) -> Option<Self> {
|
|
if magic.starts_with(&GZIP_MAGIC) {
|
|
Some(Self::Gzip)
|
|
} else if magic.starts_with(&BZIP2_MAGIC) {
|
|
Some(Self::Bzip2)
|
|
} else if magic.starts_with(&XZ_MAGIC) {
|
|
Some(Self::Xz)
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
#[pin_project]
|
|
struct WithPreexistingBuffer<R> {
|
|
buffer: Vec<u8>,
|
|
#[pin]
|
|
inner: R,
|
|
}
|
|
|
|
impl<R> AsyncRead for WithPreexistingBuffer<R>
|
|
where
|
|
R: AsyncRead,
|
|
{
|
|
fn poll_read(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &mut ReadBuf<'_>,
|
|
) -> Poll<io::Result<()>> {
|
|
let this = self.project();
|
|
if !this.buffer.is_empty() {
|
|
// TODO: check if the buffer fits first
|
|
buf.put_slice(this.buffer);
|
|
this.buffer.clear();
|
|
}
|
|
this.inner.poll_read(cx, buf)
|
|
}
|
|
}
|
|
|
|
#[pin_project(project = DecompressedReaderInnerProj)]
|
|
enum DecompressedReaderInner<R> {
|
|
Unknown {
|
|
buffer: Vec<u8>,
|
|
#[pin]
|
|
inner: Option<R>,
|
|
},
|
|
Gzip(#[pin] GzipDecoder<BufReader<WithPreexistingBuffer<R>>>),
|
|
Bzip2(#[pin] BzDecoder<BufReader<WithPreexistingBuffer<R>>>),
|
|
Xz(#[pin] XzDecoder<BufReader<WithPreexistingBuffer<R>>>),
|
|
}
|
|
|
|
impl<R> DecompressedReaderInner<R>
|
|
where
|
|
R: AsyncBufRead,
|
|
{
|
|
fn switch_to(&mut self, algorithm: Algorithm) {
|
|
let (buffer, inner) = match self {
|
|
DecompressedReaderInner::Unknown { buffer, inner } => {
|
|
(mem::take(buffer), inner.take().unwrap())
|
|
}
|
|
DecompressedReaderInner::Gzip(_)
|
|
| DecompressedReaderInner::Bzip2(_)
|
|
| DecompressedReaderInner::Xz(_) => unreachable!(),
|
|
};
|
|
let inner = BufReader::new(WithPreexistingBuffer { buffer, inner });
|
|
|
|
*self = match algorithm {
|
|
Algorithm::Gzip => Self::Gzip(GzipDecoder::new(inner)),
|
|
Algorithm::Bzip2 => Self::Bzip2(BzDecoder::new(inner)),
|
|
Algorithm::Xz => Self::Xz(XzDecoder::new(inner)),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<R> AsyncRead for DecompressedReaderInner<R>
|
|
where
|
|
R: AsyncBufRead,
|
|
{
|
|
fn poll_read(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &mut ReadBuf<'_>,
|
|
) -> Poll<io::Result<()>> {
|
|
match self.project() {
|
|
DecompressedReaderInnerProj::Unknown { .. } => {
|
|
unreachable!("Can't call poll_read on Unknown")
|
|
}
|
|
DecompressedReaderInnerProj::Gzip(inner) => inner.poll_read(cx, buf),
|
|
DecompressedReaderInnerProj::Bzip2(inner) => inner.poll_read(cx, buf),
|
|
DecompressedReaderInnerProj::Xz(inner) => inner.poll_read(cx, buf),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[pin_project]
|
|
pub struct DecompressedReader<R> {
|
|
#[pin]
|
|
inner: DecompressedReaderInner<R>,
|
|
switch_to: Option<Algorithm>,
|
|
}
|
|
|
|
impl<R> DecompressedReader<R> {
|
|
pub fn new(inner: R) -> Self {
|
|
Self {
|
|
inner: DecompressedReaderInner::Unknown {
|
|
buffer: vec![0; BYTES_NEEDED],
|
|
inner: Some(inner),
|
|
},
|
|
switch_to: None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<R> AsyncRead for DecompressedReader<R>
|
|
where
|
|
R: AsyncBufRead + Unpin,
|
|
{
|
|
fn poll_read(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &mut ReadBuf<'_>,
|
|
) -> Poll<io::Result<()>> {
|
|
let mut this = self.project();
|
|
let (buffer, inner) = match this.inner.as_mut().project() {
|
|
DecompressedReaderInnerProj::Gzip(inner) => return inner.poll_read(cx, buf),
|
|
DecompressedReaderInnerProj::Bzip2(inner) => return inner.poll_read(cx, buf),
|
|
DecompressedReaderInnerProj::Xz(inner) => return inner.poll_read(cx, buf),
|
|
DecompressedReaderInnerProj::Unknown { buffer, inner } => (buffer, inner),
|
|
};
|
|
|
|
let mut our_buf = ReadBuf::new(buffer);
|
|
if let Err(e) = ready!(inner.as_pin_mut().unwrap().poll_read(cx, &mut our_buf)) {
|
|
return Poll::Ready(Err(e));
|
|
}
|
|
|
|
let data = our_buf.filled();
|
|
if data.len() >= BYTES_NEEDED {
|
|
if let Some(algorithm) = Algorithm::from_magic(data) {
|
|
this.inner.as_mut().switch_to(algorithm);
|
|
} else {
|
|
return Poll::Ready(Err(io::Error::new(
|
|
io::ErrorKind::InvalidData,
|
|
"tar data not gz, bzip2, or xz compressed",
|
|
)));
|
|
}
|
|
this.inner.poll_read(cx, buf)
|
|
} else {
|
|
cx.waker().wake_by_ref();
|
|
Poll::Pending
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use std::path::Path;
|
|
|
|
use async_compression::tokio::bufread::GzipEncoder;
|
|
use futures::TryStreamExt;
|
|
use rstest::rstest;
|
|
use tokio::io::{AsyncReadExt, BufReader};
|
|
use tokio_tar::Archive;
|
|
|
|
use super::*;
|
|
|
|
#[tokio::test]
|
|
async fn gzip() {
|
|
let data = b"abcdefghijk";
|
|
let mut enc = GzipEncoder::new(&data[..]);
|
|
let mut gzipped = vec![];
|
|
enc.read_to_end(&mut gzipped).await.unwrap();
|
|
|
|
let mut reader = DecompressedReader::new(BufReader::new(&gzipped[..]));
|
|
let mut round_tripped = vec![];
|
|
reader.read_to_end(&mut round_tripped).await.unwrap();
|
|
|
|
assert_eq!(data[..], round_tripped[..]);
|
|
}
|
|
|
|
#[rstest]
|
|
#[case::gzip(include_bytes!("../tests/blob.tar.gz"))]
|
|
#[case::bzip2(include_bytes!("../tests/blob.tar.bz2"))]
|
|
#[case::xz(include_bytes!("../tests/blob.tar.xz"))]
|
|
#[tokio::test]
|
|
async fn compressed_tar(#[case] data: &[u8]) {
|
|
let reader = DecompressedReader::new(BufReader::new(data));
|
|
let mut archive = Archive::new(reader);
|
|
let mut entries: Vec<_> = archive.entries().unwrap().try_collect().await.unwrap();
|
|
|
|
assert_eq!(entries.len(), 1);
|
|
assert_eq!(entries[0].path().unwrap().as_ref(), Path::new("empty"));
|
|
let mut data = String::new();
|
|
entries[0].read_to_string(&mut data).await.unwrap();
|
|
assert_eq!(data, "");
|
|
}
|
|
}
|