Reduce substitution memory consumption
copyStorePath() now pipes the output of srcStore->narFromPath() directly into dstStore->addToStore(). The sink used by the former is converted into a source usable by the latter using boost::coroutine2. This is based on [1]. This reduces the maximum resident size of $ nix build --store ~/my-nix/ /nix/store/b0zlxla7dmy1iwc3g459rjznx59797xy-binutils-2.28.1 --substituters file:///tmp/binary-cache-xz/ --no-require-sigs from 418592 KiB to 53416 KiB. (The previous commit also reduced the runtime from ~4.2s to ~3.4s, not sure why.) A further improvement will be to download files into a Sink. [1] https://github.com/NixOS/nix/compare/master...Mathnerd314:dump-fix-coroutine#diff-dcbcac55a634031f9cc73707da6e4b18 Issue #1969.
This commit is contained in:
		
							parent
							
								
									3e6b194d78
								
							
						
					
					
						commit
						48662d151b
					
				
					 8 changed files with 145 additions and 37 deletions
				
			
		|  | @ -964,20 +964,11 @@ void LocalStore::invalidatePath(State & state, const Path & path) | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| void LocalStore::addToStore(const ValidPathInfo & info, const ref<std::string> & nar, | ||||
| void LocalStore::addToStore(const ValidPathInfo & info, Source & source, | ||||
|     RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor) | ||||
| { | ||||
|     assert(info.narHash); | ||||
| 
 | ||||
|     Hash h = hashString(htSHA256, *nar); | ||||
|     if (h != info.narHash) | ||||
|         throw Error("hash mismatch importing path '%s'; expected hash '%s', got '%s'", | ||||
|             info.path, info.narHash.to_string(), h.to_string()); | ||||
| 
 | ||||
|     if (nar->size() != info.narSize) | ||||
|         throw Error("size mismatch importing path '%s'; expected %s, got %s", | ||||
|             info.path, info.narSize, nar->size()); | ||||
| 
 | ||||
|     if (requireSigs && checkSigs && !info.checkSignatures(*this, publicKeys)) | ||||
|         throw Error("cannot add path '%s' because it lacks a valid signature", info.path); | ||||
| 
 | ||||
|  | @ -999,8 +990,27 @@ void LocalStore::addToStore(const ValidPathInfo & info, const ref<std::string> & | |||
| 
 | ||||
|             deletePath(realPath); | ||||
| 
 | ||||
|             StringSource source(*nar); | ||||
|             restorePath(realPath, source); | ||||
|             /* While restoring the path from the NAR, compute the hash
 | ||||
|                of the NAR. */ | ||||
|             HashSink hashSink(htSHA256); | ||||
| 
 | ||||
|             LambdaSource wrapperSource([&](unsigned char * data, size_t len) -> size_t { | ||||
|                 size_t n = source.read(data, len); | ||||
|                 hashSink(data, n); | ||||
|                 return n; | ||||
|             }); | ||||
| 
 | ||||
|             restorePath(realPath, wrapperSource); | ||||
| 
 | ||||
|             auto hashResult = hashSink.finish(); | ||||
| 
 | ||||
|             if (hashResult.first != info.narHash) | ||||
|                 throw Error("hash mismatch importing path '%s'; expected hash '%s', got '%s'", | ||||
|                     info.path, info.narHash.to_string(), hashResult.first.to_string()); | ||||
| 
 | ||||
|             if (hashResult.second != info.narSize) | ||||
|                 throw Error("size mismatch importing path '%s'; expected %s, got %s", | ||||
|                     info.path, info.narSize, hashResult.second); | ||||
| 
 | ||||
|             autoGC(); | ||||
| 
 | ||||
|  |  | |||
|  | @ -143,7 +143,7 @@ public: | |||
|     void querySubstitutablePathInfos(const PathSet & paths, | ||||
|         SubstitutablePathInfos & infos) override; | ||||
| 
 | ||||
|     void addToStore(const ValidPathInfo & info, const ref<std::string> & nar, | ||||
|     void addToStore(const ValidPathInfo & info, Source & source, | ||||
|         RepairFlag repair, CheckSigsFlag checkSigs, | ||||
|         std::shared_ptr<FSAccessor> accessor) override; | ||||
| 
 | ||||
|  |  | |||
|  | @ -590,32 +590,15 @@ void copyStorePath(ref<Store> srcStore, ref<Store> dstStore, | |||
| 
 | ||||
|     uint64_t total = 0; | ||||
| 
 | ||||
|     auto progress = [&](size_t len) { | ||||
|         total += len; | ||||
|         act.progress(total, info->narSize); | ||||
|     }; | ||||
| 
 | ||||
|     struct MyStringSink : StringSink | ||||
|     { | ||||
|         typedef std::function<void(size_t)> Callback; | ||||
|         Callback callback; | ||||
|         MyStringSink(Callback callback) : callback(callback) { } | ||||
|         void operator () (const unsigned char * data, size_t len) override | ||||
|         { | ||||
|             StringSink::operator ()(data, len); | ||||
|             callback(len); | ||||
|         }; | ||||
|     }; | ||||
| 
 | ||||
|     MyStringSink sink(progress); | ||||
|     srcStore->narFromPath({storePath}, sink); | ||||
| 
 | ||||
|     // FIXME
 | ||||
| #if 0 | ||||
|     if (!info->narHash) { | ||||
|         auto info2 = make_ref<ValidPathInfo>(*info); | ||||
|         info2->narHash = hashString(htSHA256, *sink.s); | ||||
|         if (!info->narSize) info2->narSize = sink.s->size(); | ||||
|         info = info2; | ||||
|     } | ||||
| #endif | ||||
| 
 | ||||
|     if (info->ultimate) { | ||||
|         auto info2 = make_ref<ValidPathInfo>(*info); | ||||
|  | @ -623,7 +606,16 @@ void copyStorePath(ref<Store> srcStore, ref<Store> dstStore, | |||
|         info = info2; | ||||
|     } | ||||
| 
 | ||||
|     dstStore->addToStore(*info, sink.s, repair, checkSigs); | ||||
|     auto source = sinkToSource([&](Sink & sink) { | ||||
|         LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { | ||||
|             sink(data, len); | ||||
|             total += len; | ||||
|             act.progress(total, info->narSize); | ||||
|         }); | ||||
|         srcStore->narFromPath({storePath}, wrapperSink); | ||||
|     }); | ||||
| 
 | ||||
|     dstStore->addToStore(*info, *source, repair, checkSigs); | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
|  | @ -808,6 +800,21 @@ std::string makeFixedOutputCA(bool recursive, const Hash & hash) | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| void Store::addToStore(const ValidPathInfo & info, Source & narSource, | ||||
|     RepairFlag repair, CheckSigsFlag checkSigs, | ||||
|     std::shared_ptr<FSAccessor> accessor) | ||||
| { | ||||
|     addToStore(info, make_ref<std::string>(narSource.drain()), repair, checkSigs, accessor); | ||||
| } | ||||
| 
 | ||||
| void Store::addToStore(const ValidPathInfo & info, const ref<std::string> & nar, | ||||
|     RepairFlag repair, CheckSigsFlag checkSigs, | ||||
|     std::shared_ptr<FSAccessor> accessor) | ||||
| { | ||||
|     StringSource source(*nar); | ||||
|     addToStore(info, source, repair, checkSigs, accessor); | ||||
| } | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -399,9 +399,14 @@ public: | |||
|     virtual bool wantMassQuery() { return false; } | ||||
| 
 | ||||
|     /* Import a path into the store. */ | ||||
|     virtual void addToStore(const ValidPathInfo & info, Source & narSource, | ||||
|         RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs, | ||||
|         std::shared_ptr<FSAccessor> accessor = 0); | ||||
| 
 | ||||
|     // FIXME: remove
 | ||||
|     virtual void addToStore(const ValidPathInfo & info, const ref<std::string> & nar, | ||||
|         RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs, | ||||
|         std::shared_ptr<FSAccessor> accessor = 0) = 0; | ||||
|         std::shared_ptr<FSAccessor> accessor = 0); | ||||
| 
 | ||||
|     /* Copy the contents of a path to the store and register the
 | ||||
|        validity the resulting path.  The resulting path is returned. | ||||
|  |  | |||
|  | @ -6,7 +6,7 @@ libutil_DIR := $(d) | |||
| 
 | ||||
| libutil_SOURCES := $(wildcard $(d)/*.cc) | ||||
| 
 | ||||
| libutil_LDFLAGS = $(LIBLZMA_LIBS) -lbz2 -pthread $(OPENSSL_LIBS) $(LIBBROTLI_LIBS) | ||||
| libutil_LDFLAGS = $(LIBLZMA_LIBS) -lbz2 -pthread $(OPENSSL_LIBS) $(LIBBROTLI_LIBS) -lboost_context | ||||
| 
 | ||||
| libutil_LIBS = libformat | ||||
| 
 | ||||
|  |  | |||
|  | @ -5,6 +5,8 @@ | |||
| #include <cerrno> | ||||
| #include <memory> | ||||
| 
 | ||||
| #include <boost/coroutine2/coroutine.hpp> | ||||
| 
 | ||||
| 
 | ||||
| namespace nix { | ||||
| 
 | ||||
|  | @ -88,6 +90,23 @@ void Source::operator () (unsigned char * data, size_t len) | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| std::string Source::drain() | ||||
| { | ||||
|     std::string s; | ||||
|     std::vector<unsigned char> buf(8192); | ||||
|     while (true) { | ||||
|         size_t n; | ||||
|         try { | ||||
|             n = read(buf.data(), buf.size()); | ||||
|             s.append((char *) buf.data(), n); | ||||
|         } catch (EndOfFile &) { | ||||
|             break; | ||||
|         } | ||||
|     } | ||||
|     return s; | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| size_t BufferedSource::read(unsigned char * data, size_t len) | ||||
| { | ||||
|     if (!buffer) buffer = decltype(buffer)(new unsigned char[bufSize]); | ||||
|  | @ -138,6 +157,50 @@ size_t StringSource::read(unsigned char * data, size_t len) | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun) | ||||
| { | ||||
|     struct SinkToSource : Source | ||||
|     { | ||||
|         typedef boost::coroutines2::coroutine<std::string> coro_t; | ||||
| 
 | ||||
|         coro_t::pull_type coro; | ||||
| 
 | ||||
|         SinkToSource(std::function<void(Sink &)> fun) | ||||
|             : coro([&](coro_t::push_type & yield) { | ||||
|                 LambdaSink sink([&](const unsigned char * data, size_t len) { | ||||
|                     if (len) yield(std::string((const char *) data, len)); | ||||
|                 }); | ||||
|                 fun(sink); | ||||
|             }) | ||||
|         { | ||||
|         } | ||||
| 
 | ||||
|         std::string cur; | ||||
|         size_t pos = 0; | ||||
| 
 | ||||
|         size_t read(unsigned char * data, size_t len) override | ||||
|         { | ||||
|             if (!coro) | ||||
|                 throw EndOfFile("coroutine has finished"); | ||||
| 
 | ||||
|             if (pos == cur.size()) { | ||||
|                 if (!cur.empty()) coro(); | ||||
|                 cur = std::move(coro.get()); | ||||
|                 pos = 0; | ||||
|             } | ||||
| 
 | ||||
|             auto n = std::min(cur.size() - pos, len); | ||||
|             memcpy(data, (unsigned char *) cur.data() + pos, n); | ||||
|             pos += n; | ||||
| 
 | ||||
|             return n; | ||||
|         } | ||||
|     }; | ||||
| 
 | ||||
|     return std::make_unique<SinkToSource>(fun); | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| void writePadding(size_t len, Sink & sink) | ||||
| { | ||||
|     if (len % 8) { | ||||
|  |  | |||
|  | @ -61,6 +61,8 @@ struct Source | |||
|     virtual size_t read(unsigned char * data, size_t len) = 0; | ||||
| 
 | ||||
|     virtual bool good() { return true; } | ||||
| 
 | ||||
|     std::string drain(); | ||||
| }; | ||||
| 
 | ||||
| 
 | ||||
|  | @ -191,6 +193,27 @@ struct LambdaSink : Sink | |||
| }; | ||||
| 
 | ||||
| 
 | ||||
| /* Convert a function into a source. */ | ||||
| struct LambdaSource : Source | ||||
| { | ||||
|     typedef std::function<size_t(unsigned char *, size_t)> lambda_t; | ||||
| 
 | ||||
|     lambda_t lambda; | ||||
| 
 | ||||
|     LambdaSource(const lambda_t & lambda) : lambda(lambda) { } | ||||
| 
 | ||||
|     size_t read(unsigned char * data, size_t len) override | ||||
|     { | ||||
|         return lambda(data, len); | ||||
|     } | ||||
| }; | ||||
| 
 | ||||
| 
 | ||||
| /* Convert a function that feeds data into a Sink into a Source. The
 | ||||
|    Source executes the function as a coroutine. */ | ||||
| std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun); | ||||
| 
 | ||||
| 
 | ||||
| void writePadding(size_t len, Sink & sink); | ||||
| void writeString(const unsigned char * buf, size_t len, Sink & sink); | ||||
| 
 | ||||
|  |  | |||
|  | @ -695,7 +695,7 @@ static void performOp(TunnelLogger * logger, ref<LocalStore> store, | |||
|         parseDump(tee, tee.source); | ||||
| 
 | ||||
|         logger->startWork(); | ||||
|         store->addToStore(info, tee.source.data, (RepairFlag) repair, | ||||
|         store.cast<Store>()->addToStore(info, tee.source.data, (RepairFlag) repair, | ||||
|             dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr); | ||||
|         logger->stopWork(); | ||||
|         break; | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue