refactor(tvix/nar-bridge): let callbaks return calculated digests
This aligns behaviour more with how it should be - it's the responsibility of the callback functions to return digests of the things they consume(d). It allows further cleaning up the hasher struct. Change-Id: I9cbfc87e6abd4ff17fadf39eb6563ec3cb7fcc6f Reviewed-on: https://cl.tvl.fyi/c/depot/+/9528 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
parent
b1ff1267be
commit
f92b0ef933
6 changed files with 109 additions and 100 deletions
|
|
@ -17,15 +17,15 @@ const chunkSize = 1024 * 1024
|
|||
|
||||
// this produces a callback function that can be used as blobCb for the
|
||||
// importer.Import function call.
|
||||
func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) error {
|
||||
return func(blobReader io.Reader) error {
|
||||
func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) {
|
||||
return func(blobReader io.Reader) ([]byte, error) {
|
||||
// Ensure the blobReader is buffered to at least the chunk size.
|
||||
blobReader = bufio.NewReaderSize(blobReader, chunkSize)
|
||||
|
||||
putter, err := blobServiceClient.Put(ctx)
|
||||
if err != nil {
|
||||
// return error to the importer
|
||||
return fmt.Errorf("error from blob service: %w", err)
|
||||
return nil, fmt.Errorf("error from blob service: %w", err)
|
||||
}
|
||||
|
||||
blobSize := 0
|
||||
|
|
@ -34,7 +34,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl
|
|||
for {
|
||||
n, err := blobReader.Read(chunk)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return fmt.Errorf("unable to read from blobreader: %w", err)
|
||||
return nil, fmt.Errorf("unable to read from blobreader: %w", err)
|
||||
}
|
||||
|
||||
if n != 0 {
|
||||
|
|
@ -45,7 +45,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl
|
|||
if err := putter.Send(&castorev1pb.BlobChunk{
|
||||
Data: chunk[:n],
|
||||
}); err != nil {
|
||||
return fmt.Errorf("sending blob chunk: %w", err)
|
||||
return nil, fmt.Errorf("sending blob chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -58,7 +58,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl
|
|||
|
||||
resp, err := putter.CloseAndRecv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("close blob putter: %w", err)
|
||||
return nil, fmt.Errorf("close blob putter: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
|
|
@ -66,6 +66,6 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl
|
|||
"blob_size": blobSize,
|
||||
}).Debug("uploaded blob")
|
||||
|
||||
return nil
|
||||
return resp.GetDigest(), nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,10 +23,10 @@ func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev
|
|||
}
|
||||
}
|
||||
|
||||
func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error {
|
||||
directoryDgst, err := directory.Digest()
|
||||
func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, error) {
|
||||
directoryDigest, err := directory.Digest()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed calculating directory digest: %w", err)
|
||||
return nil, fmt.Errorf("failed calculating directory digest: %w", err)
|
||||
}
|
||||
|
||||
// Send the directory to the directory service
|
||||
|
|
@ -34,7 +34,7 @@ func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error {
|
|||
if du.directoryServicePutStream == nil {
|
||||
directoryServicePutStream, err := du.directoryServiceClient.Put(du.ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to initialize directory service put stream: %v", err)
|
||||
return nil, fmt.Errorf("unable to initialize directory service put stream: %v", err)
|
||||
}
|
||||
du.directoryServicePutStream = directoryServicePutStream
|
||||
}
|
||||
|
|
@ -42,11 +42,11 @@ func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error {
|
|||
// send the directory out
|
||||
err = du.directoryServicePutStream.Send(directory)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error sending directory: %w", err)
|
||||
return nil, fmt.Errorf("error sending directory: %w", err)
|
||||
}
|
||||
log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDgst)).Debug("uploaded directory")
|
||||
log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory")
|
||||
|
||||
return nil
|
||||
return directoryDigest, nil
|
||||
}
|
||||
|
||||
// Done is called whenever we're
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ func registerNarPut(s *Server) {
|
|||
// buffer the body by 10MiB
|
||||
bufio.NewReaderSize(r.Body, 10*1024*1024),
|
||||
genBlobServiceWriteCb(ctx, s.blobServiceClient),
|
||||
func(directory *castorev1pb.Directory) error {
|
||||
func(directory *castorev1pb.Directory) ([]byte, error) {
|
||||
return directoriesUploader.Put(directory)
|
||||
},
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue