Similarly to how we did for buildPaths, add a std::ostream& log_sink parameter to the build_derivation method on Store, and pass it std::cerr when called at the top level by nix commands - most notably, the build-remote hook binary, so that we get build logs when using tvix as a remote builder. Change-Id: I0f8f729ba8429d4838a0a135a5c2ac1e1a95d575 Reviewed-on: https://cl.tvl.fyi/c/depot/+/2176 Tested-by: BuildkiteCI Reviewed-by: andi <andi@notmuch.email> Reviewed-by: kanepyork <rikingcoding@gmail.com>
549 lines
18 KiB
C++
549 lines
18 KiB
C++
#include "rpc-store.hh"
|
|
|
|
#include <algorithm>
|
|
#include <filesystem>
|
|
#include <memory>
|
|
#include <optional>
|
|
#include <ostream>
|
|
#include <string_view>
|
|
|
|
#include <absl/status/status.h>
|
|
#include <absl/strings/str_cat.h>
|
|
#include <absl/strings/str_format.h>
|
|
#include <absl/strings/string_view.h>
|
|
#include <glog/logging.h>
|
|
#include <google/protobuf/empty.pb.h>
|
|
#include <google/protobuf/util/time_util.h>
|
|
#include <grpcpp/create_channel.h>
|
|
#include <grpcpp/impl/codegen/async_unary_call.h>
|
|
#include <grpcpp/impl/codegen/client_context.h>
|
|
#include <grpcpp/impl/codegen/completion_queue.h>
|
|
#include <grpcpp/impl/codegen/status.h>
|
|
#include <grpcpp/impl/codegen/status_code_enum.h>
|
|
#include <grpcpp/impl/codegen/sync_stream.h>
|
|
#include <grpcpp/security/credentials.h>
|
|
#include <sys/ucontext.h>
|
|
|
|
#include "libproto/worker.grpc.pb.h"
|
|
#include "libproto/worker.pb.h"
|
|
#include "libstore/derivations.hh"
|
|
#include "libstore/store-api.hh"
|
|
#include "libstore/worker-protocol.hh"
|
|
#include "libutil/archive.hh"
|
|
#include "libutil/hash.hh"
|
|
#include "libutil/proto.hh"
|
|
#include "libutil/types.hh"
|
|
|
|
namespace nix {
|
|
|
|
namespace store {
|
|
|
|
// Should be set to the bandwidth delay product between the client and the
|
|
// daemon. The current value, which should eventually be determined dynamically,
|
|
// has currently been set to a developer's deskop computer, rounded up
|
|
constexpr size_t kChunkSize = 1024 * 64;
|
|
|
|
using google::protobuf::util::TimeUtil;
|
|
using grpc::ClientContext;
|
|
using nix::proto::WorkerService;
|
|
|
|
static google::protobuf::Empty kEmpty;
|
|
|
|
template <typename Request>
|
|
class RPCSink : public BufferedSink {
|
|
public:
|
|
using Writer = grpc::ClientWriter<Request>;
|
|
explicit RPCSink(std::unique_ptr<Writer>&& writer)
|
|
: writer_(std::move(writer)), good_(true) {}
|
|
|
|
bool good() override { return good_; }
|
|
|
|
void write(const unsigned char* data, size_t len) override {
|
|
Request req;
|
|
req.set_data(data, len);
|
|
if (!writer_->Write(req)) {
|
|
good_ = false;
|
|
}
|
|
}
|
|
|
|
~RPCSink() override { flush(); }
|
|
|
|
grpc::Status Finish() {
|
|
flush();
|
|
return writer_->Finish();
|
|
}
|
|
|
|
private:
|
|
std::unique_ptr<Writer> writer_;
|
|
bool good_;
|
|
};
|
|
|
|
// TODO(grfn): Obviously this should go away and be replaced by StatusOr... but
|
|
// that would require refactoring the entire store api, which we don't feel like
|
|
// doing right now. We should at some point though
|
|
void const RpcStore::SuccessOrThrow(const grpc::Status& status,
|
|
const absl::string_view& call) const {
|
|
if (!status.ok()) {
|
|
auto uri = uri_.value_or("unknown URI");
|
|
switch (status.error_code()) {
|
|
case grpc::StatusCode::UNIMPLEMENTED:
|
|
throw Unsupported(
|
|
absl::StrFormat("operation %s is not supported by store at %s: %s",
|
|
call, uri, status.error_message()));
|
|
default:
|
|
throw Error(absl::StrFormat(
|
|
"Rpc call %s to %s failed (%s): %s ", call, uri,
|
|
util::proto::GRPCStatusCodeDescription(status.error_code()),
|
|
status.error_message()));
|
|
}
|
|
}
|
|
}
|
|
|
|
bool RpcStore::isValidPathUncached(const Path& path) {
|
|
ClientContext ctx;
|
|
proto::IsValidPathResponse resp;
|
|
SuccessOrThrow(stub_->IsValidPath(&ctx, util::proto::StorePath(path), &resp),
|
|
__FUNCTION__);
|
|
return resp.is_valid();
|
|
}
|
|
|
|
PathSet RpcStore::queryAllValidPaths() {
|
|
ClientContext ctx;
|
|
proto::StorePaths paths;
|
|
SuccessOrThrow(stub_->QueryAllValidPaths(&ctx, kEmpty, &paths), __FUNCTION__);
|
|
return util::proto::FillFrom<PathSet>(paths.paths());
|
|
}
|
|
|
|
PathSet RpcStore::queryValidPaths(const PathSet& paths,
|
|
SubstituteFlag maybeSubstitute) {
|
|
ClientContext ctx;
|
|
proto::StorePaths store_paths;
|
|
for (const auto& path : paths) {
|
|
store_paths.add_paths(path);
|
|
}
|
|
proto::StorePaths result_paths;
|
|
SuccessOrThrow(stub_->QueryValidPaths(&ctx, store_paths, &result_paths),
|
|
__FUNCTION__);
|
|
return util::proto::FillFrom<PathSet>(result_paths.paths());
|
|
}
|
|
|
|
void RpcStore::queryPathInfoUncached(
|
|
const Path& path,
|
|
Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept {
|
|
ClientContext ctx;
|
|
proto::StorePath store_path;
|
|
store_path.set_path(path);
|
|
|
|
try {
|
|
proto::PathInfo path_info;
|
|
auto result = stub_->QueryPathInfo(&ctx, store_path, &path_info);
|
|
if (result.error_code() == grpc::INVALID_ARGUMENT) {
|
|
throw InvalidPath(absl::StrFormat("path '%s' is not valid", path));
|
|
}
|
|
SuccessOrThrow(result);
|
|
|
|
std::shared_ptr<ValidPathInfo> info;
|
|
|
|
if (!path_info.is_valid()) {
|
|
throw InvalidPath(absl::StrFormat("path '%s' is not valid", path));
|
|
}
|
|
|
|
info = std::make_shared<ValidPathInfo>();
|
|
info->path = path;
|
|
info->deriver = path_info.deriver().path();
|
|
if (!info->deriver.empty()) {
|
|
assertStorePath(info->deriver);
|
|
}
|
|
auto hash_ = Hash::deserialize(path_info.nar_hash(), htSHA256);
|
|
info->narHash = Hash::unwrap_throw(hash_);
|
|
info->references.insert(path_info.references().begin(),
|
|
path_info.references().end());
|
|
info->registrationTime =
|
|
TimeUtil::TimestampToTimeT(path_info.registration_time());
|
|
info->narSize = path_info.nar_size();
|
|
info->ultimate = path_info.ultimate();
|
|
info->sigs.insert(path_info.sigs().begin(), path_info.sigs().end());
|
|
info->ca = path_info.ca();
|
|
|
|
callback(std::move(info));
|
|
} catch (...) {
|
|
callback.rethrow();
|
|
}
|
|
}
|
|
|
|
void RpcStore::queryReferrers(const Path& path, PathSet& referrers) {
|
|
ClientContext ctx;
|
|
proto::StorePaths paths;
|
|
SuccessOrThrow(
|
|
stub_->QueryReferrers(&ctx, util::proto::StorePath(path), &paths),
|
|
__FUNCTION__);
|
|
referrers.insert(paths.paths().begin(), paths.paths().end());
|
|
}
|
|
|
|
PathSet RpcStore::queryValidDerivers(const Path& path) {
|
|
ClientContext ctx;
|
|
proto::StorePaths paths;
|
|
SuccessOrThrow(
|
|
stub_->QueryValidDerivers(&ctx, util::proto::StorePath(path), &paths),
|
|
__FUNCTION__);
|
|
return util::proto::FillFrom<PathSet>(paths.paths());
|
|
}
|
|
|
|
PathSet RpcStore::queryDerivationOutputs(const Path& path) {
|
|
ClientContext ctx;
|
|
proto::StorePaths paths;
|
|
SuccessOrThrow(
|
|
stub_->QueryDerivationOutputs(&ctx, util::proto::StorePath(path), &paths),
|
|
__FUNCTION__);
|
|
return util::proto::FillFrom<PathSet>(paths.paths());
|
|
}
|
|
|
|
StringSet RpcStore::queryDerivationOutputNames(const Path& path) {
|
|
ClientContext ctx;
|
|
proto::DerivationOutputNames output_names;
|
|
SuccessOrThrow(stub_->QueryDerivationOutputNames(
|
|
&ctx, util::proto::StorePath(path), &output_names));
|
|
return util::proto::FillFrom<StringSet>(output_names.names());
|
|
}
|
|
|
|
Path RpcStore::queryPathFromHashPart(const std::string& hashPart) {
|
|
ClientContext ctx;
|
|
proto::StorePath path;
|
|
proto::HashPart proto_hash_part;
|
|
proto_hash_part.set_hash_part(hashPart);
|
|
SuccessOrThrow(stub_->QueryPathFromHashPart(&ctx, proto_hash_part, &path),
|
|
__FUNCTION__);
|
|
return path.path();
|
|
}
|
|
|
|
PathSet RpcStore::querySubstitutablePaths(const PathSet& paths) {
|
|
ClientContext ctx;
|
|
proto::StorePaths result;
|
|
SuccessOrThrow(stub_->QuerySubstitutablePaths(
|
|
&ctx, util::proto::StorePaths(paths), &result));
|
|
return util::proto::FillFrom<PathSet>(result.paths());
|
|
}
|
|
|
|
void RpcStore::querySubstitutablePathInfos(const PathSet& paths,
|
|
SubstitutablePathInfos& infos) {
|
|
ClientContext ctx;
|
|
proto::SubstitutablePathInfos result;
|
|
SuccessOrThrow(stub_->QuerySubstitutablePathInfos(
|
|
&ctx, util::proto::StorePaths(paths), &result));
|
|
|
|
for (const auto& path_info : result.path_infos()) {
|
|
auto path = path_info.path().path();
|
|
SubstitutablePathInfo& info(infos[path]);
|
|
info.deriver = path_info.deriver().path();
|
|
if (!info.deriver.empty()) {
|
|
assertStorePath(info.deriver);
|
|
}
|
|
info.references = util::proto::FillFrom<PathSet>(path_info.references());
|
|
info.downloadSize = path_info.download_size();
|
|
info.narSize = path_info.nar_size();
|
|
}
|
|
}
|
|
|
|
void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource,
|
|
RepairFlag repair, CheckSigsFlag checkSigs,
|
|
std::shared_ptr<FSAccessor> accessor) {
|
|
ClientContext ctx;
|
|
google::protobuf::Empty response;
|
|
auto writer = stub_->AddToStoreNar(&ctx, &response);
|
|
|
|
proto::AddToStoreNarRequest path_info_req;
|
|
path_info_req.mutable_path_info()->mutable_path()->set_path(info.path);
|
|
path_info_req.mutable_path_info()->mutable_deriver()->set_path(info.deriver);
|
|
path_info_req.mutable_path_info()->set_nar_hash(
|
|
info.narHash.to_string(Base16, false));
|
|
for (const auto& ref : info.references) {
|
|
path_info_req.mutable_path_info()->add_references(ref);
|
|
}
|
|
*path_info_req.mutable_path_info()->mutable_registration_time() =
|
|
TimeUtil::TimeTToTimestamp(info.registrationTime);
|
|
path_info_req.mutable_path_info()->set_nar_size(info.narSize);
|
|
path_info_req.mutable_path_info()->set_ultimate(info.ultimate);
|
|
for (const auto& sig : info.sigs) {
|
|
path_info_req.mutable_path_info()->add_sigs(sig);
|
|
}
|
|
path_info_req.mutable_path_info()->set_ca(info.ca);
|
|
path_info_req.mutable_path_info()->set_repair(repair);
|
|
path_info_req.mutable_path_info()->set_check_sigs(checkSigs);
|
|
|
|
if (!writer->Write(path_info_req)) {
|
|
throw Error("Could not write to nix daemon");
|
|
}
|
|
|
|
RPCSink sink(std::move(writer));
|
|
copyNAR(narSource, sink);
|
|
SuccessOrThrow(sink.Finish(), __FUNCTION__);
|
|
}
|
|
|
|
Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
|
|
bool recursive, HashType hashAlgo, PathFilter& filter,
|
|
RepairFlag repair) {
|
|
if (repair != 0u) {
|
|
throw Error(
|
|
"repairing is not supported when building through the Nix daemon");
|
|
}
|
|
|
|
ClientContext ctx;
|
|
proto::StorePath response;
|
|
auto writer = stub_->AddToStore(&ctx, &response);
|
|
|
|
proto::AddToStoreRequest metadata_req;
|
|
metadata_req.mutable_meta()->set_base_name(name);
|
|
// TODO(grfn): what is fixed?
|
|
metadata_req.mutable_meta()->set_fixed(!(hashAlgo == htSHA256 && recursive));
|
|
metadata_req.mutable_meta()->set_recursive(recursive);
|
|
metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo));
|
|
|
|
if (!writer->Write(metadata_req)) {
|
|
throw Error("Could not write to nix daemon");
|
|
}
|
|
|
|
RPCSink sink(std::move(writer));
|
|
dumpPath(std::filesystem::absolute(srcPath), sink);
|
|
sink.flush();
|
|
SuccessOrThrow(sink.Finish(), __FUNCTION__);
|
|
|
|
return response.path();
|
|
}
|
|
|
|
Path RpcStore::addTextToStore(const std::string& name,
|
|
const std::string& content,
|
|
const PathSet& references, RepairFlag repair) {
|
|
if (repair != 0u) {
|
|
throw Error(
|
|
"repairing is not supported when building through the Nix daemon");
|
|
}
|
|
ClientContext ctx;
|
|
proto::StorePath result;
|
|
auto writer = stub_->AddTextToStore(&ctx, &result);
|
|
|
|
proto::AddTextToStoreRequest meta;
|
|
meta.mutable_meta()->set_name(name);
|
|
meta.mutable_meta()->set_size(content.size());
|
|
for (const auto& ref : references) {
|
|
meta.mutable_meta()->add_references(ref);
|
|
}
|
|
writer->Write(meta);
|
|
|
|
for (int i = 0; i <= content.size(); i += kChunkSize) {
|
|
auto len = std::min(kChunkSize, content.size() - i);
|
|
proto::AddTextToStoreRequest data;
|
|
data.set_data(content.data() + i, len);
|
|
if (!writer->Write(data)) {
|
|
// Finish() below will error
|
|
break;
|
|
}
|
|
}
|
|
|
|
writer->WritesDone();
|
|
SuccessOrThrow(writer->Finish(), __FUNCTION__);
|
|
return result.path();
|
|
}
|
|
|
|
absl::Status RpcStore::buildPaths(std::ostream& log_sink, const PathSet& paths,
|
|
BuildMode build_mode) {
|
|
ClientContext ctx;
|
|
proto::BuildPathsRequest request;
|
|
for (const auto& path : paths) {
|
|
request.add_drvs(path);
|
|
}
|
|
|
|
google::protobuf::Empty response;
|
|
request.set_mode(nix::BuildModeToProto(build_mode));
|
|
|
|
std::unique_ptr<grpc::ClientReader<proto::BuildEvent>> reader =
|
|
stub_->BuildPaths(&ctx, request);
|
|
|
|
proto::BuildEvent event;
|
|
while (reader->Read(&event)) {
|
|
if (event.has_build_log()) {
|
|
// TODO(tazjin): Include .path()?
|
|
log_sink << event.build_log().line();
|
|
} else {
|
|
log_sink << "Building path: " << event.building_path().path()
|
|
<< std::endl;
|
|
}
|
|
|
|
// has_result() is not in use in this call (for now)
|
|
}
|
|
|
|
return nix::util::proto::GRPCStatusToAbsl(reader->Finish());
|
|
}
|
|
|
|
BuildResult RpcStore::buildDerivation(std::ostream& log_sink,
|
|
const Path& drvPath,
|
|
const BasicDerivation& drv,
|
|
BuildMode buildMode) {
|
|
ClientContext ctx;
|
|
proto::BuildDerivationRequest request;
|
|
request.mutable_drv_path()->set_path(drvPath);
|
|
proto::Derivation proto_drv = drv.to_proto();
|
|
*request.mutable_derivation() = proto_drv;
|
|
request.set_build_mode(BuildModeToProto(buildMode));
|
|
|
|
std::unique_ptr<grpc::ClientReader<proto::BuildEvent>> reader =
|
|
stub_->BuildDerivation(&ctx, request);
|
|
|
|
std::optional<BuildResult> result;
|
|
|
|
proto::BuildEvent event;
|
|
while (reader->Read(&event)) {
|
|
if (event.has_build_log()) {
|
|
log_sink << event.build_log().line();
|
|
} else if (event.has_result()) {
|
|
result = BuildResult::FromProto(event.result());
|
|
}
|
|
}
|
|
SuccessOrThrow(reader->Finish(), __FUNCTION__);
|
|
|
|
if (!result.has_value()) {
|
|
throw Error("Invalid response from daemon for buildDerivation");
|
|
}
|
|
return result.value();
|
|
}
|
|
|
|
void RpcStore::ensurePath(const Path& path) {
|
|
ClientContext ctx;
|
|
google::protobuf::Empty response;
|
|
SuccessOrThrow(
|
|
stub_->EnsurePath(&ctx, util::proto::StorePath(path), &response),
|
|
__FUNCTION__);
|
|
}
|
|
|
|
void RpcStore::addTempRoot(const Path& path) {
|
|
ClientContext ctx;
|
|
google::protobuf::Empty response;
|
|
SuccessOrThrow(
|
|
stub_->AddTempRoot(&ctx, util::proto::StorePath(path), &response),
|
|
__FUNCTION__);
|
|
}
|
|
|
|
void RpcStore::addIndirectRoot(const Path& path) {
|
|
ClientContext ctx;
|
|
google::protobuf::Empty response;
|
|
SuccessOrThrow(
|
|
stub_->AddIndirectRoot(&ctx, util::proto::StorePath(path), &response),
|
|
__FUNCTION__);
|
|
}
|
|
|
|
void RpcStore::syncWithGC() {
|
|
ClientContext ctx;
|
|
google::protobuf::Empty response;
|
|
SuccessOrThrow(stub_->SyncWithGC(&ctx, kEmpty, &response), __FUNCTION__);
|
|
}
|
|
|
|
Roots RpcStore::findRoots(bool censor) {
|
|
ClientContext ctx;
|
|
proto::FindRootsResponse response;
|
|
SuccessOrThrow(stub_->FindRoots(&ctx, kEmpty, &response), __FUNCTION__);
|
|
Roots result;
|
|
|
|
for (const auto& [target, links] : response.roots()) {
|
|
auto link_paths =
|
|
util::proto::FillFrom<std::unordered_set<std::string>>(links.paths());
|
|
result.insert({target, link_paths});
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
void RpcStore::collectGarbage(const GCOptions& options, GCResults& results) {
|
|
ClientContext ctx;
|
|
proto::CollectGarbageRequest request;
|
|
request.set_action(options.ActionToProto());
|
|
for (const auto& path : options.pathsToDelete) {
|
|
request.add_paths_to_delete(path);
|
|
}
|
|
request.set_ignore_liveness(options.ignoreLiveness);
|
|
request.set_max_freed(options.maxFreed);
|
|
|
|
proto::CollectGarbageResponse response;
|
|
SuccessOrThrow(stub_->CollectGarbage(&ctx, request, &response), __FUNCTION__);
|
|
|
|
for (const auto& path : response.deleted_paths()) {
|
|
results.paths.insert(path);
|
|
}
|
|
results.bytesFreed = response.bytes_freed();
|
|
}
|
|
|
|
void RpcStore::optimiseStore() {
|
|
ClientContext ctx;
|
|
google::protobuf::Empty response;
|
|
SuccessOrThrow(stub_->OptimiseStore(&ctx, kEmpty, &response), __FUNCTION__);
|
|
}
|
|
|
|
bool RpcStore::verifyStore(bool checkContents, RepairFlag repair) {
|
|
ClientContext ctx;
|
|
proto::VerifyStoreRequest request;
|
|
request.set_check_contents(checkContents);
|
|
request.set_repair(repair);
|
|
proto::VerifyStoreResponse response;
|
|
SuccessOrThrow(stub_->VerifyStore(&ctx, request, &response), __FUNCTION__);
|
|
return response.errors();
|
|
}
|
|
|
|
void RpcStore::addSignatures(const Path& storePath, const StringSet& sigs) {
|
|
ClientContext ctx;
|
|
proto::AddSignaturesRequest request;
|
|
request.mutable_path()->set_path(storePath);
|
|
for (const auto& sig : sigs) {
|
|
request.mutable_sigs()->add_sigs(sig);
|
|
}
|
|
google::protobuf::Empty response;
|
|
SuccessOrThrow(stub_->AddSignatures(&ctx, request, &response), __FUNCTION__);
|
|
}
|
|
|
|
void RpcStore::queryMissing(const PathSet& targets, PathSet& willBuild,
|
|
PathSet& willSubstitute, PathSet& unknown,
|
|
unsigned long long& downloadSize,
|
|
unsigned long long& narSize) {
|
|
ClientContext ctx;
|
|
proto::QueryMissingResponse response;
|
|
SuccessOrThrow(
|
|
stub_->QueryMissing(&ctx, util::proto::StorePaths(targets), &response),
|
|
__FUNCTION__);
|
|
|
|
willBuild = util::proto::FillFrom<PathSet>(response.will_build());
|
|
willSubstitute = util::proto::FillFrom<PathSet>(response.will_substitute());
|
|
unknown = util::proto::FillFrom<PathSet>(response.unknown());
|
|
downloadSize = response.download_size();
|
|
narSize = response.nar_size();
|
|
}
|
|
|
|
std::shared_ptr<std::string> RpcStore::getBuildLog(const Path& path) {
|
|
ClientContext ctx;
|
|
proto::BuildLog response;
|
|
SuccessOrThrow(
|
|
stub_->GetBuildLog(&ctx, util::proto::StorePath(path), &response),
|
|
__FUNCTION__);
|
|
|
|
auto build_log = response.build_log();
|
|
if (build_log.empty()) {
|
|
return nullptr;
|
|
}
|
|
return std::make_shared<std::string>(build_log);
|
|
}
|
|
|
|
unsigned int RpcStore::getProtocol() { return PROTOCOL_VERSION; }
|
|
|
|
} // namespace store
|
|
|
|
constexpr std::string_view kUriScheme = "unix://";
|
|
|
|
// TODO(grfn): Make this a function that we call from main rather than... this
|
|
static RegisterStoreImplementation regStore([](const std::string& uri,
|
|
const Store::Params& params)
|
|
-> std::shared_ptr<Store> {
|
|
if (std::string(uri, 0, kUriScheme.size()) != kUriScheme) {
|
|
return nullptr;
|
|
}
|
|
auto channel = grpc::CreateChannel(uri, grpc::InsecureChannelCredentials());
|
|
return std::make_shared<store::RpcStore>(
|
|
uri, params, proto::WorkerService::NewStub(channel));
|
|
});
|
|
|
|
} // namespace nix
|