fix(server): Thread request context to all relevant places
Previously background contexts where created where necessary (e.g. in GCS interactions). Should I begin to use request timeouts or other context-dependent things in the future, it's useful to have the actual HTTP request context around. This threads the request context through the application to all places that need it.
This commit is contained in:
		
							parent
							
								
									30e618b65b
								
							
						
					
					
						commit
						d8fba23365
					
				
					 6 changed files with 20 additions and 23 deletions
				
			
		|  | @ -375,7 +375,7 @@ func (b *byteCounter) Write(p []byte) (n int, err error) { | |||
| // image manifest. | ||||
| func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) (*manifest.Entry, error) { | ||||
| 	path := "staging/" + key | ||||
| 	sha256sum, size, err := s.Storage.Persist(path, func(sw io.Writer) (string, int64, error) { | ||||
| 	sha256sum, size, err := s.Storage.Persist(ctx, path, func(sw io.Writer) (string, int64, error) { | ||||
| 		// Sets up a "multiwriter" that simultaneously runs both hash | ||||
| 		// algorithms and uploads to the storage backend. | ||||
| 		shasum := sha256.New() | ||||
|  | @ -399,7 +399,7 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) | |||
| 
 | ||||
| 	// Hashes are now known and the object is in the bucket, what | ||||
| 	// remains is to move it to the correct location and cache it. | ||||
| 	err = s.Storage.Move("staging/"+key, "layers/"+sha256sum) | ||||
| 	err = s.Storage.Move(ctx, "staging/"+key, "layers/"+sha256sum) | ||||
| 	if err != nil { | ||||
| 		log.WithError(err).WithField("layer", key). | ||||
| 			Error("failed to move layer from staging") | ||||
|  |  | |||
|  | @ -120,7 +120,7 @@ func manifestFromCache(ctx context.Context, s *State, key string) (json.RawMessa | |||
| 		return m, true | ||||
| 	} | ||||
| 
 | ||||
| 	r, err := s.Storage.Fetch("manifests/" + key) | ||||
| 	r, err := s.Storage.Fetch(ctx, "manifests/"+key) | ||||
| 	if err != nil { | ||||
| 		log.WithError(err).WithFields(log.Fields{ | ||||
| 			"manifest": key, | ||||
|  | @ -152,7 +152,7 @@ func cacheManifest(ctx context.Context, s *State, key string, m json.RawMessage) | |||
| 	go s.Cache.localCacheManifest(key, m) | ||||
| 
 | ||||
| 	path := "manifests/" + key | ||||
| 	_, size, err := s.Storage.Persist(path, func(w io.Writer) (string, int64, error) { | ||||
| 	_, size, err := s.Storage.Persist(ctx, path, func(w io.Writer) (string, int64, error) { | ||||
| 		size, err := io.Copy(w, bytes.NewReader([]byte(m))) | ||||
| 		return "", size, err | ||||
| 	}) | ||||
|  | @ -180,7 +180,7 @@ func layerFromCache(ctx context.Context, s *State, key string) (*manifest.Entry, | |||
| 		return entry, true | ||||
| 	} | ||||
| 
 | ||||
| 	r, err := s.Storage.Fetch("builds/" + key) | ||||
| 	r, err := s.Storage.Fetch(ctx, "builds/"+key) | ||||
| 	if err != nil { | ||||
| 		log.WithError(err).WithFields(log.Fields{ | ||||
| 			"layer":   key, | ||||
|  | @ -220,7 +220,7 @@ func cacheLayer(ctx context.Context, s *State, key string, entry manifest.Entry) | |||
| 
 | ||||
| 	j, _ := json.Marshal(&entry) | ||||
| 	path := "builds/" + key | ||||
| 	_, _, err := s.Storage.Persist(path, func(w io.Writer) (string, int64, error) { | ||||
| 	_, _, err := s.Storage.Persist(ctx, path, func(w io.Writer) (string, int64, error) { | ||||
| 		size, err := io.Copy(w, bytes.NewReader(j)) | ||||
| 		return "", size, err | ||||
| 	}) | ||||
|  |  | |||
|  | @ -26,7 +26,6 @@ | |||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
|  | @ -110,7 +109,6 @@ func writeError(w http.ResponseWriter, status int, code, message string) { | |||
| } | ||||
| 
 | ||||
| type registryHandler struct { | ||||
| 	ctx   context.Context | ||||
| 	state *builder.State | ||||
| } | ||||
| 
 | ||||
|  | @ -132,7 +130,7 @@ func (h *registryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |||
| 		}).Info("requesting image manifest") | ||||
| 
 | ||||
| 		image := builder.ImageFromName(imageName, imageTag) | ||||
| 		buildResult, err := builder.BuildImage(h.ctx, h.state, &image) | ||||
| 		buildResult, err := builder.BuildImage(r.Context(), h.state, &image) | ||||
| 
 | ||||
| 		if err != nil { | ||||
| 			writeError(w, 500, "UNKNOWN", "image build failure") | ||||
|  | @ -211,7 +209,6 @@ func main() { | |||
| 
 | ||||
| 	log.WithField("backend", s.Name()).Info("initialised storage backend") | ||||
| 
 | ||||
| 	ctx := context.Background() | ||||
| 	cache, err := builder.NewCache() | ||||
| 	if err != nil { | ||||
| 		log.WithError(err).Fatal("failed to instantiate build cache") | ||||
|  | @ -240,7 +237,6 @@ func main() { | |||
| 
 | ||||
| 	// All /v2/ requests belong to the registry handler. | ||||
| 	http.Handle("/v2/", ®istryHandler{ | ||||
| 		ctx:   ctx, | ||||
| 		state: &state, | ||||
| 	}) | ||||
| 
 | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
|  | @ -34,7 +35,7 @@ func (b *FSBackend) Name() string { | |||
| 	return fmt.Sprintf("Filesystem (%s)", b.path) | ||||
| } | ||||
| 
 | ||||
| func (b *FSBackend) Persist(key string, f func(io.Writer) (string, int64, error)) (string, int64, error) { | ||||
| func (b *FSBackend) Persist(ctx context.Context, key string, f Persister) (string, int64, error) { | ||||
| 	full := path.Join(b.path, key) | ||||
| 	dir := path.Dir(full) | ||||
| 	err := os.MkdirAll(dir, 0755) | ||||
|  | @ -53,12 +54,12 @@ func (b *FSBackend) Persist(key string, f func(io.Writer) (string, int64, error) | |||
| 	return f(file) | ||||
| } | ||||
| 
 | ||||
| func (b *FSBackend) Fetch(key string) (io.ReadCloser, error) { | ||||
| func (b *FSBackend) Fetch(ctx context.Context, key string) (io.ReadCloser, error) { | ||||
| 	full := path.Join(b.path, key) | ||||
| 	return os.Open(full) | ||||
| } | ||||
| 
 | ||||
| func (b *FSBackend) Move(old, new string) error { | ||||
| func (b *FSBackend) Move(ctx context.Context, old, new string) error { | ||||
| 	newpath := path.Join(b.path, new) | ||||
| 	err := os.MkdirAll(path.Dir(newpath), 0755) | ||||
| 	if err != nil { | ||||
|  |  | |||
|  | @ -66,8 +66,7 @@ func (b *GCSBackend) Name() string { | |||
| 	return "Google Cloud Storage (" + b.bucket + ")" | ||||
| } | ||||
| 
 | ||||
| func (b *GCSBackend) Persist(path string, f func(io.Writer) (string, int64, error)) (string, int64, error) { | ||||
| 	ctx := context.Background() | ||||
| func (b *GCSBackend) Persist(ctx context.Context, path string, f Persister) (string, int64, error) { | ||||
| 	obj := b.handle.Object(path) | ||||
| 	w := obj.NewWriter(ctx) | ||||
| 
 | ||||
|  | @ -80,8 +79,7 @@ func (b *GCSBackend) Persist(path string, f func(io.Writer) (string, int64, erro | |||
| 	return hash, size, w.Close() | ||||
| } | ||||
| 
 | ||||
| func (b *GCSBackend) Fetch(path string) (io.ReadCloser, error) { | ||||
| 	ctx := context.Background() | ||||
| func (b *GCSBackend) Fetch(ctx context.Context, path string) (io.ReadCloser, error) { | ||||
| 	obj := b.handle.Object(path) | ||||
| 
 | ||||
| 	// Probe whether the file exists before trying to fetch it | ||||
|  | @ -98,8 +96,7 @@ func (b *GCSBackend) Fetch(path string) (io.ReadCloser, error) { | |||
| // | ||||
| // The Go API for Cloud Storage does not support renaming objects, but | ||||
| // the HTTP API does. The code below makes the relevant call manually. | ||||
| func (b *GCSBackend) Move(old, new string) error { | ||||
| 	ctx := context.Background() | ||||
| func (b *GCSBackend) Move(ctx context.Context, old, new string) error { | ||||
| 	creds, err := google.FindDefaultCredentials(ctx, gcsScope) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
|  |  | |||
|  | @ -4,10 +4,13 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| ) | ||||
| 
 | ||||
| type Persister = func(io.Writer) (string, int64, error) | ||||
| 
 | ||||
| type Backend interface { | ||||
| 	// Name returns the name of the storage backend, for use in | ||||
| 	// log messages and such. | ||||
|  | @ -19,14 +22,14 @@ type Backend interface { | |||
| 	// It needs to return the SHA256 hash of the data written as | ||||
| 	// well as the total number of bytes, as those are required | ||||
| 	// for the image manifest. | ||||
| 	Persist(string, func(io.Writer) (string, int64, error)) (string, int64, error) | ||||
| 	Persist(context.Context, string, Persister) (string, int64, error) | ||||
| 
 | ||||
| 	// Fetch retrieves data from the storage backend. | ||||
| 	Fetch(path string) (io.ReadCloser, error) | ||||
| 	Fetch(ctx context.Context, path string) (io.ReadCloser, error) | ||||
| 
 | ||||
| 	// Move renames a path inside the storage backend. This is | ||||
| 	// used for staging uploads while calculating their hashes. | ||||
| 	Move(old, new string) error | ||||
| 	Move(ctx context.Context, old, new string) error | ||||
| 
 | ||||
| 	// Serve provides a handler function to serve HTTP requests | ||||
| 	// for layers in the storage backend. | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue