feat(server): Implement creation of layer tarballs in the server
This will create, upload and hash the layer tarballs in one disk read.
This commit is contained in:
		
							parent
							
								
									1124b8c236
								
							
						
					
					
						commit
						6b06fe27be
					
				
					 2 changed files with 160 additions and 81 deletions
				
			
		
							
								
								
									
										92
									
								
								tools/nixery/server/builder/archive.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								tools/nixery/server/builder/archive.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,92 @@ | |||
| package builder | ||||
| 
 | ||||
| // This file implements logic for walking through a directory and creating a | ||||
| // tarball of it. | ||||
| // | ||||
| // The tarball is written straight to the supplied reader, which makes it | ||||
| // possible to create an image layer from the specified store paths, hash it and | ||||
| // upload it in one reading pass. | ||||
| 
 | ||||
| import ( | ||||
| 	"archive/tar" | ||||
| 	"io" | ||||
| 	"log" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 
 | ||||
| 	"github.com/google/nixery/layers" | ||||
| ) | ||||
| 
 | ||||
| // Create a new tarball from each of the paths in the list and write the tarball | ||||
| // to the supplied writer. | ||||
| func tarStorePaths(l *layers.Layer, w io.Writer) error { | ||||
| 	t := tar.NewWriter(w) | ||||
| 
 | ||||
| 	for _, path := range l.Contents { | ||||
| 		err := filepath.Walk(path, tarStorePath(t)) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if err := t.Close(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	log.Printf("Created layer for '%s'\n", l.Hash()) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func tarStorePath(w *tar.Writer) filepath.WalkFunc { | ||||
| 	return func(path string, info os.FileInfo, err error) error { | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		// If the entry is not a symlink or regular file, skip it. | ||||
| 		if info.Mode()&os.ModeSymlink == 0 && !info.Mode().IsRegular() { | ||||
| 			return nil | ||||
| 		} | ||||
| 
 | ||||
| 		// the symlink target is read if this entry is a symlink, as it | ||||
| 		// is required when creating the file header | ||||
| 		var link string | ||||
| 		if info.Mode()&os.ModeSymlink != 0 { | ||||
| 			link, err = os.Readlink(path) | ||||
| 			if err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		header, err := tar.FileInfoHeader(info, link) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		// The name retrieved from os.FileInfo only contains the file's | ||||
| 		// basename, but the full path is required within the layer | ||||
| 		// tarball. | ||||
| 		header.Name = path | ||||
| 		if err = w.WriteHeader(header); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		// At this point, return if no file content needs to be written | ||||
| 		if !info.Mode().IsRegular() { | ||||
| 			return nil | ||||
| 		} | ||||
| 
 | ||||
| 		f, err := os.Open(path) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		if _, err := io.Copy(w, f); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		f.Close() | ||||
| 
 | ||||
| 		return nil | ||||
| 	} | ||||
| } | ||||
|  | @ -232,99 +232,55 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes | |||
| 	grouped := layers.Group(&result.Graph, &s.Pop, LayerBudget) | ||||
| 
 | ||||
| 	var entries []manifest.Entry | ||||
| 	var missing []layers.Layer | ||||
| 
 | ||||
| 	// Splits the layers into those which are already present in | ||||
| 	// the cache, and those that are missing (i.e. need to be | ||||
| 	// built by Nix). | ||||
| 	// the cache, and those that are missing. | ||||
| 	// | ||||
| 	// Missing layers are built and uploaded to the storage | ||||
| 	// bucket. | ||||
| 	for _, l := range grouped { | ||||
| 		if entry, cached := layerFromCache(ctx, s, l.Hash()); cached { | ||||
| 			entries = append(entries, *entry) | ||||
| 		} else { | ||||
| 			missing = append(missing, l) | ||||
| 		} | ||||
| 	} | ||||
| 			lw := func(w io.Writer) error { | ||||
| 				return tarStorePaths(&l, w) | ||||
| 			} | ||||
| 
 | ||||
| 	built, err := buildLayers(s, image, missing) | ||||
| 	if err != nil { | ||||
| 		log.Printf("Failed to build missing layers: %s\n", err) | ||||
| 		return nil, err | ||||
| 			entry, err := uploadHashLayer(ctx, s, l.Hash(), lw) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 
 | ||||
| 			go cacheLayer(ctx, s, l.Hash(), *entry) | ||||
| 			entries = append(entries, *entry) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// Symlink layer (built in the first Nix build) needs to be | ||||
| 	// included when hashing & uploading | ||||
| 	built[result.SymlinkLayer.SHA256] = result.SymlinkLayer.Path | ||||
| 
 | ||||
| 	for key, path := range built { | ||||
| 		f, err := os.Open(path) | ||||
| 	// included here manually: | ||||
| 	slkey := result.SymlinkLayer.SHA256 | ||||
| 	entry, err := uploadHashLayer(ctx, s, slkey, func(w io.Writer) error { | ||||
| 		f, err := os.Open(result.SymlinkLayer.Path) | ||||
| 		if err != nil { | ||||
| 			log.Printf("failed to open layer at '%s': %s\n", path, err) | ||||
| 			return nil, err | ||||
| 			log.Printf("failed to upload symlink layer '%s': %s\n", slkey, err) | ||||
| 			return err | ||||
| 		} | ||||
| 		defer f.Close() | ||||
| 
 | ||||
| 		entry, err := uploadHashLayer(ctx, s, key, f) | ||||
| 		f.Close() | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		_, err = io.Copy(w, f) | ||||
| 		return err | ||||
| 	}) | ||||
| 
 | ||||
| 		entries = append(entries, *entry) | ||||
| 		go cacheLayer(ctx, s, key, *entry) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	go cacheLayer(ctx, s, slkey, *entry) | ||||
| 	entries = append(entries, *entry) | ||||
| 
 | ||||
| 	return entries, nil | ||||
| } | ||||
| 
 | ||||
| // Builds remaining layers (those not already cached) via Nix. | ||||
| func buildLayers(s *State, image *Image, grouped []layers.Layer) (map[string]string, error) { | ||||
| 	result := make(map[string]string) | ||||
| 	if len(grouped) == 0 { | ||||
| 		return result, nil | ||||
| 	} | ||||
| 
 | ||||
| 	srcType, srcArgs := s.Cfg.Pkgs.Render(image.Tag) | ||||
| 	args := []string{ | ||||
| 		"--argstr", "srcType", srcType, | ||||
| 		"--argstr", "srcArgs", srcArgs, | ||||
| 	} | ||||
| 
 | ||||
| 	layerInput := make(map[string][]string) | ||||
| 	allPaths := []string{} | ||||
| 	for _, l := range grouped { | ||||
| 		layerInput[l.Hash()] = l.Contents | ||||
| 
 | ||||
| 		// The derivation responsible for building layers does not | ||||
| 		// have the derivations that resulted in the required store | ||||
| 		// paths in its context, which means that its sandbox will not | ||||
| 		// contain the necessary paths if sandboxing is enabled. | ||||
| 		// | ||||
| 		// To work around this, all required store paths are added as | ||||
| 		// 'extra-sandbox-paths' parameters. | ||||
| 		for _, p := range l.Contents { | ||||
| 			allPaths = append(allPaths, p) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	args = append(args, "--option", "extra-sandbox-paths", strings.Join(allPaths, " ")) | ||||
| 
 | ||||
| 	j, _ := json.Marshal(layerInput) | ||||
| 	args = append(args, "--argstr", "layers", string(j)) | ||||
| 
 | ||||
| 	output, err := callNix("nixery-build-layers", image.Name, args) | ||||
| 	if err != nil { | ||||
| 		log.Printf("failed to call nixery-build-layers: %s\n", err) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	log.Printf("Finished layer preparation for '%s' via Nix\n", image.Name) | ||||
| 
 | ||||
| 	err = json.Unmarshal(output, &result) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return result, nil | ||||
| } | ||||
| 
 | ||||
| // renameObject renames an object in the specified Cloud Storage | ||||
| // bucket. | ||||
| // | ||||
|  | @ -368,7 +324,30 @@ func renameObject(ctx context.Context, s *State, old, new string) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Upload a to the storage bucket, while hashing it at the same time. | ||||
| // layerWriter is the type for functions that can write a layer to the | ||||
| // multiwriter used for uploading & hashing. | ||||
| // | ||||
| // This type exists to avoid duplication between the handling of | ||||
| // symlink layers and store path layers. | ||||
| type layerWriter func(w io.Writer) error | ||||
| 
 | ||||
| // byteCounter is a special io.Writer that counts all bytes written to | ||||
| // it and does nothing else. | ||||
| // | ||||
| // This is required because the ad-hoc writing of tarballs leaves no | ||||
| // single place to count the final tarball size otherwise. | ||||
| type byteCounter struct { | ||||
| 	count int64 | ||||
| } | ||||
| 
 | ||||
| func (b *byteCounter) Write(p []byte) (n int, err error) { | ||||
| 	b.count += int64(len(p)) | ||||
| 	return len(p), nil | ||||
| } | ||||
| 
 | ||||
| // Upload a layer tarball to the storage bucket, while hashing it at | ||||
| // the same time. The supplied function is expected to provide the | ||||
| // layer data to the writer. | ||||
| // | ||||
| // The initial upload is performed in a 'staging' folder, as the | ||||
| // SHA256-hash is not yet available when the upload is initiated. | ||||
|  | @ -378,24 +357,24 @@ func renameObject(ctx context.Context, s *State, old, new string) error { | |||
| // | ||||
| // The return value is the layer's SHA256 hash, which is used in the | ||||
| // image manifest. | ||||
| func uploadHashLayer(ctx context.Context, s *State, key string, data io.Reader) (*manifest.Entry, error) { | ||||
| func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) (*manifest.Entry, error) { | ||||
| 	staging := s.Bucket.Object("staging/" + key) | ||||
| 
 | ||||
| 	// Sets up a "multiwriter" that simultaneously runs both hash | ||||
| 	// algorithms and uploads to the bucket | ||||
| 	sw := staging.NewWriter(ctx) | ||||
| 	shasum := sha256.New() | ||||
| 	multi := io.MultiWriter(sw, shasum) | ||||
| 	counter := &byteCounter{} | ||||
| 	multi := io.MultiWriter(sw, shasum, counter) | ||||
| 
 | ||||
| 	size, err := io.Copy(multi, data) | ||||
| 	err := lw(multi) | ||||
| 	if err != nil { | ||||
| 		log.Printf("failed to upload layer '%s' to staging: %s\n", key, err) | ||||
| 		log.Printf("failed to create and upload layer '%s': %s\n", key, err) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if err = sw.Close(); err != nil { | ||||
| 		log.Printf("failed to upload layer '%s' to staging: %s\n", key, err) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{})) | ||||
|  | @ -408,6 +387,7 @@ func uploadHashLayer(ctx context.Context, s *State, key string, data io.Reader) | |||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	size := counter.count | ||||
| 	log.Printf("Uploaded layer sha256:%s (%v bytes written)", sha256sum, size) | ||||
| 
 | ||||
| 	entry := manifest.Entry{ | ||||
|  | @ -446,7 +426,14 @@ func BuildImage(ctx context.Context, s *State, image *Image) (*BuildResult, erro | |||
| 	} | ||||
| 
 | ||||
| 	m, c := manifest.Manifest(layers) | ||||
| 	if _, err = uploadHashLayer(ctx, s, c.SHA256, bytes.NewReader(c.Config)); err != nil { | ||||
| 
 | ||||
| 	lw := func(w io.Writer) error { | ||||
| 		r := bytes.NewReader(c.Config) | ||||
| 		_, err := io.Copy(w, r) | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	if _, err = uploadHashLayer(ctx, s, c.SHA256, lw); err != nil { | ||||
| 		log.Printf("failed to upload config for %s: %s\n", image.Name, err) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue