nix copy: Parallelise
This commit is contained in:
		
							parent
							
								
									b2ce6fde5a
								
							
						
					
					
						commit
						91539d305f
					
				
					 4 changed files with 61 additions and 22 deletions
				
			
		| 
						 | 
				
			
			@ -3,6 +3,7 @@
 | 
			
		|||
#include "shared.hh"
 | 
			
		||||
#include "store-api.hh"
 | 
			
		||||
#include "sync.hh"
 | 
			
		||||
#include "thread-pool.hh"
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -57,17 +58,26 @@ struct CmdCopy : StorePathsCommand
 | 
			
		|||
 | 
			
		||||
        progressBar.updateStatus(showProgress());
 | 
			
		||||
 | 
			
		||||
        storePaths.reverse(); // FIXME: assumes reverse topo sort
 | 
			
		||||
        struct Graph
 | 
			
		||||
        {
 | 
			
		||||
            std::set<Path> left;
 | 
			
		||||
            std::map<Path, std::set<Path>> refs, rrefs;
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        for (auto & storePath : storePaths) {
 | 
			
		||||
            checkInterrupt();
 | 
			
		||||
 | 
			
		||||
            if (dstStore->isValidPath(storePath)) {
 | 
			
		||||
                total--;
 | 
			
		||||
                progressBar.updateStatus(showProgress());
 | 
			
		||||
                continue;
 | 
			
		||||
        Sync<Graph> graph_;
 | 
			
		||||
        {
 | 
			
		||||
            auto graph(graph_.lock());
 | 
			
		||||
            graph->left = PathSet(storePaths.begin(), storePaths.end());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        ThreadPool pool;
 | 
			
		||||
 | 
			
		||||
        std::function<void(const Path &)> doPath;
 | 
			
		||||
 | 
			
		||||
        doPath = [&](const Path & storePath) {
 | 
			
		||||
            checkInterrupt();
 | 
			
		||||
 | 
			
		||||
            if (!dstStore->isValidPath(storePath)) {
 | 
			
		||||
                auto activity(progressBar.startActivity(format("copying ‘%s’...") % storePath));
 | 
			
		||||
 | 
			
		||||
                StringSink sink;
 | 
			
		||||
| 
						 | 
				
			
			@ -77,8 +87,43 @@ struct CmdCopy : StorePathsCommand
 | 
			
		|||
                dstStore->importPaths(false, source, 0);
 | 
			
		||||
 | 
			
		||||
                done++;
 | 
			
		||||
            } else
 | 
			
		||||
                total--;
 | 
			
		||||
 | 
			
		||||
            progressBar.updateStatus(showProgress());
 | 
			
		||||
 | 
			
		||||
            /* Enqueue all paths that were waiting for this one. */
 | 
			
		||||
            {
 | 
			
		||||
                auto graph(graph_.lock());
 | 
			
		||||
                graph->left.erase(storePath);
 | 
			
		||||
                for (auto & rref : graph->rrefs[storePath]) {
 | 
			
		||||
                    auto & refs(graph->refs[rref]);
 | 
			
		||||
                    auto i = refs.find(storePath);
 | 
			
		||||
                    assert(i != refs.end());
 | 
			
		||||
                    refs.erase(i);
 | 
			
		||||
                    if (refs.empty())
 | 
			
		||||
                        pool.enqueue(std::bind(doPath, rref));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        /* Build the dependency graph; enqueue all paths with no
 | 
			
		||||
           dependencies. */
 | 
			
		||||
        for (auto & storePath : storePaths) {
 | 
			
		||||
            auto info = srcStore->queryPathInfo(storePath);
 | 
			
		||||
            {
 | 
			
		||||
                auto graph(graph_.lock());
 | 
			
		||||
                for (auto & ref : info->references)
 | 
			
		||||
                    if (ref != storePath && graph->left.count(ref)) {
 | 
			
		||||
                        graph->refs[storePath].insert(ref);
 | 
			
		||||
                        graph->rrefs[ref].insert(storePath);
 | 
			
		||||
                    }
 | 
			
		||||
                if (graph->refs[storePath].empty())
 | 
			
		||||
                    pool.enqueue(std::bind(doPath, storePath));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        pool.process();
 | 
			
		||||
 | 
			
		||||
        progressBar.done();
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -29,12 +29,12 @@ void ProgressBar::updateStatus(const std::string & s)
 | 
			
		|||
 | 
			
		||||
void ProgressBar::done()
 | 
			
		||||
{
 | 
			
		||||
    _writeToStderr = decltype(_writeToStderr)();
 | 
			
		||||
    auto state_(state.lock());
 | 
			
		||||
    assert(state_->activities.empty());
 | 
			
		||||
    state_->done = true;
 | 
			
		||||
    std::cerr << "\r\e[K";
 | 
			
		||||
    std::cerr.flush();
 | 
			
		||||
    _writeToStderr = decltype(_writeToStderr)();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void ProgressBar::render(State & state_)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,3 @@
 | 
			
		|||
#include "affinity.hh" // FIXME
 | 
			
		||||
#include "command.hh"
 | 
			
		||||
#include "progress-bar.hh"
 | 
			
		||||
#include "shared.hh"
 | 
			
		||||
| 
						 | 
				
			
			@ -31,8 +30,6 @@ struct CmdCopySigs : StorePathsCommand
 | 
			
		|||
 | 
			
		||||
    void run(ref<Store> store, Paths storePaths) override
 | 
			
		||||
    {
 | 
			
		||||
        restoreAffinity(); // FIXME
 | 
			
		||||
 | 
			
		||||
        if (substituterUris.empty())
 | 
			
		||||
            throw UsageError("you must specify at least one substituter using ‘-s’");
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,3 @@
 | 
			
		|||
#include "affinity.hh" // FIXME
 | 
			
		||||
#include "command.hh"
 | 
			
		||||
#include "progress-bar.hh"
 | 
			
		||||
#include "shared.hh"
 | 
			
		||||
| 
						 | 
				
			
			@ -52,8 +51,6 @@ struct CmdVerify : StorePathsCommand
 | 
			
		|||
 | 
			
		||||
    void run(ref<Store> store, Paths storePaths) override
 | 
			
		||||
    {
 | 
			
		||||
        restoreAffinity(); // FIXME
 | 
			
		||||
 | 
			
		||||
        std::vector<ref<Store>> substituters;
 | 
			
		||||
        for (auto & s : substituterUris)
 | 
			
		||||
            substituters.push_back(openStoreAt(s));
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue