chore(tvix/nar-bridge): move to nar-bridge-go
Make some space for the rust implementation. Change-Id: I924dc1657be10abe5a11951c3b9de50bae06db19 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11662 Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de> Reviewed-by: yuka <yuka@yuka.dev>
This commit is contained in:
parent
ce1aa10b69
commit
1392913e98
32 changed files with 13 additions and 14 deletions
71
tvix/nar-bridge-go/pkg/importer/blob_upload.go
Normal file
71
tvix/nar-bridge-go/pkg/importer/blob_upload.go
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
package importer
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
castorev1pb "code.tvl.fyi/tvix/castore-go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// the size of individual BlobChunk we send when uploading to BlobService.
|
||||
const chunkSize = 1024 * 1024
|
||||
|
||||
// this produces a callback function that can be used as blobCb for the
|
||||
// importer.Import function call.
|
||||
func GenBlobUploaderCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) {
|
||||
return func(blobReader io.Reader) ([]byte, error) {
|
||||
// Ensure the blobReader is buffered to at least the chunk size.
|
||||
blobReader = bufio.NewReaderSize(blobReader, chunkSize)
|
||||
|
||||
putter, err := blobServiceClient.Put(ctx)
|
||||
if err != nil {
|
||||
// return error to the importer
|
||||
return nil, fmt.Errorf("error from blob service: %w", err)
|
||||
}
|
||||
|
||||
blobSize := 0
|
||||
chunk := make([]byte, chunkSize)
|
||||
|
||||
for {
|
||||
n, err := blobReader.Read(chunk)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return nil, fmt.Errorf("unable to read from blobreader: %w", err)
|
||||
}
|
||||
|
||||
if n != 0 {
|
||||
log.WithField("chunk_size", n).Debug("sending chunk")
|
||||
blobSize += n
|
||||
|
||||
// send the blob chunk to the server. The err is only valid in the inner scope
|
||||
if err := putter.Send(&castorev1pb.BlobChunk{
|
||||
Data: chunk[:n],
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("sending blob chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// if our read from blobReader returned an EOF, we're done reading
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
resp, err := putter.CloseAndRecv()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("close blob putter: %w", err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
|
||||
"blob_size": blobSize,
|
||||
}).Debug("uploaded blob")
|
||||
|
||||
return resp.GetDigest(), nil
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue