Simplify the callback mechanism
This commit is contained in:
		
							parent
							
								
									1672bcd230
								
							
						
					
					
						commit
						81ea8bd5ce
					
				
					 16 changed files with 152 additions and 180 deletions
				
			
		|  | @ -58,12 +58,13 @@ std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) | ||||||
| { | { | ||||||
|     std::promise<std::shared_ptr<std::string>> promise; |     std::promise<std::shared_ptr<std::string>> promise; | ||||||
|     getFile(path, |     getFile(path, | ||||||
|         [&](std::shared_ptr<std::string> result) { |         {[&](std::future<std::shared_ptr<std::string>> result) { | ||||||
|             promise.set_value(result); |             try { | ||||||
|         }, |                 promise.set_value(result.get()); | ||||||
|         [&](std::exception_ptr exc) { |             } catch (...) { | ||||||
|             promise.set_exception(exc); |                 promise.set_exception(std::current_exception()); | ||||||
|         }); |             } | ||||||
|  |         }}); | ||||||
|     return promise.get_future().get(); |     return promise.get_future().get(); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -218,8 +219,7 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, | void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, | ||||||
|         std::function<void(std::shared_ptr<ValidPathInfo>)> success, |     Callback<std::shared_ptr<ValidPathInfo>> callback) | ||||||
|         std::function<void(std::exception_ptr exc)> failure) |  | ||||||
| { | { | ||||||
|     auto uri = getUri(); |     auto uri = getUri(); | ||||||
|     auto act = std::make_shared<Activity>(*logger, lvlTalkative, actQueryPathInfo, |     auto act = std::make_shared<Activity>(*logger, lvlTalkative, actQueryPathInfo, | ||||||
|  | @ -229,17 +229,22 @@ void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, | ||||||
|     auto narInfoFile = narInfoFileFor(storePath); |     auto narInfoFile = narInfoFileFor(storePath); | ||||||
| 
 | 
 | ||||||
|     getFile(narInfoFile, |     getFile(narInfoFile, | ||||||
|         [=](std::shared_ptr<std::string> data) { |         {[=](std::future<std::shared_ptr<std::string>> fut) { | ||||||
|             if (!data) return success(0); |             try { | ||||||
|  |                 auto data = fut.get(); | ||||||
| 
 | 
 | ||||||
|             stats.narInfoRead++; |                 if (!data) return callback(nullptr); | ||||||
| 
 | 
 | ||||||
|             callSuccess(success, failure, (std::shared_ptr<ValidPathInfo>) |                 stats.narInfoRead++; | ||||||
|                 std::make_shared<NarInfo>(*this, *data, narInfoFile)); |  | ||||||
| 
 | 
 | ||||||
|             (void) act; // force Activity into this lambda to ensure it stays alive
 |                 callback((std::shared_ptr<ValidPathInfo>) | ||||||
|         }, |                     std::make_shared<NarInfo>(*this, *data, narInfoFile)); | ||||||
|         failure); | 
 | ||||||
|  |                 (void) act; // force Activity into this lambda to ensure it stays alive
 | ||||||
|  |             } catch (...) { | ||||||
|  |                 callback.rethrow(); | ||||||
|  |             } | ||||||
|  |         }}); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, | Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, | ||||||
|  |  | ||||||
|  | @ -41,8 +41,7 @@ public: | ||||||
|     /* Return the contents of the specified file, or null if it
 |     /* Return the contents of the specified file, or null if it
 | ||||||
|        doesn't exist. */ |        doesn't exist. */ | ||||||
|     virtual void getFile(const std::string & path, |     virtual void getFile(const std::string & path, | ||||||
|         std::function<void(std::shared_ptr<std::string>)> success, |         Callback<std::shared_ptr<std::string>> callback) = 0; | ||||||
|         std::function<void(std::exception_ptr exc)> failure) = 0; |  | ||||||
| 
 | 
 | ||||||
|     std::shared_ptr<std::string> getFile(const std::string & path); |     std::shared_ptr<std::string> getFile(const std::string & path); | ||||||
| 
 | 
 | ||||||
|  | @ -71,8 +70,7 @@ public: | ||||||
|     { unsupported(); } |     { unsupported(); } | ||||||
| 
 | 
 | ||||||
|     void queryPathInfoUncached(const Path & path, |     void queryPathInfoUncached(const Path & path, | ||||||
|         std::function<void(std::shared_ptr<ValidPathInfo>)> success, |         Callback<std::shared_ptr<ValidPathInfo>> callback) override; | ||||||
|         std::function<void(std::exception_ptr exc)> failure) override; |  | ||||||
| 
 | 
 | ||||||
|     void queryReferrers(const Path & path, |     void queryReferrers(const Path & path, | ||||||
|         PathSet & referrers) override |         PathSet & referrers) override | ||||||
|  |  | ||||||
|  | @ -81,8 +81,7 @@ struct CurlDownloader : public Downloader | ||||||
|         DownloadResult result; |         DownloadResult result; | ||||||
|         Activity act; |         Activity act; | ||||||
|         bool done = false; // whether either the success or failure function has been called
 |         bool done = false; // whether either the success or failure function has been called
 | ||||||
|         std::function<void(const DownloadResult &)> success; |         Callback<DownloadResult> callback; | ||||||
|         std::function<void(std::exception_ptr exc)> failure; |  | ||||||
|         CURL * req = 0; |         CURL * req = 0; | ||||||
|         bool active = false; // whether the handle has been added to the multi object
 |         bool active = false; // whether the handle has been added to the multi object
 | ||||||
|         std::string status; |         std::string status; | ||||||
|  | @ -97,10 +96,13 @@ struct CurlDownloader : public Downloader | ||||||
| 
 | 
 | ||||||
|         std::string encoding; |         std::string encoding; | ||||||
| 
 | 
 | ||||||
|         DownloadItem(CurlDownloader & downloader, const DownloadRequest & request) |         DownloadItem(CurlDownloader & downloader, | ||||||
|  |             const DownloadRequest & request, | ||||||
|  |             Callback<DownloadResult> callback) | ||||||
|             : downloader(downloader) |             : downloader(downloader) | ||||||
|             , request(request) |             , request(request) | ||||||
|             , act(*logger, lvlTalkative, actDownload, fmt("downloading '%s'", request.uri), {request.uri}, request.parentAct) |             , act(*logger, lvlTalkative, actDownload, fmt("downloading '%s'", request.uri), {request.uri}, request.parentAct) | ||||||
|  |             , callback(callback) | ||||||
|         { |         { | ||||||
|             if (!request.expectedETag.empty()) |             if (!request.expectedETag.empty()) | ||||||
|                 requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); |                 requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); | ||||||
|  | @ -129,7 +131,7 @@ struct CurlDownloader : public Downloader | ||||||
|         { |         { | ||||||
|             assert(!done); |             assert(!done); | ||||||
|             done = true; |             done = true; | ||||||
|             callFailure(failure, std::make_exception_ptr(e)); |             callback.rethrow(std::make_exception_ptr(e)); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         size_t writeCallback(void * contents, size_t size, size_t nmemb) |         size_t writeCallback(void * contents, size_t size, size_t nmemb) | ||||||
|  | @ -316,11 +318,11 @@ struct CurlDownloader : public Downloader | ||||||
|                 try { |                 try { | ||||||
|                     if (request.decompress) |                     if (request.decompress) | ||||||
|                         result.data = decodeContent(encoding, ref<std::string>(result.data)); |                         result.data = decodeContent(encoding, ref<std::string>(result.data)); | ||||||
|                     callSuccess(success, failure, const_cast<const DownloadResult &>(result)); |  | ||||||
|                     act.progress(result.data->size(), result.data->size()); |                     act.progress(result.data->size(), result.data->size()); | ||||||
|  |                     callback(std::move(result)); | ||||||
|                 } catch (...) { |                 } catch (...) { | ||||||
|                     done = true; |                     done = true; | ||||||
|                     callFailure(failure, std::current_exception()); |                     callback.rethrow(); | ||||||
|                 } |                 } | ||||||
|             } else { |             } else { | ||||||
|                 // We treat most errors as transient, but won't retry when hopeless
 |                 // We treat most errors as transient, but won't retry when hopeless
 | ||||||
|  | @ -570,13 +572,12 @@ struct CurlDownloader : public Downloader | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void enqueueDownload(const DownloadRequest & request, |     void enqueueDownload(const DownloadRequest & request, | ||||||
|         std::function<void(const DownloadResult &)> success, |         Callback<DownloadResult> callback) override | ||||||
|         std::function<void(std::exception_ptr exc)> failure) override |  | ||||||
|     { |     { | ||||||
|         /* Ugly hack to support s3:// URIs. */ |         /* Ugly hack to support s3:// URIs. */ | ||||||
|         if (hasPrefix(request.uri, "s3://")) { |         if (hasPrefix(request.uri, "s3://")) { | ||||||
|             // FIXME: do this on a worker thread
 |             // FIXME: do this on a worker thread
 | ||||||
|             sync2async<DownloadResult>(success, failure, [&]() -> DownloadResult { |             try { | ||||||
| #ifdef ENABLE_S3 | #ifdef ENABLE_S3 | ||||||
|                 S3Helper s3Helper("", Aws::Region::US_EAST_1); // FIXME: make configurable
 |                 S3Helper s3Helper("", Aws::Region::US_EAST_1); // FIXME: make configurable
 | ||||||
|                 auto slash = request.uri.find('/', 5); |                 auto slash = request.uri.find('/', 5); | ||||||
|  | @ -590,18 +591,15 @@ struct CurlDownloader : public Downloader | ||||||
|                 if (!s3Res.data) |                 if (!s3Res.data) | ||||||
|                     throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri)); |                     throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri)); | ||||||
|                 res.data = s3Res.data; |                 res.data = s3Res.data; | ||||||
|                 return res; |                 callback(std::move(res)); | ||||||
| #else | #else | ||||||
|                 throw nix::Error("cannot download '%s' because Nix is not built with S3 support", request.uri); |                 throw nix::Error("cannot download '%s' because Nix is not built with S3 support", request.uri); | ||||||
| #endif | #endif | ||||||
|             }); |             } catch (...) { callback.rethrow(); } | ||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         auto item = std::make_shared<DownloadItem>(*this, request); |         enqueueItem(std::make_shared<DownloadItem>(*this, request, callback)); | ||||||
|         item->success = success; |  | ||||||
|         item->failure = failure; |  | ||||||
|         enqueueItem(item); |  | ||||||
|     } |     } | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
|  | @ -622,8 +620,13 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & | ||||||
| { | { | ||||||
|     auto promise = std::make_shared<std::promise<DownloadResult>>(); |     auto promise = std::make_shared<std::promise<DownloadResult>>(); | ||||||
|     enqueueDownload(request, |     enqueueDownload(request, | ||||||
|         [promise](const DownloadResult & result) { promise->set_value(result); }, |         {[promise](std::future<DownloadResult> fut) { | ||||||
|         [promise](std::exception_ptr exc) { promise->set_exception(exc); }); |             try { | ||||||
|  |                 promise->set_value(fut.get()); | ||||||
|  |             } catch (...) { | ||||||
|  |                 promise->set_exception(std::current_exception()); | ||||||
|  |             } | ||||||
|  |         }}); | ||||||
|     return promise->get_future(); |     return promise->get_future(); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -42,8 +42,7 @@ struct Downloader | ||||||
|        the download. The future may throw a DownloadError |        the download. The future may throw a DownloadError | ||||||
|        exception. */ |        exception. */ | ||||||
|     virtual void enqueueDownload(const DownloadRequest & request, |     virtual void enqueueDownload(const DownloadRequest & request, | ||||||
|         std::function<void(const DownloadResult &)> success, |         Callback<DownloadResult> callback) = 0; | ||||||
|         std::function<void(std::exception_ptr exc)> failure) = 0; |  | ||||||
| 
 | 
 | ||||||
|     std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); |     std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -78,27 +78,23 @@ protected: | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void getFile(const std::string & path, |     void getFile(const std::string & path, | ||||||
|         std::function<void(std::shared_ptr<std::string>)> success, |         Callback<std::shared_ptr<std::string>> callback) override | ||||||
|         std::function<void(std::exception_ptr exc)> failure) override |  | ||||||
|     { |     { | ||||||
|         DownloadRequest request(cacheUri + "/" + path); |         DownloadRequest request(cacheUri + "/" + path); | ||||||
|         request.tries = 8; |         request.tries = 8; | ||||||
| 
 | 
 | ||||||
|         getDownloader()->enqueueDownload(request, |         getDownloader()->enqueueDownload(request, | ||||||
|             [success](const DownloadResult & result) { |             {[callback](std::future<DownloadResult> result) { | ||||||
|                 success(result.data); |  | ||||||
|             }, |  | ||||||
|             [success, failure](std::exception_ptr exc) { |  | ||||||
|                 try { |                 try { | ||||||
|                     std::rethrow_exception(exc); |                     callback(result.get().data); | ||||||
|                 } catch (DownloadError & e) { |                 } catch (DownloadError & e) { | ||||||
|                     if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) |                     if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) | ||||||
|                         return success(0); |                         return callback(std::shared_ptr<std::string>()); | ||||||
|                     failure(exc); |                     callback.rethrow(); | ||||||
|                 } catch (...) { |                 } catch (...) { | ||||||
|                     failure(exc); |                     callback.rethrow(); | ||||||
|                 } |                 } | ||||||
|             }); |             }}); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| }; | }; | ||||||
|  |  | ||||||
|  | @ -84,10 +84,9 @@ struct LegacySSHStore : public Store | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void queryPathInfoUncached(const Path & path, |     void queryPathInfoUncached(const Path & path, | ||||||
|         std::function<void(std::shared_ptr<ValidPathInfo>)> success, |         Callback<std::shared_ptr<ValidPathInfo>> callback) override | ||||||
|         std::function<void(std::exception_ptr exc)> failure) override |  | ||||||
|     { |     { | ||||||
|         sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() -> std::shared_ptr<ValidPathInfo> { |         try { | ||||||
|             auto conn(connections->get()); |             auto conn(connections->get()); | ||||||
| 
 | 
 | ||||||
|             debug("querying remote host '%s' for info on '%s'", host, path); |             debug("querying remote host '%s' for info on '%s'", host, path); | ||||||
|  | @ -97,7 +96,7 @@ struct LegacySSHStore : public Store | ||||||
| 
 | 
 | ||||||
|             auto info = std::make_shared<ValidPathInfo>(); |             auto info = std::make_shared<ValidPathInfo>(); | ||||||
|             conn->from >> info->path; |             conn->from >> info->path; | ||||||
|             if (info->path.empty()) return nullptr; |             if (info->path.empty()) return callback(nullptr); | ||||||
|             assert(path == info->path); |             assert(path == info->path); | ||||||
| 
 | 
 | ||||||
|             PathSet references; |             PathSet references; | ||||||
|  | @ -116,8 +115,8 @@ struct LegacySSHStore : public Store | ||||||
|             auto s = readString(conn->from); |             auto s = readString(conn->from); | ||||||
|             assert(s == ""); |             assert(s == ""); | ||||||
| 
 | 
 | ||||||
|             return info; |             callback(std::move(info)); | ||||||
|         }); |         } catch (...) { callback.rethrow(); } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void addToStore(const ValidPathInfo & info, Source & source, |     void addToStore(const ValidPathInfo & info, Source & source, | ||||||
|  |  | ||||||
|  | @ -35,17 +35,14 @@ protected: | ||||||
|         const std::string & mimeType) override; |         const std::string & mimeType) override; | ||||||
| 
 | 
 | ||||||
|     void getFile(const std::string & path, |     void getFile(const std::string & path, | ||||||
|         std::function<void(std::shared_ptr<std::string>)> success, |         Callback<std::shared_ptr<std::string>> callback) override | ||||||
|         std::function<void(std::exception_ptr exc)> failure) override |  | ||||||
|     { |     { | ||||||
|         sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { |         try { | ||||||
|             try { |             // FIXME: O(n) space
 | ||||||
|                 return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path)); |             callback(std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path))); | ||||||
|             } catch (SysError & e) { |         } catch (SysError & e) { | ||||||
|                 if (e.errNo == ENOENT) return std::shared_ptr<std::string>(); |             if (e.errNo == ENOENT) callback(nullptr); else callback.rethrow(); | ||||||
|                 throw; |         } catch (...) { callback.rethrow(); } | ||||||
|             } |  | ||||||
|         }); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     PathSet queryAllValidPaths() override |     PathSet queryAllValidPaths() override | ||||||
|  |  | ||||||
|  | @ -629,17 +629,15 @@ uint64_t LocalStore::addValidPath(State & state, | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| void LocalStore::queryPathInfoUncached(const Path & path, | void LocalStore::queryPathInfoUncached(const Path & path, | ||||||
|     std::function<void(std::shared_ptr<ValidPathInfo>)> success, |     Callback<std::shared_ptr<ValidPathInfo>> callback) | ||||||
|     std::function<void(std::exception_ptr exc)> failure) |  | ||||||
| { | { | ||||||
|     sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { |     try { | ||||||
| 
 |  | ||||||
|         auto info = std::make_shared<ValidPathInfo>(); |         auto info = std::make_shared<ValidPathInfo>(); | ||||||
|         info->path = path; |         info->path = path; | ||||||
| 
 | 
 | ||||||
|         assertStorePath(path); |         assertStorePath(path); | ||||||
| 
 | 
 | ||||||
|         return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { |         callback(retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { | ||||||
|             auto state(_state.lock()); |             auto state(_state.lock()); | ||||||
| 
 | 
 | ||||||
|             /* Get the path info. */ |             /* Get the path info. */ | ||||||
|  | @ -679,8 +677,9 @@ void LocalStore::queryPathInfoUncached(const Path & path, | ||||||
|                 info->references.insert(useQueryReferences.getStr(0)); |                 info->references.insert(useQueryReferences.getStr(0)); | ||||||
| 
 | 
 | ||||||
|             return info; |             return info; | ||||||
|         }); |         })); | ||||||
|     }); | 
 | ||||||
|  |     } catch (...) { callback.rethrow(); } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -127,8 +127,7 @@ public: | ||||||
|     PathSet queryAllValidPaths() override; |     PathSet queryAllValidPaths() override; | ||||||
| 
 | 
 | ||||||
|     void queryPathInfoUncached(const Path & path, |     void queryPathInfoUncached(const Path & path, | ||||||
|         std::function<void(std::shared_ptr<ValidPathInfo>)> success, |         Callback<std::shared_ptr<ValidPathInfo>> callback) override; | ||||||
|         std::function<void(std::exception_ptr exc)> failure) override; |  | ||||||
| 
 | 
 | ||||||
|     void queryReferrers(const Path & path, PathSet & referrers) override; |     void queryReferrers(const Path & path, PathSet & referrers) override; | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -33,9 +33,11 @@ void Store::computeFSClosure(const PathSet & startPaths, | ||||||
|             state->pending++; |             state->pending++; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         queryPathInfo(path, |         queryPathInfo(path, {[&, path](std::future<ref<ValidPathInfo>> fut) { | ||||||
|             [&, path](ref<ValidPathInfo> info) { |             // FIXME: calls to isValidPath() should be async
 | ||||||
|                 // FIXME: calls to isValidPath() should be async
 | 
 | ||||||
|  |             try { | ||||||
|  |                 auto info = fut.get(); | ||||||
| 
 | 
 | ||||||
|                 if (flipDirection) { |                 if (flipDirection) { | ||||||
| 
 | 
 | ||||||
|  | @ -75,14 +77,13 @@ void Store::computeFSClosure(const PathSet & startPaths, | ||||||
|                     if (!--state->pending) done.notify_one(); |                     if (!--state->pending) done.notify_one(); | ||||||
|                 } |                 } | ||||||
| 
 | 
 | ||||||
|             }, |             } catch (...) { | ||||||
| 
 |  | ||||||
|             [&, path](std::exception_ptr exc) { |  | ||||||
|                 auto state(state_.lock()); |                 auto state(state_.lock()); | ||||||
|                 if (!state->exc) state->exc = exc; |                 if (!state->exc) state->exc = std::current_exception(); | ||||||
|                 assert(state->pending); |                 assert(state->pending); | ||||||
|                 if (!--state->pending) done.notify_one(); |                 if (!--state->pending) done.notify_one(); | ||||||
|             }); |             }; | ||||||
|  |         }}); | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
|     for (auto & startPath : startPaths) |     for (auto & startPath : startPaths) | ||||||
|  |  | ||||||
|  | @ -294,10 +294,9 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| void RemoteStore::queryPathInfoUncached(const Path & path, | void RemoteStore::queryPathInfoUncached(const Path & path, | ||||||
|     std::function<void(std::shared_ptr<ValidPathInfo>)> success, |     Callback<std::shared_ptr<ValidPathInfo>> callback) | ||||||
|     std::function<void(std::exception_ptr exc)> failure) |  | ||||||
| { | { | ||||||
|     sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { |     try { | ||||||
|         auto conn(connections->get()); |         auto conn(connections->get()); | ||||||
|         conn->to << wopQueryPathInfo << path; |         conn->to << wopQueryPathInfo << path; | ||||||
|         try { |         try { | ||||||
|  | @ -324,8 +323,8 @@ void RemoteStore::queryPathInfoUncached(const Path & path, | ||||||
|             info->sigs = readStrings<StringSet>(conn->from); |             info->sigs = readStrings<StringSet>(conn->from); | ||||||
|             conn->from >> info->ca; |             conn->from >> info->ca; | ||||||
|         } |         } | ||||||
|         return info; |         callback(std::move(info)); | ||||||
|     }); |     } catch (...) { callback.rethrow(); } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -40,8 +40,7 @@ public: | ||||||
|     PathSet queryAllValidPaths() override; |     PathSet queryAllValidPaths() override; | ||||||
| 
 | 
 | ||||||
|     void queryPathInfoUncached(const Path & path, |     void queryPathInfoUncached(const Path & path, | ||||||
|         std::function<void(std::shared_ptr<ValidPathInfo>)> success, |         Callback<std::shared_ptr<ValidPathInfo>> callback) override; | ||||||
|         std::function<void(std::exception_ptr exc)> failure) override; |  | ||||||
| 
 | 
 | ||||||
|     void queryReferrers(const Path & path, PathSet & referrers) override; |     void queryReferrers(const Path & path, PathSet & referrers) override; | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -365,10 +365,9 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void getFile(const std::string & path, |     void getFile(const std::string & path, | ||||||
|         std::function<void(std::shared_ptr<std::string>)> success, |         Callback<std::shared_ptr<std::string>> callback) override | ||||||
|         std::function<void(std::exception_ptr exc)> failure) override |  | ||||||
|     { |     { | ||||||
|         sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { |         try { | ||||||
|             stats.get++; |             stats.get++; | ||||||
| 
 | 
 | ||||||
|             auto res = s3Helper.getObject(bucketName, path); |             auto res = s3Helper.getObject(bucketName, path); | ||||||
|  | @ -380,8 +379,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore | ||||||
|                 printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms", |                 printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms", | ||||||
|                     bucketName, path, res.data->size(), res.durationMs); |                     bucketName, path, res.data->size(), res.durationMs); | ||||||
| 
 | 
 | ||||||
|             return res.data; |             callback(std::move(res.data)); | ||||||
|         }); |         } catch (...) { callback.rethrow(); } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     PathSet queryAllValidPaths() override |     PathSet queryAllValidPaths() override | ||||||
|  |  | ||||||
|  | @ -305,20 +305,20 @@ ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath) | ||||||
|     std::promise<ref<ValidPathInfo>> promise; |     std::promise<ref<ValidPathInfo>> promise; | ||||||
| 
 | 
 | ||||||
|     queryPathInfo(storePath, |     queryPathInfo(storePath, | ||||||
|         [&](ref<ValidPathInfo> info) { |         {[&](std::future<ref<ValidPathInfo>> result) { | ||||||
|             promise.set_value(info); |             try { | ||||||
|         }, |                 promise.set_value(result.get()); | ||||||
|         [&](std::exception_ptr exc) { |             } catch (...) { | ||||||
|             promise.set_exception(exc); |                 promise.set_exception(std::current_exception()); | ||||||
|         }); |             } | ||||||
|  |         }}); | ||||||
| 
 | 
 | ||||||
|     return promise.get_future().get(); |     return promise.get_future().get(); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| void Store::queryPathInfo(const Path & storePath, | void Store::queryPathInfo(const Path & storePath, | ||||||
|     std::function<void(ref<ValidPathInfo>)> success, |     Callback<ref<ValidPathInfo>> callback) | ||||||
|     std::function<void(std::exception_ptr exc)> failure) |  | ||||||
| { | { | ||||||
|     auto hashPart = storePathToHash(storePath); |     auto hashPart = storePathToHash(storePath); | ||||||
| 
 | 
 | ||||||
|  | @ -330,7 +330,7 @@ void Store::queryPathInfo(const Path & storePath, | ||||||
|                 stats.narInfoReadAverted++; |                 stats.narInfoReadAverted++; | ||||||
|                 if (!*res) |                 if (!*res) | ||||||
|                     throw InvalidPath(format("path '%s' is not valid") % storePath); |                     throw InvalidPath(format("path '%s' is not valid") % storePath); | ||||||
|                 return success(ref<ValidPathInfo>(*res)); |                 return callback(ref<ValidPathInfo>(*res)); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  | @ -346,35 +346,36 @@ void Store::queryPathInfo(const Path & storePath, | ||||||
|                         (res.second->path != storePath && storePathToName(storePath) != "")) |                         (res.second->path != storePath && storePathToName(storePath) != "")) | ||||||
|                         throw InvalidPath(format("path '%s' is not valid") % storePath); |                         throw InvalidPath(format("path '%s' is not valid") % storePath); | ||||||
|                 } |                 } | ||||||
|                 return success(ref<ValidPathInfo>(res.second)); |                 return callback(ref<ValidPathInfo>(res.second)); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|     } catch (std::exception & e) { |     } catch (...) { return callback.rethrow(); } | ||||||
|         return callFailure(failure); |  | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     queryPathInfoUncached(storePath, |     queryPathInfoUncached(storePath, | ||||||
|         [this, storePath, hashPart, success, failure](std::shared_ptr<ValidPathInfo> info) { |         {[this, storePath, hashPart, callback](std::future<std::shared_ptr<ValidPathInfo>> fut) { | ||||||
| 
 | 
 | ||||||
|             if (diskCache) |             try { | ||||||
|                 diskCache->upsertNarInfo(getUri(), hashPart, info); |                 auto info = fut.get(); | ||||||
| 
 | 
 | ||||||
|             { |                 if (diskCache) | ||||||
|                 auto state_(state.lock()); |                     diskCache->upsertNarInfo(getUri(), hashPart, info); | ||||||
|                 state_->pathInfoCache.upsert(hashPart, info); |  | ||||||
|             } |  | ||||||
| 
 | 
 | ||||||
|             if (!info |                 { | ||||||
|                 || (info->path != storePath && storePathToName(storePath) != "")) |                     auto state_(state.lock()); | ||||||
|             { |                     state_->pathInfoCache.upsert(hashPart, info); | ||||||
|                 stats.narInfoMissing++; |                 } | ||||||
|                 return failure(std::make_exception_ptr(InvalidPath(format("path '%s' is not valid") % storePath))); |  | ||||||
|             } |  | ||||||
| 
 | 
 | ||||||
|             callSuccess(success, failure, ref<ValidPathInfo>(info)); |                 if (!info | ||||||
|  |                     || (info->path != storePath && storePathToName(storePath) != "")) | ||||||
|  |                 { | ||||||
|  |                     stats.narInfoMissing++; | ||||||
|  |                     throw InvalidPath("path '%s' is not valid", storePath); | ||||||
|  |                 } | ||||||
| 
 | 
 | ||||||
|         }, failure); |                 callback(ref<ValidPathInfo>(info)); | ||||||
|  |             } catch (...) { callback.rethrow(); } | ||||||
|  |         }}); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -394,26 +395,19 @@ PathSet Store::queryValidPaths(const PathSet & paths, SubstituteFlag maybeSubsti | ||||||
| 
 | 
 | ||||||
|     auto doQuery = [&](const Path & path ) { |     auto doQuery = [&](const Path & path ) { | ||||||
|         checkInterrupt(); |         checkInterrupt(); | ||||||
|         queryPathInfo(path, |         queryPathInfo(path, {[path, &state_, &wakeup](std::future<ref<ValidPathInfo>> fut) { | ||||||
|             [path, &state_, &wakeup](ref<ValidPathInfo> info) { |             auto state(state_.lock()); | ||||||
|                 auto state(state_.lock()); |             try { | ||||||
|  |                 auto info = fut.get(); | ||||||
|                 state->valid.insert(path); |                 state->valid.insert(path); | ||||||
|                 assert(state->left); |             } catch (InvalidPath &) { | ||||||
|                 if (!--state->left) |             } catch (...) { | ||||||
|                     wakeup.notify_one(); |                 state->exc = std::current_exception(); | ||||||
|             }, |             } | ||||||
|             [path, &state_, &wakeup](std::exception_ptr exc) { |             assert(state->left); | ||||||
|                 auto state(state_.lock()); |             if (!--state->left) | ||||||
|                 try { |                 wakeup.notify_one(); | ||||||
|                     std::rethrow_exception(exc); |         }}); | ||||||
|                 } catch (InvalidPath &) { |  | ||||||
|                 } catch (...) { |  | ||||||
|                     state->exc = exc; |  | ||||||
|                 } |  | ||||||
|                 assert(state->left); |  | ||||||
|                 if (!--state->left) |  | ||||||
|                     wakeup.notify_one(); |  | ||||||
|             }); |  | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
|     for (auto & path : paths) |     for (auto & path : paths) | ||||||
|  |  | ||||||
|  | @ -355,14 +355,12 @@ public: | ||||||
| 
 | 
 | ||||||
|     /* Asynchronous version of queryPathInfo(). */ |     /* Asynchronous version of queryPathInfo(). */ | ||||||
|     void queryPathInfo(const Path & path, |     void queryPathInfo(const Path & path, | ||||||
|         std::function<void(ref<ValidPathInfo>)> success, |         Callback<ref<ValidPathInfo>> callback); | ||||||
|         std::function<void(std::exception_ptr exc)> failure); |  | ||||||
| 
 | 
 | ||||||
| protected: | protected: | ||||||
| 
 | 
 | ||||||
|     virtual void queryPathInfoUncached(const Path & path, |     virtual void queryPathInfoUncached(const Path & path, | ||||||
|         std::function<void(std::shared_ptr<ValidPathInfo>)> success, |         Callback<std::shared_ptr<ValidPathInfo>> callback) = 0; | ||||||
|         std::function<void(std::exception_ptr exc)> failure) = 0; |  | ||||||
| 
 | 
 | ||||||
| public: | public: | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -15,6 +15,7 @@ | ||||||
| #include <map> | #include <map> | ||||||
| #include <sstream> | #include <sstream> | ||||||
| #include <experimental/optional> | #include <experimental/optional> | ||||||
|  | #include <future> | ||||||
| 
 | 
 | ||||||
| #ifndef HAVE_STRUCT_DIRENT_D_TYPE | #ifndef HAVE_STRUCT_DIRENT_D_TYPE | ||||||
| #define DT_UNKNOWN 0 | #define DT_UNKNOWN 0 | ||||||
|  | @ -424,44 +425,30 @@ string get(const T & map, const string & key, const string & def = "") | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| /* Call ‘failure’ with the current exception as argument. If ‘failure’
 | /* A callback is a wrapper around a lambda that accepts a valid of
 | ||||||
|    throws an exception, abort the program. */ |    type T or an exception. (We abuse std::future<T> to pass the value or | ||||||
| void callFailure(const std::function<void(std::exception_ptr exc)> & failure, |    exception.) */ | ||||||
|     std::exception_ptr exc = std::current_exception()); | template<typename T> | ||||||
| 
 | struct Callback | ||||||
| 
 |  | ||||||
| /* Evaluate the function ‘f’. If it returns a value, call ‘success’
 |  | ||||||
|    with that value as its argument. If it or ‘success’ throws an |  | ||||||
|    exception, call ‘failure’. If ‘failure’ throws an exception, abort |  | ||||||
|    the program. */ |  | ||||||
| template<class T> |  | ||||||
| void sync2async( |  | ||||||
|     const std::function<void(T)> & success, |  | ||||||
|     const std::function<void(std::exception_ptr exc)> & failure, |  | ||||||
|     const std::function<T()> & f) |  | ||||||
| { | { | ||||||
|     try { |     std::function<void(std::future<T>)> fun; | ||||||
|         success(f()); |  | ||||||
|     } catch (...) { |  | ||||||
|         callFailure(failure); |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| 
 | 
 | ||||||
|  |     Callback(std::function<void(std::future<T>)> fun) : fun(fun) { } | ||||||
| 
 | 
 | ||||||
| /* Call the function ‘success’. If it throws an exception, call
 |     void operator()(T && t) const | ||||||
|    ‘failure’. If that throws an exception, abort the program. */ |     { | ||||||
| template<class T> |         std::promise<T> promise; | ||||||
| void callSuccess( |         promise.set_value(std::move(t)); | ||||||
|     const std::function<void(T)> & success, |         fun(promise.get_future()); | ||||||
|     const std::function<void(std::exception_ptr exc)> & failure, |  | ||||||
|     T && arg) |  | ||||||
| { |  | ||||||
|     try { |  | ||||||
|         success(arg); |  | ||||||
|     } catch (...) { |  | ||||||
|         callFailure(failure); |  | ||||||
|     } |     } | ||||||
| } | 
 | ||||||
|  |     void rethrow(const std::exception_ptr & exc = std::current_exception()) const | ||||||
|  |     { | ||||||
|  |         std::promise<T> promise; | ||||||
|  |         promise.set_exception(exc); | ||||||
|  |         fun(promise.get_future()); | ||||||
|  |     } | ||||||
|  | }; | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| /* Start a thread that handles various signals. Also block those signals
 | /* Start a thread that handles various signals. Also block those signals
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue