downloader: use priority_queue
This commit is contained in:
		
							parent
							
								
									ae8884b949
								
							
						
					
					
						commit
						efb938468c
					
				
					 1 changed files with 17 additions and 12 deletions
				
			
		|  | @ -10,6 +10,7 @@ | |||
| 
 | ||||
| #include <curl/curl.h> | ||||
| 
 | ||||
| #include <queue> | ||||
| #include <iostream> | ||||
| #include <thread> | ||||
| #include <cmath> | ||||
|  | @ -281,8 +282,13 @@ struct CurlDownloader : public Downloader | |||
| 
 | ||||
|     struct State | ||||
|     { | ||||
|         struct EmbargoComparator { | ||||
|             bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) { | ||||
|                 return i1->embargo > i2->embargo; | ||||
|             } | ||||
|         }; | ||||
|         bool quit = false; | ||||
|         std::vector<std::shared_ptr<DownloadItem>> incoming; | ||||
|         std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming; | ||||
|     }; | ||||
| 
 | ||||
|     Sync<State> state_; | ||||
|  | @ -380,9 +386,7 @@ struct CurlDownloader : public Downloader | |||
| 
 | ||||
|             /* Add new curl requests from the incoming requests queue,
 | ||||
|                except for requests that are embargoed (waiting for a | ||||
|                retry timeout to expire). FIXME: should use a priority | ||||
|                queue for the embargoed items to prevent repeated O(n) | ||||
|                checks. */ | ||||
|                retry timeout to expire). */ | ||||
|             if (extraFDs[0].revents & CURL_WAIT_POLLIN) { | ||||
|                 char buf[1024]; | ||||
|                 auto res = read(extraFDs[0].fd, buf, sizeof(buf)); | ||||
|  | @ -390,22 +394,23 @@ struct CurlDownloader : public Downloader | |||
|                     throw SysError("reading curl wakeup socket"); | ||||
|             } | ||||
| 
 | ||||
|             std::vector<std::shared_ptr<DownloadItem>> incoming, embargoed; | ||||
|             std::vector<std::shared_ptr<DownloadItem>> incoming; | ||||
|             auto now = std::chrono::steady_clock::now(); | ||||
| 
 | ||||
|             { | ||||
|                 auto state(state_.lock()); | ||||
|                 for (auto & item: state->incoming) { | ||||
|                     if (item->embargo <= now) | ||||
|                 while (!state->incoming.empty()) { | ||||
|                     auto item = state->incoming.top(); | ||||
|                     if (item->embargo <= now) { | ||||
|                         incoming.push_back(item); | ||||
|                     else { | ||||
|                         embargoed.push_back(item); | ||||
|                         state->incoming.pop(); | ||||
|                     } else { | ||||
|                         if (nextWakeup == std::chrono::steady_clock::time_point() | ||||
|                             || item->embargo < nextWakeup) | ||||
|                             nextWakeup = item->embargo; | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|                 state->incoming = embargoed; | ||||
|                 quit = state->quit; | ||||
|             } | ||||
| 
 | ||||
|  | @ -432,7 +437,7 @@ struct CurlDownloader : public Downloader | |||
| 
 | ||||
|         { | ||||
|             auto state(state_.lock()); | ||||
|             state->incoming.clear(); | ||||
|             while (!state->incoming.empty()) state->incoming.pop(); | ||||
|             state->quit = true; | ||||
|         } | ||||
|     } | ||||
|  | @ -443,7 +448,7 @@ struct CurlDownloader : public Downloader | |||
|             auto state(state_.lock()); | ||||
|             if (state->quit) | ||||
|                 throw nix::Error("cannot enqueue download request because the download thread is shutting down"); | ||||
|             state->incoming.push_back(item); | ||||
|             state->incoming.push(item); | ||||
|         } | ||||
|         writeFull(wakeupPipe.writeSide.get(), " "); | ||||
|     } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue