feat(glue): Implement hashed_mirrors download logic for fetchurl.

This is useful for cases where the original URLs no longer exist, but
nixpkgs still references them.

Change-Id: Ibfbb6784876efe4fc8f65fac84ff9dcb72e78ff7
Reviewed-on: https://cl.snix.dev/c/snix/+/30570
Tested-by: besadii
Reviewed-by: Florian Klink <flokli@flokli.de>
Autosubmit: Vova Kryachko <v.kryachko@gmail.com>
parent 3c1a7176cb
commit 33cd666f5c

1 changed file with 80 additions and 18 deletions
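The mirror lookup added below builds candidate URLs of the form {hashed_mirror}/{hash_algo}/{hexdigest} from the expected flat-file hash. As a minimal standalone sketch of that construction (the mirror address and digest are illustrative placeholders, not values from this change):

use data_encoding::HEXLOWER;
use url::Url;

fn main() {
    // Illustrative hashed mirror; the base URL should end with a
    // trailing slash so Url::join appends a path segment rather
    // than replacing the last one.
    let mirror = Url::parse("https://mirror.example.org/").unwrap();

    // Placeholder sha256 digest of the literal file contents
    // (the flat hash, not the NAR hash).
    let digest = [0xabu8; 32];

    // Candidate URL: {hashed_mirror}/{hash_algo}/{hexdigest}
    let candidate = mirror
        .join(&format!("sha256/{}", HEXLOWER.encode(&digest)))
        .expect("static 'algo/hexdigest' path always parses");

    assert_eq!(
        candidate.as_str(),
        format!("https://mirror.example.org/sha256/{}", "ab".repeat(32))
    );
}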
@@ -1,3 +1,4 @@
+use data_encoding::HEXLOWER;
 use futures::TryStreamExt;
 use md5::{Md5, digest::DynDigest};
 use nix_compat::{
@@ -176,7 +177,7 @@ pub struct Fetcher<BS, DS, PS, NS> {
     directory_service: DS,
     path_info_service: PS,
     nar_calculation_service: NS,
-    _hashed_mirrors: Vec<Url>,
+    hashed_mirrors: Vec<Url>,
 }
 
 impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
@@ -196,31 +197,25 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
             directory_service,
             path_info_service,
             nar_calculation_service,
-            _hashed_mirrors: hashed_mirrors,
+            hashed_mirrors,
         }
     }
 
-    /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it.
-    /// In case the URI uses the file:// scheme, use tokio::fs to open it.
-    #[instrument(skip_all, fields(url, indicatif.pb_show=tracing::field::Empty), err)]
-    async fn download(
+    /// Downloads single url.
+    ///
+    /// Returns an error on any IO failures and if http response status is not 200.
+    async fn do_download(
         &self,
         url: Url,
     ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
         let span = Span::current();
-        span.pb_set_message(&format!(
-            "📡Fetching {}",
-            // TOOD: maybe shorten
-            redact_url(&url)
-        ));
-
         match url.scheme() {
             "file" => {
                 let f = tokio::fs::File::open(url.to_file_path().map_err(|_| {
                     // "Returns Err if the host is neither empty nor "localhost"
                     // (except on Windows, where file: URLs may have a non-local host)"
                     FetcherError::Io(std::io::Error::new(
-                        std::io::ErrorKind::Other,
+                        std::io::ErrorKind::InvalidData,
                         "invalid host for file:// scheme",
                     ))
                 })?)
@@ -237,7 +232,35 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
                 ))))
             }
             _ => {
-                let resp = self.http_client.get(url).send().await?;
+                let resp = self.http_client.get(url.clone()).send().await?;
+                if !resp.status().is_success() {
+                    use reqwest::StatusCode;
+                    use std::io::ErrorKind;
+                    let kind = match resp.status() {
+                        StatusCode::BAD_REQUEST
+                        | StatusCode::NOT_ACCEPTABLE
+                        | StatusCode::URI_TOO_LONG => ErrorKind::InvalidData,
+                        StatusCode::FORBIDDEN
+                        | StatusCode::UNAUTHORIZED
+                        | StatusCode::NETWORK_AUTHENTICATION_REQUIRED => {
+                            ErrorKind::PermissionDenied
+                        }
+                        StatusCode::NOT_FOUND | StatusCode::GONE => ErrorKind::NotFound,
+                        StatusCode::METHOD_NOT_ALLOWED => ErrorKind::Unsupported,
+                        StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
+                            ErrorKind::TimedOut
+                        }
+                        StatusCode::TOO_MANY_REQUESTS => ErrorKind::QuotaExceeded,
+                        StatusCode::BAD_GATEWAY | StatusCode::SERVICE_UNAVAILABLE => {
+                            ErrorKind::ResourceBusy
+                        }
+                        _ => ErrorKind::Other,
+                    };
+                    return Err(FetcherError::Io(std::io::Error::new(
+                        kind,
+                        format!("unable to download '{}': {}", url, resp.status()),
+                    )));
+                }
 
                 if let Some(content_length) = resp.content_length() {
                     span.pb_set_length(content_length);
@@ -261,6 +284,44 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
             }
         }
     }
 
+    /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it.
+    /// In case the URI uses the file:// scheme, use tokio::fs to open it.
+    ///
+    /// HTTP downloads with exp_hash present prioritize downloading from hashed_mirrors, if any,
+    /// by using the following download url: {hashed_mirror}/{hash_algo}/{hexdigest}.
+    /// Note that exp_hash is the flat hash of the literal file contents, not the nar hash.
+    #[instrument(skip_all, fields(url, indicatif.pb_show=tracing::field::Empty), err)]
+    async fn download(
+        &self,
+        url: Url,
+        exp_hash: Option<&NixHash>,
+    ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
+        let span = Span::current();
+        span.pb_set_message(&format!(
+            "📡Fetching {}",
+            // TOOD: maybe shorten
+            redact_url(&url)
+        ));
+        if let Some(hash) = exp_hash {
+            let urls = self.hashed_mirrors.iter().map(|u| {
+                u.join(&format!(
+                    "{}/{}",
+                    hash.algo(),
+                    HEXLOWER.encode(hash.digest_as_bytes())
+                ))
+                // The only reason this can fail is if the 'algo/hash' path cannot be parsed
+                // which cannot happen.
+                .expect("Snix bug!")
+            });
+            for url in urls {
+                if let Ok(result) = self.do_download(url).await {
+                    return Ok(result);
+                }
+            }
+        }
+        self.do_download(url).await
+    }
 }
 
 /// Copies all data from the passed reader to the passed writer.
@@ -295,7 +356,7 @@ where
         match fetch {
             Fetch::URL { url, exp_hash } => {
                 // Construct a AsyncRead reading from the data as its downloaded.
-                let mut r = self.download(url.clone()).await?;
+                let mut r = self.download(url.clone(), exp_hash.as_ref()).await?;
 
                 // Construct a AsyncWrite to write into the BlobService.
                 let mut blob_writer = self.blob_service.open_write().await;
@@ -350,7 +411,8 @@ where
                 exp_nar_sha256,
             } => {
                 // Construct a AsyncRead reading from the data as its downloaded.
-                let r = self.download(url.clone()).await?;
+                // NOTE: For Fetch::Tarball, the expected NAR SHA256 is specified, so we cannot use the hashed_mirrors mechanism.
+                let r = self.download(url.clone(), None).await?;
 
                 // Pop compression.
                 let r = DecompressedReader::new(r);
@@ -399,7 +461,7 @@ where
                 hash: exp_hash,
             } => {
                 // Construct a AsyncRead reading from the data as its downloaded.
-                let r = self.download(url.clone()).await?;
+                let r = self.download(url.clone(), Some(&exp_hash)).await?;
 
                 // Pop compression.
                 let mut r = DecompressedReader::new(r);
@@ -435,7 +497,7 @@ where
                 hash: exp_hash,
             } => {
                 // Construct a AsyncRead reading from the data as its downloaded.
-                let mut r = self.download(url.clone()).await?;
+                let mut r = self.download(url.clone(), Some(&exp_hash)).await?;
 
                 // Construct a AsyncWrite to write into the BlobService.
                 let mut blob_writer = self.blob_service.open_write().await;
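For reference, the lookup order the new download wrapper implements, reduced to a standalone sketch (the names here are simplified stand-ins, not the Snix API): every hashed mirror is tried first and any mirror error is silently skipped; the original URL is fetched only if no mirror succeeds, so its error is the one a caller ultimately sees.

// Sketch of the fallback order; fetch_one stands in for do_download.
fn fetch_with_fallback<T, E>(
    mirror_urls: &[String],
    original_url: &str,
    fetch_one: impl Fn(&str) -> Result<T, E>,
) -> Result<T, E> {
    for url in mirror_urls {
        // A failing mirror is not an error: just move on to the
        // next candidate.
        if let Ok(body) = fetch_one(url) {
            return Ok(body);
        }
    }
    // All mirrors missed (or there were none): fetch the original
    // URL and let its error propagate.
    fetch_one(original_url)
}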