Fix Brotli decompression in 'nix log'
This didn't work anymore since decompression was only done in the
non-coroutine case.
Decompressors are now sinks, just like compressors.
Also fixed a bug in bzip2 API handling (we have to handle BZ_RUN_OK
rather than BZ_OK), which we didn't notice because there was a missing
'throw':
if (ret != BZ_OK)
CompressionError("error while compressing bzip2 file");
This commit is contained in:
parent
fa4def3d46
commit
d3761f5f8b
7 changed files with 339 additions and 411 deletions
|
|
@ -217,17 +217,6 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
|
|||
{
|
||||
auto info = queryPathInfo(storePath).cast<const NarInfo>();
|
||||
|
||||
auto source = sinkToSource([this, url{info->url}](Sink & sink) {
|
||||
try {
|
||||
getFile(url, sink);
|
||||
} catch (NoSuchBinaryCacheFile & e) {
|
||||
throw SubstituteGone(e.what());
|
||||
}
|
||||
});
|
||||
|
||||
stats.narRead++;
|
||||
//stats.narReadCompressedBytes += nar->size(); // FIXME
|
||||
|
||||
uint64_t narSize = 0;
|
||||
|
||||
LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
|
||||
|
|
@ -235,8 +224,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
|
|||
narSize += len;
|
||||
});
|
||||
|
||||
decompress(info->compression, *source, wrapperSink);
|
||||
auto decompressor = makeDecompressionSink(info->compression, wrapperSink);
|
||||
|
||||
try {
|
||||
getFile(info->url, *decompressor);
|
||||
} catch (NoSuchBinaryCacheFile & e) {
|
||||
throw SubstituteGone(e.what());
|
||||
}
|
||||
|
||||
decompressor->flush();
|
||||
|
||||
stats.narRead++;
|
||||
//stats.narReadCompressedBytes += nar->size(); // FIXME
|
||||
stats.narReadBytes += narSize;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -39,21 +39,16 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
|
|||
request.verifyTLS = false;
|
||||
request.decompress = false;
|
||||
|
||||
downloader->download(std::move(request), sink);
|
||||
auto decompressor = makeDecompressionSink(
|
||||
hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink);
|
||||
downloader->download(std::move(request), *decompressor);
|
||||
decompressor->finish();
|
||||
});
|
||||
|
||||
if (get(drv.env, "unpack", "") == "1") {
|
||||
|
||||
if (hasSuffix(mainUrl, ".xz")) {
|
||||
auto source2 = sinkToSource([&](Sink & sink) {
|
||||
decompress("xz", *source, sink);
|
||||
});
|
||||
restorePath(storePath, *source2);
|
||||
} else
|
||||
restorePath(storePath, *source);
|
||||
|
||||
} else
|
||||
writeFile(storePath, *source);
|
||||
if (get(drv.env, "unpack", "") == "1")
|
||||
restorePath(storePath, *source);
|
||||
else
|
||||
writeFile(storePath, *source);
|
||||
|
||||
auto executable = drv.env.find("executable");
|
||||
if (executable != drv.env.end() && executable->second == "1") {
|
||||
|
|
|
|||
|
|
@ -58,16 +58,6 @@ std::string resolveUri(const std::string & uri)
|
|||
return uri;
|
||||
}
|
||||
|
||||
ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data)
|
||||
{
|
||||
if (encoding == "")
|
||||
return data;
|
||||
else if (encoding == "br")
|
||||
return decompress(encoding, *data);
|
||||
else
|
||||
throw Error("unsupported Content-Encoding '%s'", encoding);
|
||||
}
|
||||
|
||||
struct CurlDownloader : public Downloader
|
||||
{
|
||||
CURLM * curlm = 0;
|
||||
|
|
@ -106,6 +96,12 @@ struct CurlDownloader : public Downloader
|
|||
fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
|
||||
{request.uri}, request.parentAct)
|
||||
, callback(callback)
|
||||
, finalSink([this](const unsigned char * data, size_t len) {
|
||||
if (this->request.dataCallback)
|
||||
this->request.dataCallback((char *) data, len);
|
||||
else
|
||||
this->result.data->append((char *) data, len);
|
||||
})
|
||||
{
|
||||
if (!request.expectedETag.empty())
|
||||
requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
|
||||
|
|
@ -129,23 +125,40 @@ struct CurlDownloader : public Downloader
|
|||
}
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void fail(const T & e)
|
||||
void failEx(std::exception_ptr ex)
|
||||
{
|
||||
assert(!done);
|
||||
done = true;
|
||||
callback.rethrow(std::make_exception_ptr(e));
|
||||
callback.rethrow(ex);
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void fail(const T & e)
|
||||
{
|
||||
failEx(std::make_exception_ptr(e));
|
||||
}
|
||||
|
||||
LambdaSink finalSink;
|
||||
std::shared_ptr<CompressionSink> decompressionSink;
|
||||
|
||||
std::exception_ptr writeException;
|
||||
|
||||
size_t writeCallback(void * contents, size_t size, size_t nmemb)
|
||||
{
|
||||
size_t realSize = size * nmemb;
|
||||
result.bodySize += realSize;
|
||||
if (request.dataCallback)
|
||||
request.dataCallback((char *) contents, realSize);
|
||||
else
|
||||
result.data->append((char *) contents, realSize);
|
||||
return realSize;
|
||||
try {
|
||||
size_t realSize = size * nmemb;
|
||||
result.bodySize += realSize;
|
||||
|
||||
if (!decompressionSink)
|
||||
decompressionSink = makeDecompressionSink(encoding, finalSink);
|
||||
|
||||
(*decompressionSink)((unsigned char *) contents, realSize);
|
||||
|
||||
return realSize;
|
||||
} catch (...) {
|
||||
writeException = std::current_exception();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
|
||||
|
|
@ -314,27 +327,33 @@ struct CurlDownloader : public Downloader
|
|||
debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
|
||||
request.verb(), request.uri, code, httpStatus, result.bodySize);
|
||||
|
||||
if (decompressionSink)
|
||||
decompressionSink->finish();
|
||||
|
||||
if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
|
||||
code = CURLE_OK;
|
||||
httpStatus = 304;
|
||||
}
|
||||
|
||||
if (code == CURLE_OK &&
|
||||
if (writeException)
|
||||
failEx(writeException);
|
||||
|
||||
else if (code == CURLE_OK &&
|
||||
(httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
|
||||
{
|
||||
result.cached = httpStatus == 304;
|
||||
done = true;
|
||||
|
||||
try {
|
||||
if (request.decompress)
|
||||
result.data = decodeContent(encoding, ref<std::string>(result.data));
|
||||
act.progress(result.data->size(), result.data->size());
|
||||
callback(std::move(result));
|
||||
} catch (...) {
|
||||
done = true;
|
||||
callback.rethrow();
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
||||
else {
|
||||
// We treat most errors as transient, but won't retry when hopeless
|
||||
Error err = Transient;
|
||||
|
||||
|
|
@ -369,6 +388,7 @@ struct CurlDownloader : public Downloader
|
|||
case CURLE_UNKNOWN_OPTION:
|
||||
case CURLE_SSL_CACERT_BADFILE:
|
||||
case CURLE_TOO_MANY_REDIRECTS:
|
||||
case CURLE_WRITE_ERROR:
|
||||
err = Misc;
|
||||
break;
|
||||
default: // Shut up warnings
|
||||
|
|
|
|||
|
|
@ -88,7 +88,4 @@ public:
|
|||
|
||||
bool isUri(const string & s);
|
||||
|
||||
/* Decode data according to the Content-Encoding header. */
|
||||
ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -153,10 +153,8 @@ S3Helper::DownloadResult S3Helper::getObject(
|
|||
auto result = checkAws(fmt("AWS error fetching '%s'", key),
|
||||
client->GetObject(request));
|
||||
|
||||
res.data = decodeContent(
|
||||
result.GetContentEncoding(),
|
||||
make_ref<std::string>(
|
||||
dynamic_cast<std::stringstream &>(result.GetBody()).str()));
|
||||
res.data = decompress(result.GetContentEncoding(),
|
||||
dynamic_cast<std::stringstream &>(result.GetBody()).str());
|
||||
|
||||
} catch (S3Error & e) {
|
||||
if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue