Enable HTTP/2 support
The binary cache store can now use HTTP/2 to do lookups. This is much more efficient than HTTP/1.1 due to multiplexing: we can issue many requests in parallel over a single TCP connection. Thus it's no longer necessary to use a bunch of concurrent TCP connections (25 by default). For example, downloading 802 .narinfo files from https://cache.nixos.org/, using a single TCP connection, takes 11.8s with HTTP/1.1, but only 0.61s with HTTP/2. This did require a fairly substantial rewrite of the Downloader class to use the curl multi interface, because otherwise curl wouldn't be able to do multiplexing for us. As a bonus, we get connection reuse even with HTTP/1.1. All downloads are now handled by a single worker thread. Clients call Downloader::enqueueDownload() to tell the worker thread to start the download, getting a std::future to the result.
This commit is contained in:
		
							parent
							
								
									a75d11a7e6
								
							
						
					
					
						commit
						90ad02bf62
					
				
					 9 changed files with 433 additions and 210 deletions
				
			
		|  | @ -55,7 +55,7 @@ bool parseSearchPathArg(Strings::iterator & i, | ||||||
| Path lookupFileArg(EvalState & state, string s) | Path lookupFileArg(EvalState & state, string s) | ||||||
| { | { | ||||||
|     if (isUri(s)) |     if (isUri(s)) | ||||||
|         return makeDownloader()->downloadCached(state.store, s, true); |         return getDownloader()->downloadCached(state.store, s, true); | ||||||
|     else if (s.size() > 2 && s.at(0) == '<' && s.at(s.size() - 1) == '>') { |     else if (s.size() > 2 && s.at(0) == '<' && s.at(s.size() - 1) == '>') { | ||||||
|         Path p = s.substr(1, s.size() - 2); |         Path p = s.substr(1, s.size() - 2); | ||||||
|         return state.findFile(p); |         return state.findFile(p); | ||||||
|  |  | ||||||
|  | @ -662,7 +662,7 @@ std::pair<bool, std::string> EvalState::resolveSearchPathElem(const SearchPathEl | ||||||
|                 // FIXME: support specifying revision/branch |                 // FIXME: support specifying revision/branch | ||||||
|                 res = { true, exportGit(store, elem.second, "master") }; |                 res = { true, exportGit(store, elem.second, "master") }; | ||||||
|             else |             else | ||||||
|                 res = { true, makeDownloader()->downloadCached(store, elem.second, true) }; |                 res = { true, getDownloader()->downloadCached(store, elem.second, true) }; | ||||||
|         } catch (DownloadError & e) { |         } catch (DownloadError & e) { | ||||||
|             printMsg(lvlError, format("warning: Nix search path entry ‘%1%’ cannot be downloaded, ignoring") % elem.second); |             printMsg(lvlError, format("warning: Nix search path entry ‘%1%’ cannot be downloaded, ignoring") % elem.second); | ||||||
|             res = { false, "" }; |             res = { false, "" }; | ||||||
|  |  | ||||||
|  | @ -1769,7 +1769,7 @@ void fetch(EvalState & state, const Pos & pos, Value * * args, Value & v, | ||||||
|     if (state.restricted && !expectedHash) |     if (state.restricted && !expectedHash) | ||||||
|         throw Error(format("‘%1%’ is not allowed in restricted mode") % who); |         throw Error(format("‘%1%’ is not allowed in restricted mode") % who); | ||||||
| 
 | 
 | ||||||
|     Path res = makeDownloader()->downloadCached(state.store, url, unpack, name, expectedHash); |     Path res = getDownloader()->downloadCached(state.store, url, unpack, name, expectedHash); | ||||||
|     mkString(v, res, PathSet({res})); |     mkString(v, res, PathSet({res})); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -17,13 +17,15 @@ void builtinFetchurl(const BasicDerivation & drv) | ||||||
|     auto fetch = [&](const string & url) { |     auto fetch = [&](const string & url) { | ||||||
|         /* No need to do TLS verification, because we check the hash of
 |         /* No need to do TLS verification, because we check the hash of
 | ||||||
|            the result anyway. */ |            the result anyway. */ | ||||||
|         DownloadOptions options; |         DownloadRequest request(url); | ||||||
|         options.verifyTLS = false; |         request.verifyTLS = false; | ||||||
| 
 | 
 | ||||||
|         /* Show a progress indicator, even though stderr is not a tty. */ |         /* Show a progress indicator, even though stderr is not a tty. */ | ||||||
|         options.showProgress = DownloadOptions::yes; |         request.showProgress = DownloadRequest::yes; | ||||||
| 
 | 
 | ||||||
|         auto data = makeDownloader()->download(url, options); |         /* Note: have to use a fresh downloader here because we're in
 | ||||||
|  |            a forked process. */ | ||||||
|  |         auto data = makeDownloader()->download(request); | ||||||
|         assert(data.data); |         assert(data.data); | ||||||
| 
 | 
 | ||||||
|         return data.data; |         return data.data; | ||||||
|  |  | ||||||
|  | @ -5,10 +5,15 @@ | ||||||
| #include "store-api.hh" | #include "store-api.hh" | ||||||
| #include "archive.hh" | #include "archive.hh" | ||||||
| 
 | 
 | ||||||
|  | #include <unistd.h> | ||||||
|  | #include <fcntl.h> | ||||||
|  | 
 | ||||||
| #include <curl/curl.h> | #include <curl/curl.h> | ||||||
| 
 | 
 | ||||||
| #include <iostream> | #include <iostream> | ||||||
| #include <thread> | #include <thread> | ||||||
|  | #include <cmath> | ||||||
|  | #include <random> | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| namespace nix { | namespace nix { | ||||||
|  | @ -30,51 +35,103 @@ std::string resolveUri(const std::string & uri) | ||||||
| 
 | 
 | ||||||
| struct CurlDownloader : public Downloader | struct CurlDownloader : public Downloader | ||||||
| { | { | ||||||
|     CURL * curl; |     CURLM * curlm = 0; | ||||||
|     ref<std::string> data; |  | ||||||
|     string etag, status, expectedETag, effectiveUrl; |  | ||||||
| 
 | 
 | ||||||
|     struct curl_slist * requestHeaders; |     std::random_device rd; | ||||||
|  |     std::mt19937 mt19937; | ||||||
| 
 | 
 | ||||||
|     bool showProgress; |     struct DownloadItem : public std::enable_shared_from_this<DownloadItem> | ||||||
|  |     { | ||||||
|  |         CurlDownloader & downloader; | ||||||
|  |         DownloadRequest request; | ||||||
|  |         DownloadResult result; | ||||||
|  |         bool done = false; // whether the promise has been set
 | ||||||
|  |         std::promise<DownloadResult> promise; | ||||||
|  |         CURL * req = 0; | ||||||
|  |         bool active = false; // whether the handle has been added to the multi object
 | ||||||
|  |         std::string status; | ||||||
|  | 
 | ||||||
|  |         bool showProgress = false; | ||||||
|         double prevProgressTime{0}, startTime{0}; |         double prevProgressTime{0}, startTime{0}; | ||||||
|         unsigned int moveBack{1}; |         unsigned int moveBack{1}; | ||||||
| 
 | 
 | ||||||
|  |         unsigned int attempt = 0; | ||||||
|  | 
 | ||||||
|  |         /* Don't start this download until the specified time point
 | ||||||
|  |            has been reached. */ | ||||||
|  |         std::chrono::steady_clock::time_point embargo; | ||||||
|  | 
 | ||||||
|  |         struct curl_slist * requestHeaders = 0; | ||||||
|  | 
 | ||||||
|  |         DownloadItem(CurlDownloader & downloader, const DownloadRequest & request) | ||||||
|  |             : downloader(downloader), request(request) | ||||||
|  |         { | ||||||
|  |             showProgress = | ||||||
|  |                 request.showProgress == DownloadRequest::yes || | ||||||
|  |                 (request.showProgress == DownloadRequest::automatic && isatty(STDERR_FILENO)); | ||||||
|  | 
 | ||||||
|  |             if (!request.expectedETag.empty()) | ||||||
|  |                 requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         ~DownloadItem() | ||||||
|  |         { | ||||||
|  |             if (req) { | ||||||
|  |                 if (active) | ||||||
|  |                     curl_multi_remove_handle(downloader.curlm, req); | ||||||
|  |                 curl_easy_cleanup(req); | ||||||
|  |             } | ||||||
|  |             if (requestHeaders) curl_slist_free_all(requestHeaders); | ||||||
|  |             try { | ||||||
|  |                 if (!done) | ||||||
|  |                     fail(DownloadError(Transient, format("download of ‘%s’ was interrupted") % request.uri)); | ||||||
|  |             } catch (...) { | ||||||
|  |                 ignoreException(); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         template<class T> | ||||||
|  |         void fail(const T & e) | ||||||
|  |         { | ||||||
|  |             promise.set_exception(std::make_exception_ptr(e)); | ||||||
|  |             done = true; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         size_t writeCallback(void * contents, size_t size, size_t nmemb) |         size_t writeCallback(void * contents, size_t size, size_t nmemb) | ||||||
|         { |         { | ||||||
|             size_t realSize = size * nmemb; |             size_t realSize = size * nmemb; | ||||||
|         data->append((char *) contents, realSize); |             result.data->append((char *) contents, realSize); | ||||||
|             return realSize; |             return realSize; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) |         static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) | ||||||
|         { |         { | ||||||
|         return ((CurlDownloader *) userp)->writeCallback(contents, size, nmemb); |             return ((DownloadItem *) userp)->writeCallback(contents, size, nmemb); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         size_t headerCallback(void * contents, size_t size, size_t nmemb) |         size_t headerCallback(void * contents, size_t size, size_t nmemb) | ||||||
|         { |         { | ||||||
|             size_t realSize = size * nmemb; |             size_t realSize = size * nmemb; | ||||||
|         string line = string((char *) contents, realSize); |             std::string line((char *) contents, realSize); | ||||||
|         printMsg(lvlVomit, format("got header: %1%") % trim(line)); |             printMsg(lvlVomit, format("got header for ‘%s’: %s") % request.uri % trim(line)); | ||||||
|             if (line.compare(0, 5, "HTTP/") == 0) { // new response starts
 |             if (line.compare(0, 5, "HTTP/") == 0) { // new response starts
 | ||||||
|             etag = ""; |                 result.etag = ""; | ||||||
|                 auto ss = tokenizeString<vector<string>>(line, " "); |                 auto ss = tokenizeString<vector<string>>(line, " "); | ||||||
|                 status = ss.size() >= 2 ? ss[1] : ""; |                 status = ss.size() >= 2 ? ss[1] : ""; | ||||||
|  |                 result.data = std::make_shared<std::string>(); | ||||||
|             } else { |             } else { | ||||||
|                 auto i = line.find(':'); |                 auto i = line.find(':'); | ||||||
|                 if (i != string::npos) { |                 if (i != string::npos) { | ||||||
|                 string name = trim(string(line, 0, i)); |                     string name = toLower(trim(string(line, 0, i))); | ||||||
|                 if (name == "ETag") { // FIXME: case
 |                     if (name == "etag") { | ||||||
|                     etag = trim(string(line, i + 1)); |                         result.etag = trim(string(line, i + 1)); | ||||||
|                         /* Hack to work around a GitHub bug: it sends
 |                         /* Hack to work around a GitHub bug: it sends
 | ||||||
|                            ETags, but ignores If-None-Match. So if we get |                            ETags, but ignores If-None-Match. So if we get | ||||||
|                            the expected ETag on a 200 response, then shut |                            the expected ETag on a 200 response, then shut | ||||||
|                            down the connection because we already have the |                            down the connection because we already have the | ||||||
|                            data. */ |                            data. */ | ||||||
|                     printMsg(lvlDebug, format("got ETag: %1%") % etag); |                         if (result.etag == request.expectedETag && status == "200") { | ||||||
|                     if (etag == expectedETag && status == "200") { |                             debug(format("shutting down on 200 HTTP response with expected ETag")); | ||||||
|                         printMsg(lvlDebug, format("shutting down on 200 HTTP response with expected ETag")); |  | ||||||
|                             return 0; |                             return 0; | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|  | @ -85,7 +142,7 @@ struct CurlDownloader : public Downloader | ||||||
| 
 | 
 | ||||||
|         static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) |         static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) | ||||||
|         { |         { | ||||||
|         return ((CurlDownloader *) userp)->headerCallback(contents, size, nmemb); |             return ((DownloadItem *) userp)->headerCallback(contents, size, nmemb); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         int progressCallback(double dltotal, double dlnow) |         int progressCallback(double dltotal, double dlnow) | ||||||
|  | @ -108,147 +165,298 @@ struct CurlDownloader : public Downloader | ||||||
| 
 | 
 | ||||||
|         static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow) |         static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow) | ||||||
|         { |         { | ||||||
|         return ((CurlDownloader *) userp)->progressCallback(dltotal, dlnow); |             return ((DownloadItem *) userp)->progressCallback(dltotal, dlnow); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|     CurlDownloader() |         void init() | ||||||
|         : data(make_ref<std::string>()) |  | ||||||
|         { |         { | ||||||
|         requestHeaders = 0; |             // FIXME: handle parallel downloads.
 | ||||||
| 
 |  | ||||||
|         curl = curl_easy_init(); |  | ||||||
|         if (!curl) throw nix::Error("unable to initialize curl"); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     ~CurlDownloader() |  | ||||||
|     { |  | ||||||
|         if (curl) curl_easy_cleanup(curl); |  | ||||||
|         if (requestHeaders) curl_slist_free_all(requestHeaders); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     bool fetch(const string & url, const DownloadOptions & options) |  | ||||||
|     { |  | ||||||
|         showProgress = |  | ||||||
|             options.showProgress == DownloadOptions::yes || |  | ||||||
|             (options.showProgress == DownloadOptions::automatic && isatty(STDERR_FILENO)); |  | ||||||
| 
 |  | ||||||
|         curl_easy_reset(curl); |  | ||||||
| 
 |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str()); |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1); |  | ||||||
| 
 |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallbackWrapper); |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) this); |  | ||||||
| 
 |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallbackWrapper); |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_HEADERDATA, (void *) this); |  | ||||||
| 
 |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper); |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, (void *) this); |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); |  | ||||||
| 
 |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); |  | ||||||
| 
 |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); |  | ||||||
| 
 |  | ||||||
|         if (options.verifyTLS) |  | ||||||
|             curl_easy_setopt(curl, CURLOPT_CAINFO, getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt").c_str()); |  | ||||||
|         else { |  | ||||||
|             curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0); |  | ||||||
|             curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0); |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         data = make_ref<std::string>(); |  | ||||||
| 
 |  | ||||||
|         if (requestHeaders) { |  | ||||||
|             curl_slist_free_all(requestHeaders); |  | ||||||
|             requestHeaders = 0; |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         if (!options.expectedETag.empty()) { |  | ||||||
|             this->expectedETag = options.expectedETag; |  | ||||||
|             requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + options.expectedETag).c_str()); |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         curl_easy_setopt(curl, CURLOPT_HTTPHEADER, requestHeaders); |  | ||||||
| 
 |  | ||||||
|         if (options.head) |  | ||||||
|             curl_easy_setopt(curl, CURLOPT_NOBODY, 1); |  | ||||||
| 
 |  | ||||||
|             if (showProgress) { |             if (showProgress) { | ||||||
|             std::cerr << (format("downloading ‘%1%’... ") % url); |                 std::cerr << (format("downloading ‘%1%’... ") % request.uri); | ||||||
|                 std::cerr.flush(); |                 std::cerr.flush(); | ||||||
|                 startTime = getTime(); |                 startTime = getTime(); | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|         CURLcode res = curl_easy_perform(curl); |             if (!req) req = curl_easy_init(); | ||||||
|  | 
 | ||||||
|  |             curl_easy_reset(req); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str()); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str()); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_WRITEDATA, this); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, DownloadItem::headerCallbackWrapper); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_HEADERDATA, this); | ||||||
|  | 
 | ||||||
|  |             curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_PROGRESSDATA, this); | ||||||
|  |             curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0); | ||||||
|  | 
 | ||||||
|  |             curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders); | ||||||
|  | 
 | ||||||
|  |             if (request.head) | ||||||
|  |                 curl_easy_setopt(req, CURLOPT_NOBODY, 1); | ||||||
|  | 
 | ||||||
|  |             if (request.verifyTLS) | ||||||
|  |                 curl_easy_setopt(req, CURLOPT_CAINFO, getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt").c_str()); | ||||||
|  |             else { | ||||||
|  |                 curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0); | ||||||
|  |                 curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             result.data = std::make_shared<std::string>(); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         void finish(CURLcode code) | ||||||
|  |         { | ||||||
|             if (showProgress) |             if (showProgress) | ||||||
|                 //std::cerr << "\e[" << moveBack << "D\e[K\n";
 |                 //std::cerr << "\e[" << moveBack << "D\e[K\n";
 | ||||||
|                 std::cerr << "\n"; |                 std::cerr << "\n"; | ||||||
|         checkInterrupt(); |  | ||||||
|         if (res == CURLE_WRITE_ERROR && etag == options.expectedETag) return false; |  | ||||||
| 
 | 
 | ||||||
|         long httpStatus = -1; |             long httpStatus = 0; | ||||||
|         curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpStatus); |             curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus); | ||||||
| 
 | 
 | ||||||
|         if (res != CURLE_OK) { |             char * effectiveUrlCStr; | ||||||
|  |             curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr); | ||||||
|  |             if (effectiveUrlCStr) | ||||||
|  |                 result.effectiveUrl = effectiveUrlCStr; | ||||||
|  | 
 | ||||||
|  |             debug(format("finished download of ‘%s’; curl status = %d, HTTP status = %d, body = %d bytes") | ||||||
|  |                 % request.uri % code % httpStatus % (result.data ? result.data->size() : 0)); | ||||||
|  | 
 | ||||||
|  |             if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) { | ||||||
|  |                 code = CURLE_OK; | ||||||
|  |                 httpStatus = 304; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             if (code == CURLE_OK && | ||||||
|  |                 (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) | ||||||
|  |             { | ||||||
|  |                 result.cached = httpStatus == 304; | ||||||
|  |                 promise.set_value(result); | ||||||
|  |                 done = true; | ||||||
|  |             } else { | ||||||
|                 Error err = |                 Error err = | ||||||
|                 httpStatus == 404 ? NotFound : |                     (httpStatus == 404 || code == CURLE_FILE_COULDNT_READ_FILE) ? NotFound : | ||||||
|                     httpStatus == 403 ? Forbidden : |                     httpStatus == 403 ? Forbidden : | ||||||
|                     (httpStatus == 408 || httpStatus == 500 || httpStatus == 503 |                     (httpStatus == 408 || httpStatus == 500 || httpStatus == 503 | ||||||
|                         || httpStatus == 504  || httpStatus == 522 || httpStatus == 524 |                         || httpStatus == 504  || httpStatus == 522 || httpStatus == 524 | ||||||
|                  || res == CURLE_COULDNT_RESOLVE_HOST) ? Transient : |                         || code == CURLE_COULDNT_RESOLVE_HOST) ? Transient : | ||||||
|                     Misc; |                     Misc; | ||||||
|             if (res == CURLE_HTTP_RETURNED_ERROR && httpStatus != -1) |  | ||||||
|                 throw DownloadError(err, format("unable to download ‘%s’: HTTP error %d") |  | ||||||
|                     % url % httpStatus); |  | ||||||
|             else |  | ||||||
|                 throw DownloadError(err, format("unable to download ‘%s’: %s (%d)") |  | ||||||
|                     % url % curl_easy_strerror(res) % res); |  | ||||||
|         } |  | ||||||
| 
 | 
 | ||||||
|         char *effectiveUrlCStr; |  | ||||||
|         curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr); |  | ||||||
|         if (effectiveUrlCStr) |  | ||||||
|             effectiveUrl = effectiveUrlCStr; |  | ||||||
| 
 |  | ||||||
|         if (httpStatus == 304) return false; |  | ||||||
| 
 |  | ||||||
|         return true; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     DownloadResult download(string url, const DownloadOptions & options) override |  | ||||||
|     { |  | ||||||
|         size_t attempt = 0; |  | ||||||
| 
 |  | ||||||
|         while (true) { |  | ||||||
|             try { |  | ||||||
|                 DownloadResult res; |  | ||||||
|                 if (fetch(resolveUri(url), options)) { |  | ||||||
|                     res.cached = false; |  | ||||||
|                     res.data = data; |  | ||||||
|                 } else |  | ||||||
|                     res.cached = true; |  | ||||||
|                 res.effectiveUrl = effectiveUrl; |  | ||||||
|                 res.etag = etag; |  | ||||||
|                 return res; |  | ||||||
|             } catch (DownloadError & e) { |  | ||||||
|                 attempt++; |                 attempt++; | ||||||
|                 if (e.error != Transient || attempt >= options.tries) throw; | 
 | ||||||
|                 auto ms = options.baseRetryTimeMs * (1 << (attempt - 1)); |                 auto exc = | ||||||
|                 printMsg(lvlError, format("warning: %s; retrying in %d ms") % e.what() % ms); |                     httpStatus != 0 | ||||||
|                 std::this_thread::sleep_for(std::chrono::milliseconds(ms)); |                     ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus) | ||||||
|  |                     : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code); | ||||||
|  | 
 | ||||||
|  |                 /* If this is a transient error, then maybe retry the
 | ||||||
|  |                    download after a while. */ | ||||||
|  |                 if (err == Transient && attempt < request.tries) { | ||||||
|  |                     int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); | ||||||
|  |                     printMsg(lvlError, format("warning: %s; retrying in %d ms") % exc.what() % ms); | ||||||
|  |                     embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); | ||||||
|  |                     downloader.enqueueItem(shared_from_this()); | ||||||
|                 } |                 } | ||||||
|  |                 else | ||||||
|  |                     fail(exc); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
|  |     struct State | ||||||
|  |     { | ||||||
|  |         bool quit = false; | ||||||
|  |         std::vector<std::shared_ptr<DownloadItem>> incoming; | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     Sync<State> state_; | ||||||
|  | 
 | ||||||
|  |     /* We can't use a std::condition_variable to wake up the curl
 | ||||||
|  |        thread, because it only monitors file descriptors. So use a | ||||||
|  |        pipe instead. */ | ||||||
|  |     Pipe wakeupPipe; | ||||||
|  | 
 | ||||||
|  |     std::thread workerThread; | ||||||
|  | 
 | ||||||
|  |     CurlDownloader() | ||||||
|  |     { | ||||||
|  |         static std::once_flag globalInit; | ||||||
|  |         std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); | ||||||
|  | 
 | ||||||
|  |         curlm = curl_multi_init(); | ||||||
|  | 
 | ||||||
|  |         curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); | ||||||
|  |         curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, 25); // FIXME: configurable
 | ||||||
|  | 
 | ||||||
|  |         wakeupPipe.create(); | ||||||
|  |         fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK); | ||||||
|  | 
 | ||||||
|  |         workerThread = std::thread([&]() { workerThreadEntry(); }); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     ~CurlDownloader() | ||||||
|  |     { | ||||||
|  |         /* Signal the worker thread to exit. */ | ||||||
|  |         { | ||||||
|  |             auto state(state_.lock()); | ||||||
|  |             state->quit = true; | ||||||
|  |         } | ||||||
|  |         writeFull(wakeupPipe.writeSide.get(), " "); | ||||||
|  | 
 | ||||||
|  |         workerThread.join(); | ||||||
|  | 
 | ||||||
|  |         if (curlm) curl_multi_cleanup(curlm); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     void workerThreadMain() | ||||||
|  |     { | ||||||
|  |         std::map<CURL *, std::shared_ptr<DownloadItem>> items; | ||||||
|  | 
 | ||||||
|  |         bool quit; | ||||||
|  | 
 | ||||||
|  |         std::chrono::steady_clock::time_point nextWakeup; | ||||||
|  | 
 | ||||||
|  |         while (!quit) { | ||||||
|  |             checkInterrupt(); | ||||||
|  | 
 | ||||||
|  |             /* Let curl do its thing. */ | ||||||
|  |             int running; | ||||||
|  |             CURLMcode mc = curl_multi_perform(curlm, &running); | ||||||
|  |             if (mc != CURLM_OK) | ||||||
|  |                 throw nix::Error(format("unexpected error from curl_multi_perform(): %s") % curl_multi_strerror(mc)); | ||||||
|  | 
 | ||||||
|  |             /* Set the promises of any finished requests. */ | ||||||
|  |             CURLMsg * msg; | ||||||
|  |             int left; | ||||||
|  |             while ((msg = curl_multi_info_read(curlm, &left))) { | ||||||
|  |                 if (msg->msg == CURLMSG_DONE) { | ||||||
|  |                     auto i = items.find(msg->easy_handle); | ||||||
|  |                     assert(i != items.end()); | ||||||
|  |                     i->second->finish(msg->data.result); | ||||||
|  |                     curl_multi_remove_handle(curlm, i->second->req); | ||||||
|  |                     i->second->active = false; | ||||||
|  |                     items.erase(i); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             /* Wait for activity, including wakeup events. */ | ||||||
|  |             int numfds = 0; | ||||||
|  |             struct curl_waitfd extraFDs[1]; | ||||||
|  |             extraFDs[0].fd = wakeupPipe.readSide.get(); | ||||||
|  |             extraFDs[0].events = CURL_WAIT_POLLIN; | ||||||
|  |             extraFDs[0].revents = 0; | ||||||
|  |             auto sleepTimeMs = | ||||||
|  |                 nextWakeup != std::chrono::steady_clock::time_point() | ||||||
|  |                 ? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count()) | ||||||
|  |                 : 1000000000; | ||||||
|  |             //printMsg(lvlVomit, format("download thread waiting for %d ms") % sleepTimeMs);
 | ||||||
|  |             mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds); | ||||||
|  |             if (mc != CURLM_OK) | ||||||
|  |                 throw nix::Error(format("unexpected error from curl_multi_wait(): %s") % curl_multi_strerror(mc)); | ||||||
|  | 
 | ||||||
|  |             nextWakeup = std::chrono::steady_clock::time_point(); | ||||||
|  | 
 | ||||||
|  |             /* Add new curl requests from the incoming requests queue,
 | ||||||
|  |                except for requests that are embargoed (waiting for a | ||||||
|  |                retry timeout to expire). FIXME: should use a priority | ||||||
|  |                queue for the embargoed items to prevent repeated O(n) | ||||||
|  |                checks. */ | ||||||
|  |             if (extraFDs[0].revents & CURL_WAIT_POLLIN) { | ||||||
|  |                 char buf[1024]; | ||||||
|  |                 auto res = read(extraFDs[0].fd, buf, sizeof(buf)); | ||||||
|  |                 if (res == -1 && errno != EINTR) | ||||||
|  |                     throw SysError("reading curl wakeup socket"); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             std::vector<std::shared_ptr<DownloadItem>> incoming, embargoed; | ||||||
|  |             auto now = std::chrono::steady_clock::now(); | ||||||
|  | 
 | ||||||
|  |             { | ||||||
|  |                 auto state(state_.lock()); | ||||||
|  |                 for (auto & item: state->incoming) { | ||||||
|  |                     if (item->embargo <= now) | ||||||
|  |                         incoming.push_back(item); | ||||||
|  |                     else { | ||||||
|  |                         embargoed.push_back(item); | ||||||
|  |                         if (nextWakeup == std::chrono::steady_clock::time_point() | ||||||
|  |                             || item->embargo < nextWakeup) | ||||||
|  |                             nextWakeup = item->embargo; | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |                 state->incoming = embargoed; | ||||||
|  |                 quit = state->quit; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             for (auto & item : incoming) { | ||||||
|  |                 debug(format("starting download of %s") % item->request.uri); | ||||||
|  |                 item->init(); | ||||||
|  |                 curl_multi_add_handle(curlm, item->req); | ||||||
|  |                 item->active = true; | ||||||
|  |                 items[item->req] = item; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         debug("download thread shutting down"); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     void workerThreadEntry() | ||||||
|  |     { | ||||||
|  |         try { | ||||||
|  |             workerThreadMain(); | ||||||
|  |         } catch (Interrupted & e) { | ||||||
|  |         } catch (std::exception & e) { | ||||||
|  |             printMsg(lvlError, format("unexpected error in download thread: %s") % e.what()); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         { | ||||||
|  |             auto state(state_.lock()); | ||||||
|  |             state->incoming.clear(); | ||||||
|  |             state->quit = true; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     void enqueueItem(std::shared_ptr<DownloadItem> item) | ||||||
|  |     { | ||||||
|  |         { | ||||||
|  |             auto state(state_.lock()); | ||||||
|  |             if (state->quit) | ||||||
|  |                 throw nix::Error("cannot enqueue download request because the download thread is shutting down"); | ||||||
|  |             state->incoming.push_back(item); | ||||||
|  |         } | ||||||
|  |         writeFull(wakeupPipe.writeSide.get(), " "); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) override | ||||||
|  |     { | ||||||
|  |         auto item = std::make_shared<DownloadItem>(*this, request); | ||||||
|  |         enqueueItem(item); | ||||||
|  |         return item->promise.get_future(); | ||||||
|  |     } | ||||||
|  | }; | ||||||
|  | 
 | ||||||
|  | ref<Downloader> getDownloader() | ||||||
|  | { | ||||||
|  |     static std::shared_ptr<Downloader> downloader; | ||||||
|  |     static std::once_flag downloaderCreated; | ||||||
|  |     std::call_once(downloaderCreated, [&]() { downloader = makeDownloader(); }); | ||||||
|  |     return ref<Downloader>(downloader); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| ref<Downloader> makeDownloader() | ref<Downloader> makeDownloader() | ||||||
| { | { | ||||||
|     return make_ref<CurlDownloader>(); |     return make_ref<CurlDownloader>(); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | DownloadResult Downloader::download(const DownloadRequest & request) | ||||||
|  | { | ||||||
|  |     return enqueueDownload(request).get(); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl) | Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl) | ||||||
| { | { | ||||||
|     auto url = resolveUri(url_); |     auto url = resolveUri(url_); | ||||||
|  | @ -303,9 +511,9 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa | ||||||
|     if (!skip) { |     if (!skip) { | ||||||
| 
 | 
 | ||||||
|         try { |         try { | ||||||
|             DownloadOptions options; |             DownloadRequest request(url); | ||||||
|             options.expectedETag = expectedETag; |             request.expectedETag = expectedETag; | ||||||
|             auto res = download(url, options); |             auto res = download(request); | ||||||
|             if (effectiveUrl) |             if (effectiveUrl) | ||||||
|                 *effectiveUrl = res.effectiveUrl; |                 *effectiveUrl = res.effectiveUrl; | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -4,24 +4,30 @@ | ||||||
| #include "hash.hh" | #include "hash.hh" | ||||||
| 
 | 
 | ||||||
| #include <string> | #include <string> | ||||||
|  | #include <future> | ||||||
| 
 | 
 | ||||||
| namespace nix { | namespace nix { | ||||||
| 
 | 
 | ||||||
| struct DownloadOptions | struct DownloadRequest | ||||||
| { | { | ||||||
|  |     std::string uri; | ||||||
|     std::string expectedETag; |     std::string expectedETag; | ||||||
|     bool verifyTLS = true; |     bool verifyTLS = true; | ||||||
|     enum { yes, no, automatic } showProgress = yes; |     enum { yes, no, automatic } showProgress = yes; | ||||||
|     bool head = false; |     bool head = false; | ||||||
|     size_t tries = 1; |     size_t tries = 1; | ||||||
|     unsigned int baseRetryTimeMs = 100; |     unsigned int baseRetryTimeMs = 250; | ||||||
|  | 
 | ||||||
|  |     DownloadRequest(const std::string & uri) : uri(uri) { } | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| struct DownloadResult | struct DownloadResult | ||||||
| { | { | ||||||
|  |     enum Status { Success, NotFound, Forbidden, Misc, Transient }; | ||||||
|  |     Status status; | ||||||
|     bool cached; |     bool cached; | ||||||
|     string etag; |     std::string etag; | ||||||
|     string effectiveUrl; |     std::string effectiveUrl; | ||||||
|     std::shared_ptr<std::string> data; |     std::shared_ptr<std::string> data; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
|  | @ -29,14 +35,29 @@ class Store; | ||||||
| 
 | 
 | ||||||
| struct Downloader | struct Downloader | ||||||
| { | { | ||||||
|     virtual DownloadResult download(string url, const DownloadOptions & options) = 0; |     /* Enqueue a download request, returning a future to the result of
 | ||||||
|  |        the download. The future may throw a DownloadError | ||||||
|  |        exception. */ | ||||||
|  |     virtual std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) = 0; | ||||||
| 
 | 
 | ||||||
|     Path downloadCached(ref<Store> store, const string & url, bool unpack, string name = "", |     /* Synchronously download a file. */ | ||||||
|         const Hash & expectedHash = Hash(), string * effectiveUrl = nullptr); |     DownloadResult download(const DownloadRequest & request); | ||||||
|  | 
 | ||||||
|  |     /* Check if the specified file is already in ~/.cache/nix/tarballs
 | ||||||
|  |        and is more recent than ‘tarball-ttl’ seconds. Otherwise, | ||||||
|  |        use the recorded ETag to verify if the server has a more | ||||||
|  |        recent version, and if so, download it to the Nix store. */ | ||||||
|  |     Path downloadCached(ref<Store> store, const string & uri, bool unpack, string name = "", | ||||||
|  |         const Hash & expectedHash = Hash(), string * effectiveUri = nullptr); | ||||||
| 
 | 
 | ||||||
|     enum Error { NotFound, Forbidden, Misc, Transient }; |     enum Error { NotFound, Forbidden, Misc, Transient }; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
|  | /* Return a shared Downloader object. Using this object is preferred
 | ||||||
|  |    because it enables connection reuse and HTTP/2 multiplexing. */ | ||||||
|  | ref<Downloader> getDownloader(); | ||||||
|  | 
 | ||||||
|  | /* Return a new Downloader object. */ | ||||||
| ref<Downloader> makeDownloader(); | ref<Downloader> makeDownloader(); | ||||||
| 
 | 
 | ||||||
| class DownloadError : public Error | class DownloadError : public Error | ||||||
|  |  | ||||||
|  | @ -13,17 +13,12 @@ private: | ||||||
| 
 | 
 | ||||||
|     Path cacheUri; |     Path cacheUri; | ||||||
| 
 | 
 | ||||||
|     Pool<Downloader> downloaders; |  | ||||||
| 
 |  | ||||||
| public: | public: | ||||||
| 
 | 
 | ||||||
|     HttpBinaryCacheStore( |     HttpBinaryCacheStore( | ||||||
|         const Params & params, const Path & _cacheUri) |         const Params & params, const Path & _cacheUri) | ||||||
|         : BinaryCacheStore(params) |         : BinaryCacheStore(params) | ||||||
|         , cacheUri(_cacheUri) |         , cacheUri(_cacheUri) | ||||||
|         , downloaders( |  | ||||||
|             std::numeric_limits<size_t>::max(), |  | ||||||
|             []() { return makeDownloader(); }) |  | ||||||
|     { |     { | ||||||
|         if (cacheUri.back() == '/') |         if (cacheUri.back() == '/') | ||||||
|             cacheUri.pop_back(); |             cacheUri.pop_back(); | ||||||
|  | @ -54,12 +49,11 @@ protected: | ||||||
|     bool fileExists(const std::string & path) override |     bool fileExists(const std::string & path) override | ||||||
|     { |     { | ||||||
|         try { |         try { | ||||||
|             auto downloader(downloaders.get()); |             DownloadRequest request(cacheUri + "/" + path); | ||||||
|             DownloadOptions options; |             request.showProgress = DownloadRequest::no; | ||||||
|             options.showProgress = DownloadOptions::no; |             request.head = true; | ||||||
|             options.head = true; |             request.tries = 5; | ||||||
|             options.tries = 5; |             getDownloader()->download(request); | ||||||
|             downloader->download(cacheUri + "/" + path, options); |  | ||||||
|             return true; |             return true; | ||||||
|         } catch (DownloadError & e) { |         } catch (DownloadError & e) { | ||||||
|             /* S3 buckets return 403 if a file doesn't exist and the
 |             /* S3 buckets return 403 if a file doesn't exist and the
 | ||||||
|  | @ -77,13 +71,11 @@ protected: | ||||||
| 
 | 
 | ||||||
|     std::shared_ptr<std::string> getFile(const std::string & path) override |     std::shared_ptr<std::string> getFile(const std::string & path) override | ||||||
|     { |     { | ||||||
|         auto downloader(downloaders.get()); |         DownloadRequest request(cacheUri + "/" + path); | ||||||
|         DownloadOptions options; |         request.showProgress = DownloadRequest::no; | ||||||
|         options.showProgress = DownloadOptions::no; |         request.tries = 8; | ||||||
|         options.tries = 5; |  | ||||||
|         options.baseRetryTimeMs = 1000; |  | ||||||
|         try { |         try { | ||||||
|             return downloader->download(cacheUri + "/" + path, options).data; |             return getDownloader()->download(request).data; | ||||||
|         } catch (DownloadError & e) { |         } catch (DownloadError & e) { | ||||||
|             if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) |             if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) | ||||||
|                 return 0; |                 return 0; | ||||||
|  |  | ||||||
|  | @ -85,7 +85,7 @@ static void update(const StringSet & channelNames) | ||||||
|         // got redirected in the process, so that we can grab the various parts of a nix channel
 |         // got redirected in the process, so that we can grab the various parts of a nix channel
 | ||||||
|         // definition from a consistent location if the redirect changes mid-download.
 |         // definition from a consistent location if the redirect changes mid-download.
 | ||||||
|         auto effectiveUrl = string{}; |         auto effectiveUrl = string{}; | ||||||
|         auto dl = makeDownloader(); |         auto dl = getDownloader(); | ||||||
|         auto filename = dl->downloadCached(store, url, false, "", Hash(), &effectiveUrl); |         auto filename = dl->downloadCached(store, url, false, "", Hash(), &effectiveUrl); | ||||||
|         url = chomp(std::move(effectiveUrl)); |         url = chomp(std::move(effectiveUrl)); | ||||||
| 
 | 
 | ||||||
|  | @ -114,10 +114,10 @@ static void update(const StringSet & channelNames) | ||||||
|         if (!unpacked) { |         if (!unpacked) { | ||||||
|             // The URL doesn't unpack directly, so let's try treating it like a full channel folder with files in it
 |             // The URL doesn't unpack directly, so let's try treating it like a full channel folder with files in it
 | ||||||
|             // Check if the channel advertises a binary cache.
 |             // Check if the channel advertises a binary cache.
 | ||||||
|             DownloadOptions opts; |             DownloadRequest request(url + "/binary-cache-url"); | ||||||
|             opts.showProgress = DownloadOptions::no; |             request.showProgress = DownloadRequest::no; | ||||||
|             try { |             try { | ||||||
|                 auto dlRes = dl->download(url + "/binary-cache-url", opts); |                 auto dlRes = dl->download(request); | ||||||
|                 extraAttrs = "binaryCacheURL = \"" + *dlRes.data + "\";"; |                 extraAttrs = "binaryCacheURL = \"" + *dlRes.data + "\";"; | ||||||
|             } catch (DownloadError & e) { |             } catch (DownloadError & e) { | ||||||
|             } |             } | ||||||
|  |  | ||||||
|  | @ -158,7 +158,7 @@ int main(int argc, char * * argv) | ||||||
|             auto actualUri = resolveMirrorUri(state, uri); |             auto actualUri = resolveMirrorUri(state, uri); | ||||||
| 
 | 
 | ||||||
|             /* Download the file. */ |             /* Download the file. */ | ||||||
|             auto result = makeDownloader()->download(actualUri, DownloadOptions()); |             auto result = getDownloader()->download(DownloadRequest(actualUri)); | ||||||
| 
 | 
 | ||||||
|             AutoDelete tmpDir(createTempDir(), true); |             AutoDelete tmpDir(createTempDir(), true); | ||||||
|             Path tmpFile = (Path) tmpDir + "/tmp"; |             Path tmpFile = (Path) tmpDir + "/tmp"; | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue