Adds a Nix cache proxy which can be used to send a Nix cache lookup to the first available cache that has the given NAR. We will use this for dynamically created builders. Relates to b/432. Change-Id: If970d2393e43ba032b5b7d653f2b92f6ac0eab63 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12949 Tested-by: BuildkiteCI Autosubmit: tazjin <tazjin@tvl.su> Reviewed-by: sterni <sternenseemann@systemli.org>
		
			
				
	
	
		
			151 lines
		
	
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			151 lines
		
	
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package proxy implements logic for proxying narinfo requests to upstream
 | |
| // caches, and modifying their responses to let hosts fetch the required data
 | |
| // directly from upstream.
 | |
| package proxy
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"log/slog"
 | |
| 	"net/http"
 | |
| 	"strings"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"tvl.fyi/ops/builderball/config"
 | |
| 	"tvl.fyi/ops/builderball/discovery"
 | |
| )
 | |
| 
 | |
| var hits atomic.Uint64
 | |
| var misses atomic.Uint64
 | |
| 
 | |
| func GetStats() (uint64, uint64) {
 | |
| 	return hits.Swap(0), misses.Swap(0)
 | |
| }
 | |
| 
 | |
| type narinfo struct {
 | |
| 	body string
 | |
| 	url  string
 | |
| }
 | |
| 
 | |
| // fetchNarinfoWithAbsoluteURL contacts the cache at baseURL to see if it has
 | |
| // the given NAR, and if so returns the narinfo with the URL pointing to the
 | |
| // *absolute* address of the cache. Nix will follow the absolute URL for
 | |
| // downloads.
 | |
| func fetchNarinfoWithAbsoluteURL(ctx context.Context, r *http.Request, baseURL string) *narinfo {
 | |
| 	url := baseURL + r.URL.Path
 | |
| 	slog.Debug("querying upstream cache", "url", url)
 | |
| 
 | |
| 	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
 | |
| 
 | |
| 	if *config.CacheHost != "" {
 | |
| 		req.Header.Add("Host", *config.CacheHost)
 | |
| 	}
 | |
| 
 | |
| 	if err != nil {
 | |
| 		slog.Warn("could not create cache lookup request", "cache", baseURL, "error", err.Error())
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	resp, err := http.DefaultClient.Do(req)
 | |
| 	if err != nil {
 | |
| 		if errors.Is(err, context.Canceled) {
 | |
| 			slog.Debug("cancelled lookup to cache", "url", baseURL)
 | |
| 		} else if errors.Is(err, context.DeadlineExceeded) {
 | |
| 			slog.Info("cache timed out", "cache", baseURL)
 | |
| 		} else {
 | |
| 			slog.Warn("could not query cache", "cache", baseURL, "error", err.Error())
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	if resp.StatusCode != http.StatusOK {
 | |
| 		slog.Debug("upstream cache responded with non-OK status", "status", resp.Status)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	content, err := io.ReadAll(resp.Body)
 | |
| 	if err != nil {
 | |
| 		slog.Warn("could not read upstream response", "error", err.Error())
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	result := new(narinfo)
 | |
| 	lines := strings.Split(string(content), "\n")
 | |
| 	for i, line := range lines {
 | |
| 		if strings.HasPrefix(line, "URL: ") {
 | |
| 			result.url = baseURL + "/" + strings.TrimPrefix(line, "URL: ")
 | |
| 			lines[i] = "URL: " + result.url
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	result.body = strings.Join(lines, "\n")
 | |
| 
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| func findInCaches(r *http.Request, caches []string) *narinfo {
 | |
| 	slog.Debug("querying caches", "caches", caches)
 | |
| 	ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	result := make(chan *narinfo, len(caches))
 | |
| 
 | |
| 	for _, cacheURL := range caches {
 | |
| 		go func(baseURL string) {
 | |
| 			result <- fetchNarinfoWithAbsoluteURL(ctx, r, baseURL)
 | |
| 		}(cacheURL)
 | |
| 	}
 | |
| 
 | |
| 	remaining := len(caches)
 | |
| 	for remaining > 0 {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return nil
 | |
| 		case r := <-result:
 | |
| 			if r != nil {
 | |
| 				return r
 | |
| 			}
 | |
| 
 | |
| 			remaining--
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func Handler(w http.ResponseWriter, r *http.Request) {
 | |
| 	// Only handle narinfo requests
 | |
| 	if !strings.HasSuffix(r.URL.Path, ".narinfo") {
 | |
| 		slog.Warn("received non-narinfo request", "path", r.URL.Path)
 | |
| 		http.NotFound(w, r)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	b := discovery.GetCaches()
 | |
| 	if len(b) == 0 {
 | |
| 		slog.Warn("no upstream caches available")
 | |
| 		http.NotFound(w, r)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	narinfo := findInCaches(r, b)
 | |
| 	if narinfo == nil {
 | |
| 		misses.Add(1)
 | |
| 		slog.Debug("no cache had store path", "path", r.URL.Path, "caches", b)
 | |
| 		http.NotFound(w, r)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	slog.Debug("cache hit", "url", narinfo.url)
 | |
| 	hits.Add(1)
 | |
| 
 | |
| 	w.Header().Set("Content-Type", "text/x-nix-narinfo")
 | |
| 	w.Header().Set("nix-link", narinfo.url)
 | |
| 	fmt.Fprint(w, narinfo.body)
 | |
| }
 |