copyPaths(): Use queryValidPaths() to reduce SSH latency
This commit is contained in:
		
							parent
							
								
									91d67692cf
								
							
						
					
					
						commit
						c5b83d8913
					
				
					 10 changed files with 45 additions and 37 deletions
				
			
		|  | @ -252,10 +252,10 @@ connected: | |||
|         string line; | ||||
|         if (!getline(cin, line)) | ||||
|             throw Error("hook caller didn't send inputs"); | ||||
|         auto inputs = tokenizeString<std::list<string>>(line); | ||||
|         auto inputs = tokenizeString<PathSet>(line); | ||||
|         if (!getline(cin, line)) | ||||
|             throw Error("hook caller didn't send outputs"); | ||||
|         auto outputs = tokenizeString<Strings>(line); | ||||
|         auto outputs = tokenizeString<PathSet>(line); | ||||
|         AutoCloseFD uploadLock = openLockFile(currentLoad + "/" + hostName + ".upload-lock", true); | ||||
|         auto old = signal(SIGALRM, handleAlarm); | ||||
|         alarm(15 * 60); | ||||
|  | @ -269,11 +269,15 @@ connected: | |||
|         printError("building ‘%s’ on ‘%s’", drvPath, hostName); | ||||
|         sshStore->buildDerivation(drvPath, readDerivation(drvPath)); | ||||
| 
 | ||||
|         std::remove_if(outputs.begin(), outputs.end(), [=](const Path & path) { return store->isValidPath(path); }); | ||||
|         if (!outputs.empty()) { | ||||
|             setenv("NIX_HELD_LOCKS", concatStringsSep(" ", outputs).c_str(), 1); /* FIXME: ugly */ | ||||
|             copyPaths(ref<Store>(sshStore), store, outputs); | ||||
|         PathSet missing; | ||||
|         for (auto & path : outputs) | ||||
|             if (!store->isValidPath(path)) missing.insert(path); | ||||
| 
 | ||||
|         if (!missing.empty()) { | ||||
|             setenv("NIX_HELD_LOCKS", concatStringsSep(" ", missing).c_str(), 1); /* FIXME: ugly */ | ||||
|             copyPaths(ref<Store>(sshStore), store, missing); | ||||
|         } | ||||
| 
 | ||||
|         return; | ||||
|     }); | ||||
| } | ||||
|  |  | |||
|  | @ -226,6 +226,19 @@ struct LegacySSHStore : public Store | |||
|         out.insert(res.begin(), res.end()); | ||||
|     } | ||||
| 
 | ||||
|     PathSet queryValidPaths(const PathSet & paths, bool maybeSubstitute = false) override | ||||
|     { | ||||
|         auto conn(connections->get()); | ||||
| 
 | ||||
|         conn->to | ||||
|             << cmdQueryValidPaths | ||||
|             << false // lock
 | ||||
|             << maybeSubstitute | ||||
|             << paths; | ||||
|         conn->to.flush(); | ||||
| 
 | ||||
|         return readStorePaths<PathSet>(*this, conn->from); | ||||
|     } | ||||
| }; | ||||
| 
 | ||||
| static RegisterStoreImplementation regStore([]( | ||||
|  |  | |||
|  | @ -669,7 +669,7 @@ bool LocalStore::isValidPathUncached(const Path & path) | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| PathSet LocalStore::queryValidPaths(const PathSet & paths) | ||||
| PathSet LocalStore::queryValidPaths(const PathSet & paths, bool maybeSubstitute) | ||||
| { | ||||
|     PathSet res; | ||||
|     for (auto & i : paths) | ||||
|  |  | |||
|  | @ -99,7 +99,7 @@ public: | |||
| 
 | ||||
|     bool isValidPathUncached(const Path & path) override; | ||||
| 
 | ||||
|     PathSet queryValidPaths(const PathSet & paths) override; | ||||
|     PathSet queryValidPaths(const PathSet & paths, bool maybeSubstitute = false) override; | ||||
| 
 | ||||
|     PathSet queryAllValidPaths() override; | ||||
| 
 | ||||
|  |  | |||
|  | @ -187,7 +187,7 @@ bool RemoteStore::isValidPathUncached(const Path & path) | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| PathSet RemoteStore::queryValidPaths(const PathSet & paths) | ||||
| PathSet RemoteStore::queryValidPaths(const PathSet & paths, bool maybeSubstitute) | ||||
| { | ||||
|     auto conn(connections->get()); | ||||
|     if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { | ||||
|  |  | |||
|  | @ -28,7 +28,7 @@ public: | |||
| 
 | ||||
|     bool isValidPathUncached(const Path & path) override; | ||||
| 
 | ||||
|     PathSet queryValidPaths(const PathSet & paths) override; | ||||
|     PathSet queryValidPaths(const PathSet & paths, bool maybeSubstitute = false) override; | ||||
| 
 | ||||
|     PathSet queryAllValidPaths() override; | ||||
| 
 | ||||
|  |  | |||
|  | @ -377,7 +377,7 @@ void Store::queryPathInfo(const Path & storePath, | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| PathSet Store::queryValidPaths(const PathSet & paths) | ||||
| PathSet Store::queryValidPaths(const PathSet & paths, bool maybeSubstitute) | ||||
| { | ||||
|     struct State | ||||
|     { | ||||
|  | @ -550,6 +550,8 @@ void copyClosure(ref<Store> srcStore, ref<Store> dstStore, | |||
|     for (auto & path : storePaths) | ||||
|         srcStore->computeFSClosure(path, closure); | ||||
| 
 | ||||
|     // FIXME: use copyStorePaths()
 | ||||
| 
 | ||||
|     PathSet valid = dstStore->queryValidPaths(closure); | ||||
| 
 | ||||
|     if (valid.size() == closure.size()) return; | ||||
|  | @ -791,35 +793,22 @@ std::list<ref<Store>> getDefaultSubstituters() | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| void copyPaths(ref<Store> from, ref<Store> to, const Paths & storePaths, bool substitute) | ||||
| void copyPaths(ref<Store> from, ref<Store> to, const PathSet & storePaths, bool substitute) | ||||
| { | ||||
|     if (substitute) { | ||||
|         /* Filter out .drv files (we don't want to build anything). */ | ||||
|         PathSet paths2; | ||||
|         for (auto & path : storePaths) | ||||
|             if (!isDerivation(path)) paths2.insert(path); | ||||
|         unsigned long long downloadSize, narSize; | ||||
|         PathSet willBuild, willSubstitute, unknown; | ||||
|         to->queryMissing(PathSet(paths2.begin(), paths2.end()), | ||||
|             willBuild, willSubstitute, unknown, downloadSize, narSize); | ||||
|         /* FIXME: should use ensurePath(), but it only
 | ||||
|            does one path at a time. */ | ||||
|         if (!willSubstitute.empty()) | ||||
|             try { | ||||
|                 to->buildPaths(willSubstitute); | ||||
|             } catch (Error & e) { | ||||
|                 printMsg(lvlError, format("warning: %1%") % e.msg()); | ||||
|             } | ||||
|     } | ||||
|     PathSet valid = to->queryValidPaths(storePaths, substitute); | ||||
| 
 | ||||
|     PathSet missing; | ||||
|     for (auto & path : storePaths) | ||||
|         if (!valid.count(path)) missing.insert(path); | ||||
| 
 | ||||
|     std::string copiedLabel = "copied"; | ||||
| 
 | ||||
|     logger->setExpected(copiedLabel, storePaths.size()); | ||||
|     logger->setExpected(copiedLabel, missing.size()); | ||||
| 
 | ||||
|     ThreadPool pool; | ||||
| 
 | ||||
|     processGraph<Path>(pool, | ||||
|         PathSet(storePaths.begin(), storePaths.end()), | ||||
|         PathSet(missing.begin(), missing.end()), | ||||
| 
 | ||||
|         [&](const Path & storePath) { | ||||
|             if (to->isValidPath(storePath)) return PathSet(); | ||||
|  |  | |||
|  | @ -324,8 +324,10 @@ protected: | |||
| 
 | ||||
| public: | ||||
| 
 | ||||
|     /* Query which of the given paths is valid. */ | ||||
|     virtual PathSet queryValidPaths(const PathSet & paths); | ||||
|     /* Query which of the given paths is valid. Optionally, try to
 | ||||
|        substitute missing paths. */ | ||||
|     virtual PathSet queryValidPaths(const PathSet & paths, | ||||
|         bool maybeSubstitute = false); | ||||
| 
 | ||||
|     /* Query the set of all valid paths. Note that for some store
 | ||||
|        backends, the name part of store paths may be omitted | ||||
|  | @ -653,7 +655,7 @@ ref<Store> openStore(const std::string & uri = getEnv("NIX_REMOTE")); | |||
| ref<Store> openStore(const std::string & uri, const Store::Params & params); | ||||
| 
 | ||||
| 
 | ||||
| void copyPaths(ref<Store> from, ref<Store> to, const Paths & storePaths, bool substitute = false); | ||||
| void copyPaths(ref<Store> from, ref<Store> to, const PathSet & storePaths, bool substitute = false); | ||||
| 
 | ||||
| enum StoreType { | ||||
|     tDaemon, | ||||
|  |  | |||
|  | @ -58,6 +58,6 @@ int main(int argc, char ** argv) | |||
|         PathSet closure; | ||||
|         from->computeFSClosure(storePaths2, closure, false, includeOutputs); | ||||
| 
 | ||||
|         copyPaths(from, to, Paths(closure.begin(), closure.end()), useSubstitutes); | ||||
|         copyPaths(from, to, closure, useSubstitutes); | ||||
|     }); | ||||
| } | ||||
|  |  | |||
|  | @ -46,7 +46,7 @@ struct CmdCopy : StorePathsCommand | |||
|         ref<Store> srcStore = srcUri.empty() ? store : openStore(srcUri); | ||||
|         ref<Store> dstStore = dstUri.empty() ? store : openStore(dstUri); | ||||
| 
 | ||||
|         copyPaths(srcStore, dstStore, storePaths); | ||||
|         copyPaths(srcStore, dstStore, PathSet(storePaths.begin(), storePaths.end())); | ||||
|     } | ||||
| }; | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue