From cb80730652fda424772093632fd694bf3f4e8124 Mon Sep 17 00:00:00 2001 From: Stan Hu <stanhu@gmail.com> Date: Mon, 10 Aug 2020 14:02:22 -0700 Subject: [PATCH] Refactor uploaders to use different upload strategies Previously it was particularly tricky to add a new object storage method because you had to be aware of how to deal with different goroutines and contexts to handle the Workhorse upload flow (https://docs.gitlab.com/ee/development/uploads.html#direct-upload). In addition, the execution engine to handle this was duplicated across multiple files. The execution engine essentially did the following: 1. Set up an upload context with a deadline 2. Record upload metrics 3. Initialize cleanup functions 4. Initiate upload 5. Validate upload ETag 6. Do cleanup (e.g. delete the temporary file) To reduce code duplication and to make it easier to add new object stores, the common execution sequence is now encapsulated in the `uploader` `Execute()` method. We also introduce an `uploadStrategy` interface that handles the details of the uploads, and `Execute()` calls methods on this interface. Now adding a new object storage type is a matter of implementing the `uploadStrategy` interface without needing to understand the details of the execution engine. 
--- .../unreleased/sh-refactor-uploaders.yml | 5 + internal/objectstore/multipart.go | 198 ++++++++---------- internal/objectstore/object.go | 124 +++++------ internal/objectstore/s3_object.go | 117 ++++------- internal/objectstore/upload_strategy.go | 46 ++++ internal/objectstore/uploader.go | 132 ++++++++---- 6 files changed, 323 insertions(+), 299 deletions(-) create mode 100644 changelogs/unreleased/sh-refactor-uploaders.yml create mode 100644 internal/objectstore/upload_strategy.go diff --git a/changelogs/unreleased/sh-refactor-uploaders.yml b/changelogs/unreleased/sh-refactor-uploaders.yml new file mode 100644 index 0000000000000..48bb932f0dbe9 --- /dev/null +++ b/changelogs/unreleased/sh-refactor-uploaders.yml @@ -0,0 +1,5 @@ +--- +title: Refactor uploaders to use different upload strategies +merge_request: 553 +author: +type: other diff --git a/internal/objectstore/multipart.go b/internal/objectstore/multipart.go index 4947df4832da9..8ab936d3d023a 100644 --- a/internal/objectstore/multipart.go +++ b/internal/objectstore/multipart.go @@ -22,12 +22,16 @@ var ErrNotEnoughParts = errors.New("not enough Parts") // Multipart represents a MultipartUpload on a S3 compatible Object Store service. // It can be used as io.WriteCloser for uploading an object type Multipart struct { + PartURLs []string // CompleteURL is a presigned URL for CompleteMultipartUpload CompleteURL string // AbortURL is a presigned URL for AbortMultipartUpload AbortURL string // DeleteURL is a presigned URL for RemoveObject - DeleteURL string + DeleteURL string + PutHeaders map[string]string + partSize int64 + etag string uploader } @@ -36,130 +40,63 @@ type Multipart struct { // then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent. 
// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, deadline time.Time, partSize int64) (*Multipart, error) { - pr, pw := io.Pipe() - uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) m := &Multipart{ + PartURLs: partURLs, CompleteURL: completeURL, AbortURL: abortURL, DeleteURL: deleteURL, - uploader: newUploader(uploadCtx, pw), - } - - go m.trackUploadTime() - go m.cleanup(ctx) - - objectStorageUploadsOpen.Inc() - - go func() { - defer cancelFn() - defer objectStorageUploadsOpen.Dec() - defer func() { - // This will be returned as error to the next write operation on the pipe - pr.CloseWithError(m.uploadError) - }() - - cmu := &CompleteMultipartUpload{} - for i, partURL := range partURLs { - src := io.LimitReader(pr, partSize) - part, err := m.readAndUploadOnePart(partURL, putHeaders, src, i+1) - if err != nil { - m.uploadError = err - return - } - if part == nil { - break - } else { - cmu.Part = append(cmu.Part, part) - } - } - - n, err := io.Copy(ioutil.Discard, pr) - if err != nil { - m.uploadError = fmt.Errorf("drain pipe: %v", err) - return - } - if n > 0 { - m.uploadError = ErrNotEnoughParts - return - } - - if err := m.complete(cmu); err != nil { - m.uploadError = err - return - } - }() - - return m, nil -} - -func (m *Multipart) trackUploadTime() { - started := time.Now() - <-m.ctx.Done() - objectStorageUploadTime.Observe(time.Since(started).Seconds()) -} - -func (m *Multipart) cleanup(ctx context.Context) { - // wait for the upload to finish - <-m.ctx.Done() - - if m.uploadError != nil { - objectStorageUploadRequestsRequestFailed.Inc() - m.abort() - return + PutHeaders: putHeaders, + partSize: partSize, } - // We have now successfully uploaded the file to object storage. Another - // goroutine will hand off the object to gitlab-rails. 
- <-ctx.Done() + m.uploader = newUploader(m) + m.Execute(ctx, deadline) - // gitlab-rails is now done with the object so it's time to delete it. - m.delete() + return m, nil } -func (m *Multipart) complete(cmu *CompleteMultipartUpload) error { - body, err := xml.Marshal(cmu) - if err != nil { - return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err) - } - - req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body)) - if err != nil { - return fmt.Errorf("create CompleteMultipartUpload request: %v", err) +func (m *Multipart) Upload(ctx context.Context, r io.Reader) error { + cmu := &CompleteMultipartUpload{} + for i, partURL := range m.PartURLs { + src := io.LimitReader(r, m.partSize) + part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1) + if err != nil { + return err + } + if part == nil { + break + } else { + cmu.Part = append(cmu.Part, part) + } } - req.ContentLength = int64(len(body)) - req.Header.Set("Content-Type", "application/xml") - req = req.WithContext(m.ctx) - resp, err := httpClient.Do(req) + n, err := io.Copy(ioutil.Discard, r) if err != nil { - return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status) + return fmt.Errorf("drain pipe: %v", err) } - - result := &compoundCompleteMultipartUploadResult{} - decoder := xml.NewDecoder(resp.Body) - if err := decoder.Decode(&result); err != nil { - return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err) + if n > 0 { + return ErrNotEnoughParts } - if result.isError() { - return result + if err := m.complete(ctx, cmu); err != nil { + return err } - if result.CompleteMultipartUploadResult == nil { - return fmt.Errorf("empty CompleteMultipartUploadResult") - } + return nil +} - m.extractETag(result.ETag) +func (m *Multipart) ETag() string { + 
return m.etag +} +func (m *Multipart) Abort() { + deleteURL(m.AbortURL) +} - return nil +func (m *Multipart) Delete() { + deleteURL(m.DeleteURL) } -func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { +func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { file, err := ioutil.TempFile("", "part-buffer") if err != nil { return nil, fmt.Errorf("create temporary buffer file: %v", err) @@ -182,20 +119,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]s return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err) } - etag, err := m.uploadPart(partURL, putHeaders, file, n) + etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n) if err != nil { return nil, fmt.Errorf("upload part %d: %v", partNumber, err) } return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil } -func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Reader, size int64) (string, error) { - deadline, ok := m.ctx.Deadline() +func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) { + deadline, ok := ctx.Deadline() if !ok { return "", fmt.Errorf("missing deadline") } - part, err := newObject(m.ctx, url, "", headers, deadline, size, false) + part, err := newObject(ctx, url, "", headers, deadline, size, false) if err != nil { return "", err } @@ -213,10 +150,45 @@ func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Re return part.ETag(), nil } -func (m *Multipart) delete() { - m.syncAndDelete(m.DeleteURL) -} +func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error { + body, err := xml.Marshal(cmu) + if err != nil { + return fmt.Errorf("marshal 
CompleteMultipartUpload request: %v", err) + } + + req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create CompleteMultipartUpload request: %v", err) + } + req.ContentLength = int64(len(body)) + req.Header.Set("Content-Type", "application/xml") + req = req.WithContext(ctx) + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status) + } + + result := &compoundCompleteMultipartUploadResult{} + decoder := xml.NewDecoder(resp.Body) + if err := decoder.Decode(&result); err != nil { + return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err) + } + + if result.isError() { + return result + } -func (m *Multipart) abort() { - m.syncAndDelete(m.AbortURL) + if result.CompleteMultipartUploadResult == nil { + return fmt.Errorf("empty CompleteMultipartUploadResult") + } + + m.etag = extractETag(result.ETag) + + return nil } diff --git a/internal/objectstore/object.go b/internal/objectstore/object.go index 169d76d727095..2a6bd8004d369 100644 --- a/internal/objectstore/object.go +++ b/internal/objectstore/object.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "net" "net/http" - "strings" "time" "gitlab.com/gitlab-org/labkit/correlation" @@ -36,109 +35,82 @@ var httpClient = &http.Client{ Transport: httpTransport, } -type StatusCodeError error - // Object represents an object on a S3 compatible Object Store service. 
// It can be used as io.WriteCloser for uploading an object type Object struct { - // PutURL is a presigned URL for PutObject - PutURL string - // DeleteURL is a presigned URL for RemoveObject - DeleteURL string + // putURL is a presigned URL for PutObject + putURL string + // deleteURL is a presigned URL for RemoveObject + deleteURL string + putHeaders map[string]string + size int64 + etag string + metrics bool uploader } +type StatusCodeError error + // NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading. func NewObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64) (*Object, error) { return newObject(ctx, putURL, deleteURL, putHeaders, deadline, size, true) } func newObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64, metrics bool) (*Object, error) { - started := time.Now() - pr, pw := io.Pipe() + o := &Object{ + putURL: putURL, + deleteURL: deleteURL, + putHeaders: putHeaders, + size: size, + metrics: metrics, + } + + o.uploader = newMD5Uploader(o, metrics) + o.Execute(ctx, deadline) + + return o, nil +} + +func (o *Object) Upload(ctx context.Context, r io.Reader) error { // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err) - req, err := http.NewRequest(http.MethodPut, putURL, ioutil.NopCloser(pr)) + req, err := http.NewRequest(http.MethodPut, o.putURL, ioutil.NopCloser(r)) + if err != nil { - if metrics { - objectStorageUploadRequestsRequestFailed.Inc() - } - return nil, fmt.Errorf("PUT %q: %v", mask.URL(putURL), err) + return fmt.Errorf("PUT %q: %v", mask.URL(o.putURL), err) } - req.ContentLength = size + req.ContentLength = o.size - for k, v := range putHeaders { + for k, v := range o.putHeaders { req.Header.Set(k, v) } - uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) - o := &Object{ - PutURL: putURL, - DeleteURL: 
deleteURL, - uploader: newMD5Uploader(uploadCtx, pw), - } - - if metrics { - objectStorageUploadsOpen.Inc() + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("PUT request %q: %v", mask.URL(o.putURL), err) } + defer resp.Body.Close() - go func() { - // wait for the upload to finish - <-o.ctx.Done() - if metrics { - objectStorageUploadTime.Observe(time.Since(started).Seconds()) - } - - // wait for provided context to finish before performing cleanup - <-ctx.Done() - o.delete() - }() - - go func() { - defer cancelFn() - if metrics { - defer objectStorageUploadsOpen.Dec() - } - defer func() { - // This will be returned as error to the next write operation on the pipe - pr.CloseWithError(o.uploadError) - }() - - req = req.WithContext(o.ctx) - - resp, err := httpClient.Do(req) - if err != nil { - if metrics { - objectStorageUploadRequestsRequestFailed.Inc() - } - o.uploadError = fmt.Errorf("PUT request %q: %v", mask.URL(o.PutURL), err) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - if metrics { - objectStorageUploadRequestsInvalidStatus.Inc() - } - o.uploadError = StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.PutURL), resp.Status)) - return + if resp.StatusCode != http.StatusOK { + if o.metrics { + objectStorageUploadRequestsInvalidStatus.Inc() } + return StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.putURL), resp.Status)) + } - o.extractETag(resp.Header.Get("ETag")) - o.uploadError = compareMD5(o.md5Sum(), o.etag) - }() + o.etag = extractETag(resp.Header.Get("ETag")) - return o, nil + return nil } -func (o *Object) delete() { - o.syncAndDelete(o.DeleteURL) +func (o *Object) ETag() string { + return o.etag } -func compareMD5(local, remote string) error { - if !strings.EqualFold(local, remote) { - return fmt.Errorf("ETag mismatch. 
expected %q got %q", local, remote) - } +func (o *Object) Abort() { + o.Delete() +} - return nil +func (o *Object) Delete() { + deleteURL(o.deleteURL) } diff --git a/internal/objectstore/s3_object.go b/internal/objectstore/s3_object.go index 5ea0773f833d3..7444283bbc7d6 100644 --- a/internal/objectstore/s3_object.go +++ b/internal/objectstore/s3_object.go @@ -5,23 +5,36 @@ import ( "io" "time" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - "gitlab.com/gitlab-org/labkit/log" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" ) type S3Object struct { credentials config.S3Credentials config config.S3Config objectName string + uploaded bool + uploader } +func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) { + o := &S3Object{ + credentials: s3Credentials, + config: s3Config, + objectName: objectName, + } + + o.uploader = newUploader(o) + o.Execute(ctx, deadline) + + return o, nil +} + func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) { if s3Config.ServerSideEncryption != "" { input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption) @@ -32,88 +45,48 @@ func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config } } -func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) { - pr, pw := io.Pipe() - objectStorageUploadsOpen.Inc() - uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) - - o := &S3Object{ - uploader: newUploader(uploadCtx, pw), - credentials: s3Credentials, - config: s3Config, +func (s *S3Object) Upload(ctx context.Context, r io.Reader) error { + sess, err := setupS3Session(s.credentials, s.config) + if err != nil { + 
log.WithError(err).Error("error creating S3 session") + return err } - go o.trackUploadTime() - go o.cleanup(ctx) - - go func() { - defer cancelFn() - defer objectStorageUploadsOpen.Dec() - defer func() { - // This will be returned as error to the next write operation on the pipe - pr.CloseWithError(o.uploadError) - }() - - sess, err := setupS3Session(s3Credentials, s3Config) - if err != nil { - o.uploadError = err - log.WithError(err).Error("error creating S3 session") - return - } + uploader := s3manager.NewUploader(sess) - o.objectName = objectName - uploader := s3manager.NewUploader(sess) + input := &s3manager.UploadInput{ + Bucket: aws.String(s.config.Bucket), + Key: aws.String(s.objectName), + Body: r, + } - input := &s3manager.UploadInput{ - Bucket: aws.String(s3Config.Bucket), - Key: aws.String(objectName), - Body: pr, - } + setEncryptionOptions(input, s.config) - setEncryptionOptions(input, s3Config) + _, err = uploader.UploadWithContext(ctx, input) + if err != nil { + log.WithError(err).Error("error uploading S3 session") + return err + } - _, err = uploader.UploadWithContext(uploadCtx, input) - if err != nil { - o.uploadError = err - objectStorageUploadRequestsRequestFailed.Inc() - log.WithError(err).Error("error uploading S3 session") - return - } - }() + s.uploaded = true - return o, nil + return nil } -func (o *S3Object) trackUploadTime() { - started := time.Now() - <-o.ctx.Done() - objectStorageUploadTime.Observe(time.Since(started).Seconds()) +func (s *S3Object) ETag() string { + return "" } -func (o *S3Object) cleanup(ctx context.Context) { - // wait for the upload to finish - <-o.ctx.Done() - - if o.uploadError != nil { - objectStorageUploadRequestsRequestFailed.Inc() - o.delete() - return - } - - // We have now successfully uploaded the file to object storage. Another - // goroutine will hand off the object to gitlab-rails. - <-ctx.Done() - - // gitlab-rails is now done with the object so it's time to delete it. 
- o.delete() +func (s *S3Object) Abort() { + s.Delete() } -func (o *S3Object) delete() { - if o.objectName == "" { +func (s *S3Object) Delete() { + if !s.uploaded { return } - session, err := setupS3Session(o.credentials, o.config) + session, err := setupS3Session(s.credentials, s.config) if err != nil { log.WithError(err).Error("error setting up S3 session in delete") return @@ -121,8 +94,8 @@ func (o *S3Object) delete() { svc := s3.New(session) input := &s3.DeleteObjectInput{ - Bucket: aws.String(o.config.Bucket), - Key: aws.String(o.objectName), + Bucket: aws.String(s.config.Bucket), + Key: aws.String(s.objectName), } // Note we can't use the request context because in a successful diff --git a/internal/objectstore/upload_strategy.go b/internal/objectstore/upload_strategy.go new file mode 100644 index 0000000000000..5707ba5f24eae --- /dev/null +++ b/internal/objectstore/upload_strategy.go @@ -0,0 +1,46 @@ +package objectstore + +import ( + "context" + "io" + "net/http" + + "gitlab.com/gitlab-org/labkit/log" + "gitlab.com/gitlab-org/labkit/mask" +) + +type uploadStrategy interface { + Upload(ctx context.Context, r io.Reader) error + ETag() string + Abort() + Delete() +} + +func deleteURL(url string) { + if url == "" { + return + } + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") + return + } + // TODO: consider adding the context to the outgoing request for better instrumentation + + // here we are not using u.ctx because we must perform cleanup regardless of parent context + resp, err := httpClient.Do(req) + if err != nil { + log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") + return + } + resp.Body.Close() +} + +func extractETag(rawETag string) string { + if rawETag != "" && rawETag[0] == '"' { + rawETag = rawETag[1 : len(rawETag)-1] + } + + return rawETag +} diff --git a/internal/objectstore/uploader.go 
b/internal/objectstore/uploader.go index a8e52e22ffdf7..0d8d24b4739f8 100644 --- a/internal/objectstore/uploader.go +++ b/internal/objectstore/uploader.go @@ -4,12 +4,11 @@ import ( "context" "crypto/md5" "encoding/hex" + "fmt" "hash" "io" - "net/http" - - "gitlab.com/gitlab-org/labkit/log" - "gitlab.com/gitlab-org/labkit/mask" + "strings" + "time" ) // Upload represents an upload to an ObjectStorage provider @@ -33,16 +32,23 @@ type uploader struct { uploadError error // ctx is the internal context bound to the upload request ctx context.Context + + pr *io.PipeReader + pw *io.PipeWriter + strategy uploadStrategy + metrics bool } -func newUploader(ctx context.Context, w io.WriteCloser) uploader { - return uploader{w: w, c: w, ctx: ctx} +func newUploader(strategy uploadStrategy) uploader { + pr, pw := io.Pipe() + return uploader{w: pw, c: pw, pr: pr, pw: pw, strategy: strategy, metrics: true} } -func newMD5Uploader(ctx context.Context, w io.WriteCloser) uploader { +func newMD5Uploader(strategy uploadStrategy, metrics bool) uploader { + pr, pw := io.Pipe() hasher := md5.New() - mw := io.MultiWriter(w, hasher) - return uploader{w: mw, c: w, md5: hasher, ctx: ctx} + mw := io.MultiWriter(pw, hasher) + return uploader{w: mw, c: pw, pr: pr, pw: pw, md5: hasher, strategy: strategy, metrics: metrics} } // Close implements the standard io.Closer interface: it closes the http client request. @@ -65,50 +71,100 @@ func (u *uploader) Write(p []byte) (int, error) { return u.w.Write(p) } -// syncAndDelete wait for Context to be Done and then performs the requested HTTP call -func (u *uploader) syncAndDelete(url string) { - if url == "" { - return +func (u *uploader) md5Sum() string { + if u.md5 == nil { + return "" } + checksum := u.md5.Sum(nil) + return hex.EncodeToString(checksum) +} + +// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header. +// This method will wait until upload context is done before returning. 
+func (u *uploader) ETag() string { <-u.ctx.Done() - req, err := http.NewRequest("DELETE", url, nil) - if err != nil { - log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") - return + return u.etag +} + +func (u *uploader) Execute(ctx context.Context, deadline time.Time) { + if u.metrics { + objectStorageUploadsOpen.Inc() } - // TODO: consider adding the context to the outgoing request for better instrumentation + uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) + u.ctx = uploadCtx - // here we are not using u.ctx because we must perform cleanup regardless of parent context - resp, err := httpClient.Do(req) - if err != nil { - log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") - return + if u.metrics { + go u.trackUploadTime() } - resp.Body.Close() + go u.cleanup(ctx) + go func() { + defer cancelFn() + if u.metrics { + defer objectStorageUploadsOpen.Dec() + } + defer func() { + // This will be returned as error to the next write operation on the pipe + u.pr.CloseWithError(u.uploadError) + }() + + err := u.strategy.Upload(uploadCtx, u.pr) + if err != nil { + u.uploadError = err + if u.metrics { + objectStorageUploadRequestsRequestFailed.Inc() + } + return + } + + u.etag = u.strategy.ETag() + + if u.md5 != nil { + err := compareMD5(u.md5Sum(), u.etag) + if err != nil { + u.uploadError = err + if u.metrics { + objectStorageUploadRequestsRequestFailed.Inc() + } + } + } + }() } -func (u *uploader) extractETag(rawETag string) { - if rawETag != "" && rawETag[0] == '"' { - rawETag = rawETag[1 : len(rawETag)-1] +func (u *uploader) trackUploadTime() { + started := time.Now() + <-u.ctx.Done() + + if u.metrics { + objectStorageUploadTime.Observe(time.Since(started).Seconds()) } - u.etag = rawETag } -func (u *uploader) md5Sum() string { - if u.md5 == nil { - return "" +func (u *uploader) cleanup(ctx context.Context) { + // wait for the upload to finish + <-u.ctx.Done() + + if u.uploadError != nil { + if u.metrics 
{ + objectStorageUploadRequestsRequestFailed.Inc() + } + u.strategy.Abort() + return } - checksum := u.md5.Sum(nil) - return hex.EncodeToString(checksum) + // We have now successfully uploaded the file to object storage. Another + // goroutine will hand off the object to gitlab-rails. + <-ctx.Done() + + // gitlab-rails is now done with the object so it's time to delete it. + u.strategy.Delete() } -// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header. -// This method will wait until upload context is done before returning. -func (u *uploader) ETag() string { - <-u.ctx.Done() +func compareMD5(local, remote string) error { + if !strings.EqualFold(local, remote) { + return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote) + } - return u.etag + return nil } -- GitLab