* Some hackery to propagate the worker's stderr and exceptions to the
client.
This commit is contained in:
		
							parent
							
								
									714fa24cfb
								
							
						
					
					
						commit
						7951c3c546
					
				
					 7 changed files with 209 additions and 107 deletions
				
			
		|  | @ -872,7 +872,7 @@ static void drain(int fd) | ||||||
|             if (errno != EINTR) |             if (errno != EINTR) | ||||||
|                 throw SysError("draining"); |                 throw SysError("draining"); | ||||||
|         } else if (rd == 0) break; |         } else if (rd == 0) break; | ||||||
|         else writeFull(STDERR_FILENO, buffer, rd); |         else writeToStderr(buffer, rd); | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -1610,7 +1610,7 @@ void DerivationGoal::handleChildOutput(int fd, const string & data) | ||||||
| { | { | ||||||
|     if (fd == logPipe.readSide) { |     if (fd == logPipe.readSide) { | ||||||
|         if (verbosity >= buildVerbosity) |         if (verbosity >= buildVerbosity) | ||||||
|             writeFull(STDERR_FILENO, (unsigned char *) data.c_str(), data.size()); |             writeToStderr((unsigned char *) data.c_str(), data.size()); | ||||||
|         writeFull(fdLogFile, (unsigned char *) data.c_str(), data.size()); |         writeFull(fdLogFile, (unsigned char *) data.c_str(), data.size()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -1923,7 +1923,7 @@ void SubstitutionGoal::handleChildOutput(int fd, const string & data) | ||||||
| { | { | ||||||
|     assert(fd == logPipe.readSide); |     assert(fd == logPipe.readSide); | ||||||
|     if (verbosity >= buildVerbosity) |     if (verbosity >= buildVerbosity) | ||||||
|         writeFull(STDERR_FILENO, (unsigned char *) data.c_str(), data.size()); |         writeToStderr((unsigned char *) data.c_str(), data.size()); | ||||||
|     /* Don't write substitution output to a log file for now.  We
 |     /* Don't write substitution output to a log file for now.  We
 | ||||||
|        probably should, though. */ |        probably should, though. */ | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -4,6 +4,10 @@ | ||||||
| #include "worker-protocol.hh" | #include "worker-protocol.hh" | ||||||
| #include "archive.hh" | #include "archive.hh" | ||||||
| 
 | 
 | ||||||
|  | #include <sys/types.h> | ||||||
|  | #include <sys/stat.h> | ||||||
|  | #include <fcntl.h> | ||||||
|  | 
 | ||||||
| #include <iostream> | #include <iostream> | ||||||
| #include <unistd.h> | #include <unistd.h> | ||||||
| 
 | 
 | ||||||
|  | @ -38,8 +42,14 @@ RemoteStore::RemoteStore() | ||||||
|             if (dup2(toChild.readSide, STDIN_FILENO) == -1) |             if (dup2(toChild.readSide, STDIN_FILENO) == -1) | ||||||
|                 throw SysError("dupping read side"); |                 throw SysError("dupping read side"); | ||||||
| 
 | 
 | ||||||
|  |             int fdDebug = open("/tmp/worker-log", O_WRONLY | O_CREAT | O_TRUNC, 0644); | ||||||
|  |             assert(fdDebug != -1); | ||||||
|  |             if (dup2(fdDebug, STDERR_FILENO) == -1) | ||||||
|  |                 throw SysError("dupping stderr"); | ||||||
|  |             close(fdDebug); | ||||||
|  |              | ||||||
|             execlp(worker.c_str(), worker.c_str(), |             execlp(worker.c_str(), worker.c_str(), | ||||||
|                 "--slave", NULL); |                 "-vvv", "--slave", NULL); | ||||||
| 
 | 
 | ||||||
|             throw SysError(format("executing `%1%'") % worker); |             throw SysError(format("executing `%1%'") % worker); | ||||||
|              |              | ||||||
|  | @ -66,9 +76,13 @@ RemoteStore::RemoteStore() | ||||||
| 
 | 
 | ||||||
| RemoteStore::~RemoteStore() | RemoteStore::~RemoteStore() | ||||||
| { | { | ||||||
|     writeInt(wopQuit, to); |     try { | ||||||
|     readInt(from); |         fromChild.readSide.close(); | ||||||
|  |         toChild.writeSide.close(); | ||||||
|         child.wait(true); |         child.wait(true); | ||||||
|  |     } catch (Error & e) { | ||||||
|  |         printMsg(lvlError, format("error (ignored): %1%") % e.msg()); | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -158,6 +172,7 @@ void RemoteStore::buildDerivations(const PathSet & drvPaths) | ||||||
| { | { | ||||||
|     writeInt(wopBuildDerivations, to); |     writeInt(wopBuildDerivations, to); | ||||||
|     writeStringSet(drvPaths, to); |     writeStringSet(drvPaths, to); | ||||||
|  |     processStderr(); | ||||||
|     readInt(from); |     readInt(from); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -185,4 +200,18 @@ void RemoteStore::syncWithGC() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | void RemoteStore::processStderr() | ||||||
|  | { | ||||||
|  |     unsigned int msg; | ||||||
|  |     while ((msg = readInt(from)) == STDERR_NEXT) { | ||||||
|  |         string s = readString(from); | ||||||
|  |         writeToStderr((unsigned char *) s.c_str(), s.size()); | ||||||
|  |     } | ||||||
|  |     if (msg == STDERR_ERROR) | ||||||
|  |         throw Error(readString(from)); | ||||||
|  |     else if (msg != STDERR_LAST) | ||||||
|  |         throw Error("protocol error processing standard error"); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -57,6 +57,8 @@ private: | ||||||
|     FdSink to; |     FdSink to; | ||||||
|     FdSource from; |     FdSource from; | ||||||
|     Pid child; |     Pid child; | ||||||
|  | 
 | ||||||
|  |     void processStderr(); | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -23,4 +23,9 @@ typedef enum { | ||||||
| } WorkerOp; | } WorkerOp; | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | #define STDERR_NEXT  0x6f6c6d67 | ||||||
|  | #define STDERR_LAST  0x616c7473 | ||||||
|  | #define STDERR_ERROR 0x63787470 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| #endif /* !__WORKER_PROTOCOL_H */ | #endif /* !__WORKER_PROTOCOL_H */ | ||||||
|  |  | ||||||
|  | @ -437,7 +437,7 @@ void printMsg_(Verbosity level, const format & f) | ||||||
|     else if (logType == ltEscapes && level != lvlInfo) |     else if (logType == ltEscapes && level != lvlInfo) | ||||||
|         prefix = "\033[" + escVerbosity(level) + "s"; |         prefix = "\033[" + escVerbosity(level) + "s"; | ||||||
|     string s = (format("%1%%2%\n") % prefix % f.str()).str(); |     string s = (format("%1%%2%\n") % prefix % f.str()).str(); | ||||||
|     writeFull(STDERR_FILENO, (const unsigned char *) s.c_str(), s.size()); |     writeToStderr((const unsigned char *) s.c_str(), s.size()); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -450,6 +450,15 @@ void warnOnce(bool & haveWarned, const format & f) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | static void defaultWriteToStderr(const unsigned char * buf, size_t count) | ||||||
|  | { | ||||||
|  |     writeFull(STDERR_FILENO, buf, count); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | void (*writeToStderr) (const unsigned char * buf, size_t count) = defaultWriteToStderr; | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| void readFull(int fd, unsigned char * buf, size_t count) | void readFull(int fd, unsigned char * buf, size_t count) | ||||||
| { | { | ||||||
|     while (count) { |     while (count) { | ||||||
|  |  | ||||||
|  | @ -131,6 +131,8 @@ void printMsg_(Verbosity level, const format & f); | ||||||
| 
 | 
 | ||||||
| void warnOnce(bool & haveWarned, const format & f); | void warnOnce(bool & haveWarned, const format & f); | ||||||
| 
 | 
 | ||||||
|  | extern void (*writeToStderr) (const unsigned char * buf, size_t count); | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| /* Wrappers arount read()/write() that read/write exactly the
 | /* Wrappers arount read()/write() that read/write exactly the
 | ||||||
|    requested number of bytes. */ |    requested number of bytes. */ | ||||||
|  |  | ||||||
|  | @ -10,7 +10,7 @@ | ||||||
| using namespace nix; | using namespace nix; | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| Path readStorePath(Source & from) | static Path readStorePath(Source & from) | ||||||
| { | { | ||||||
|     Path path = readString(from); |     Path path = readString(from); | ||||||
|     assertStorePath(path); |     assertStorePath(path); | ||||||
|  | @ -18,7 +18,7 @@ Path readStorePath(Source & from) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| PathSet readStorePaths(Source & from) | static PathSet readStorePaths(Source & from) | ||||||
| { | { | ||||||
|     PathSet paths = readStringSet(from); |     PathSet paths = readStringSet(from); | ||||||
|     for (PathSet::iterator i = paths.begin(); i != paths.end(); ++i) |     for (PathSet::iterator i = paths.begin(); i != paths.end(); ++i) | ||||||
|  | @ -27,36 +27,56 @@ PathSet readStorePaths(Source & from) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| void processConnection(Source & from, Sink & to) | static Sink * _to; /* !!! should make writeToStderr an object */ | ||||||
|  | bool canSendStderr; | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | static void tunnelStderr(const unsigned char * buf, size_t count) | ||||||
| { | { | ||||||
|     store = boost::shared_ptr<StoreAPI>(new LocalStore(true)); |     writeFull(STDERR_FILENO, buf, count); | ||||||
|  |     if (canSendStderr) { | ||||||
|  |         try { | ||||||
|  |             writeInt(STDERR_NEXT, *_to); | ||||||
|  |             writeString(string((char *) buf, count), *_to); | ||||||
|  |         } catch (...) { | ||||||
|  |             /* Write failed; that means that the other side is
 | ||||||
|  |                gone. */ | ||||||
|  |             canSendStderr = false; | ||||||
|  |             throw; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
|     unsigned int magic = readInt(from); |  | ||||||
|     if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch"); |  | ||||||
| 
 | 
 | ||||||
|     writeInt(WORKER_MAGIC_2, to); | /* startWork() means that we're starting an operation for which we
 | ||||||
|  |    want to send out stderr to the client. */ | ||||||
|  | static void startWork() | ||||||
|  | { | ||||||
|  |     canSendStderr = true; | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
|     debug("greeting exchanged"); |  | ||||||
| 
 | 
 | ||||||
|     bool quit = false; | /* stopWork() means that we're done; stop sending stderr to the
 | ||||||
|  |    client. */ | ||||||
|  | static void stopWork() | ||||||
|  | { | ||||||
|  |     canSendStderr = false; | ||||||
|  |     writeInt(STDERR_LAST, *_to); | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
|     unsigned int opCount = 0; |  | ||||||
|      |  | ||||||
|     do { |  | ||||||
|          |  | ||||||
|         WorkerOp op = (WorkerOp) readInt(from); |  | ||||||
| 
 |  | ||||||
|         opCount++; |  | ||||||
| 
 | 
 | ||||||
|  | static void performOp(Source & from, Sink & to, unsigned int op) | ||||||
|  | { | ||||||
|     switch (op) { |     switch (op) { | ||||||
| 
 | 
 | ||||||
|  | #if 0        
 | ||||||
|     case wopQuit: { |     case wopQuit: { | ||||||
|         /* Close the database. */ |         /* Close the database. */ | ||||||
|         store.reset((StoreAPI *) 0); |         store.reset((StoreAPI *) 0); | ||||||
|         writeInt(1, to); |         writeInt(1, to); | ||||||
|             quit = true; |  | ||||||
|         break; |         break; | ||||||
|     } |     } | ||||||
|  | #endif | ||||||
| 
 | 
 | ||||||
|     case wopIsValidPath: { |     case wopIsValidPath: { | ||||||
|         Path path = readStorePath(from); |         Path path = readStorePath(from); | ||||||
|  | @ -115,7 +135,9 @@ void processConnection(Source & from, Sink & to) | ||||||
| 
 | 
 | ||||||
|     case wopBuildDerivations: { |     case wopBuildDerivations: { | ||||||
|         PathSet drvs = readStorePaths(from); |         PathSet drvs = readStorePaths(from); | ||||||
|  |         startWork(); | ||||||
|         store->buildDerivations(drvs); |         store->buildDerivations(drvs); | ||||||
|  |         stopWork(); | ||||||
|         writeInt(1, to); |         writeInt(1, to); | ||||||
|         break; |         break; | ||||||
|     } |     } | ||||||
|  | @ -143,6 +165,39 @@ void processConnection(Source & from, Sink & to) | ||||||
|     default: |     default: | ||||||
|         throw Error(format("invalid operation %1%") % op); |         throw Error(format("invalid operation %1%") % op); | ||||||
|     } |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | static void processConnection(Source & from, Sink & to) | ||||||
|  | { | ||||||
|  |     store = boost::shared_ptr<StoreAPI>(new LocalStore(true)); | ||||||
|  | 
 | ||||||
|  |     unsigned int magic = readInt(from); | ||||||
|  |     if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch"); | ||||||
|  | 
 | ||||||
|  |     writeInt(WORKER_MAGIC_2, to); | ||||||
|  | 
 | ||||||
|  |     debug("greeting exchanged"); | ||||||
|  | 
 | ||||||
|  |     _to = &to; | ||||||
|  |     canSendStderr = false; | ||||||
|  |     writeToStderr = tunnelStderr; | ||||||
|  | 
 | ||||||
|  |     bool quit = false; | ||||||
|  | 
 | ||||||
|  |     unsigned int opCount = 0; | ||||||
|  |      | ||||||
|  |     do { | ||||||
|  |         WorkerOp op = (WorkerOp) readInt(from); | ||||||
|  | 
 | ||||||
|  |         opCount++; | ||||||
|  | 
 | ||||||
|  |         try { | ||||||
|  |             performOp(from, to, op); | ||||||
|  |         } catch (Error & e) { | ||||||
|  |             writeInt(STDERR_ERROR, *_to); | ||||||
|  |             writeString(e.msg(), to); | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|     } while (!quit); |     } while (!quit); | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue