diff --git a/changelogs/unreleased/sh-refactor-uploaders.yml b/changelogs/unreleased/sh-refactor-uploaders.yml new file mode 100644 index 0000000000000000000000000000000000000000..48bb932f0dbe9ab5cfe29e3eaab925e27ecf949b --- /dev/null +++ b/changelogs/unreleased/sh-refactor-uploaders.yml @@ -0,0 +1,5 @@ +--- +title: Refactor uploaders to use different upload strategies +merge_request: 553 +author: +type: other diff --git a/internal/objectstore/multipart.go b/internal/objectstore/multipart.go index 4947df4832da9a44b36be0651375bae31f8d25af..8ab936d3d023a73b57b0b246a7a8c8246493e7c0 100644 --- a/internal/objectstore/multipart.go +++ b/internal/objectstore/multipart.go @@ -22,12 +22,16 @@ var ErrNotEnoughParts = errors.New("not enough Parts") // Multipart represents a MultipartUpload on a S3 compatible Object Store service. // It can be used as io.WriteCloser for uploading an object type Multipart struct { + PartURLs []string // CompleteURL is a presigned URL for CompleteMultipartUpload CompleteURL string // AbortURL is a presigned URL for AbortMultipartUpload AbortURL string // DeleteURL is a presigned URL for RemoveObject - DeleteURL string + DeleteURL string + PutHeaders map[string]string + partSize int64 + etag string uploader } @@ -36,130 +40,63 @@ type Multipart struct { // then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent. // In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, deadline time.Time, partSize int64) (*Multipart, error) { - pr, pw := io.Pipe() - uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) m := &Multipart{ + PartURLs: partURLs, CompleteURL: completeURL, AbortURL: abortURL, DeleteURL: deleteURL, - uploader: newUploader(uploadCtx, pw), - } - - go m.trackUploadTime() - go m.cleanup(ctx) - - objectStorageUploadsOpen.Inc() - - go func() { - defer cancelFn() - defer objectStorageUploadsOpen.Dec() - defer func() { - // This will be returned as error to the next write operation on the pipe - pr.CloseWithError(m.uploadError) - }() - - cmu := &CompleteMultipartUpload{} - for i, partURL := range partURLs { - src := io.LimitReader(pr, partSize) - part, err := m.readAndUploadOnePart(partURL, putHeaders, src, i+1) - if err != nil { - m.uploadError = err - return - } - if part == nil { - break - } else { - cmu.Part = append(cmu.Part, part) - } - } - - n, err := io.Copy(ioutil.Discard, pr) - if err != nil { - m.uploadError = fmt.Errorf("drain pipe: %v", err) - return - } - if n > 0 { - m.uploadError = ErrNotEnoughParts - return - } - - if err := m.complete(cmu); err != nil { - m.uploadError = err - return - } - }() - - return m, nil -} - -func (m *Multipart) trackUploadTime() { - started := time.Now() - <-m.ctx.Done() - objectStorageUploadTime.Observe(time.Since(started).Seconds()) -} - -func (m *Multipart) cleanup(ctx context.Context) { - // wait for the upload to finish - <-m.ctx.Done() - - if m.uploadError != nil { - objectStorageUploadRequestsRequestFailed.Inc() - m.abort() - return + PutHeaders: putHeaders, + partSize: partSize, } - // We have now successfully uploaded the file to object storage. Another - // goroutine will hand off the object to gitlab-rails. - <-ctx.Done() + m.uploader = newUploader(m) + m.Execute(ctx, deadline) - // gitlab-rails is now done with the object so it's time to delete it. - m.delete() + return m, nil } -func (m *Multipart) complete(cmu *CompleteMultipartUpload) error { - body, err := xml.Marshal(cmu) - if err != nil { - return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err) - } - - req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body)) - if err != nil { - return fmt.Errorf("create CompleteMultipartUpload request: %v", err) +func (m *Multipart) Upload(ctx context.Context, r io.Reader) error { + cmu := &CompleteMultipartUpload{} + for i, partURL := range m.PartURLs { + src := io.LimitReader(r, m.partSize) + part, err := m.readAndUploadOnePart(ctx, partURL, m.PutHeaders, src, i+1) + if err != nil { + return err + } + if part == nil { + break + } else { + cmu.Part = append(cmu.Part, part) + } } - req.ContentLength = int64(len(body)) - req.Header.Set("Content-Type", "application/xml") - req = req.WithContext(m.ctx) - resp, err := httpClient.Do(req) + n, err := io.Copy(ioutil.Discard, r) if err != nil { - return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status) + return fmt.Errorf("drain pipe: %v", err) } - - result := &compoundCompleteMultipartUploadResult{} - decoder := xml.NewDecoder(resp.Body) - if err := decoder.Decode(&result); err != nil { - return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err) + if n > 0 { + return ErrNotEnoughParts } - if result.isError() { - return result + if err := m.complete(ctx, cmu); err != nil { + return err } - if result.CompleteMultipartUploadResult == nil { - return fmt.Errorf("empty CompleteMultipartUploadResult") - } + return nil +} - m.extractETag(result.ETag) +func (m *Multipart) ETag() string { + return m.etag +} +func (m *Multipart) Abort() { + deleteURL(m.AbortURL) +} - return nil +func (m *Multipart) Delete() { + deleteURL(m.DeleteURL) } -func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { +func (m *Multipart) readAndUploadOnePart(ctx context.Context, partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { file, err := ioutil.TempFile("", "part-buffer") if err != nil { return nil, fmt.Errorf("create temporary buffer file: %v", err) @@ -182,20 +119,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]s return nil, fmt.Errorf("rewind part %d temporary dump : %v", partNumber, err) } - etag, err := m.uploadPart(partURL, putHeaders, file, n) + etag, err := m.uploadPart(ctx, partURL, putHeaders, file, n) if err != nil { return nil, fmt.Errorf("upload part %d: %v", partNumber, err) } return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil } -func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Reader, size int64) (string, error) { - deadline, ok := m.ctx.Deadline() +func (m *Multipart) uploadPart(ctx context.Context, url string, headers map[string]string, body io.Reader, size int64) (string, error) { + deadline, ok := ctx.Deadline() if !ok { return "", fmt.Errorf("missing deadline") } - part, err := newObject(m.ctx, url, "", headers, deadline, size, false) + part, err := newObject(ctx, url, "", headers, deadline, size, false) if err != nil { return "", err } @@ -213,10 +150,45 @@ func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Re return part.ETag(), nil } -func (m *Multipart) delete() { - m.syncAndDelete(m.DeleteURL) -} +func (m *Multipart) complete(ctx context.Context, cmu *CompleteMultipartUpload) error { + body, err := xml.Marshal(cmu) + if err != nil { + return fmt.Errorf("marshal CompleteMultipartUpload request: %v", err) + } + + req, err := http.NewRequest("POST", m.CompleteURL, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create CompleteMultipartUpload request: %v", err) + } + req.ContentLength = int64(len(body)) + req.Header.Set("Content-Type", "application/xml") + req = req.WithContext(ctx) + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("CompleteMultipartUpload request %q: %v", mask.URL(m.CompleteURL), err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("CompleteMultipartUpload request %v returned: %s", mask.URL(m.CompleteURL), resp.Status) + } + + result := &compoundCompleteMultipartUploadResult{} + decoder := xml.NewDecoder(resp.Body) + if err := decoder.Decode(&result); err != nil { + return fmt.Errorf("decode CompleteMultipartUpload answer: %v", err) + } + + if result.isError() { + return result + } -func (m *Multipart) abort() { - m.syncAndDelete(m.AbortURL) + if result.CompleteMultipartUploadResult == nil { + return fmt.Errorf("empty CompleteMultipartUploadResult") + } + + m.etag = extractETag(result.ETag) + + return nil } diff --git a/internal/objectstore/object.go b/internal/objectstore/object.go index 169d76d7270958d0fdfff947d3bd6368a528e42f..2a6bd8004d3698480594cbca713f809480a5964a 100644 --- a/internal/objectstore/object.go +++ b/internal/objectstore/object.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "net" "net/http" - "strings" "time" "gitlab.com/gitlab-org/labkit/correlation" @@ -36,109 +35,82 @@ var httpClient = &http.Client{ Transport: httpTransport, } -type StatusCodeError error - // Object represents an object on a S3 compatible Object Store service. // It can be used as io.WriteCloser for uploading an object type Object struct { - // PutURL is a presigned URL for PutObject - PutURL string - // DeleteURL is a presigned URL for RemoveObject - DeleteURL string + // putURL is a presigned URL for PutObject + putURL string + // deleteURL is a presigned URL for RemoveObject + deleteURL string + putHeaders map[string]string + size int64 + etag string + metrics bool uploader } +type StatusCodeError error + // NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading. func NewObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64) (*Object, error) { return newObject(ctx, putURL, deleteURL, putHeaders, deadline, size, true) } func newObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64, metrics bool) (*Object, error) { - started := time.Now() - pr, pw := io.Pipe() + o := &Object{ + putURL: putURL, + deleteURL: deleteURL, + putHeaders: putHeaders, + size: size, + metrics: metrics, + } + + o.uploader = newMD5Uploader(o, metrics) + o.Execute(ctx, deadline) + + return o, nil +} + +func (o *Object) Upload(ctx context.Context, r io.Reader) error { // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err) - req, err := http.NewRequest(http.MethodPut, putURL, ioutil.NopCloser(pr)) + req, err := http.NewRequest(http.MethodPut, o.putURL, ioutil.NopCloser(r)) + if err != nil { - if metrics { - objectStorageUploadRequestsRequestFailed.Inc() - } - return nil, fmt.Errorf("PUT %q: %v", mask.URL(putURL), err) + return fmt.Errorf("PUT %q: %v", mask.URL(o.putURL), err) } - req.ContentLength = size + req.ContentLength = o.size - for k, v := range putHeaders { + for k, v := range o.putHeaders { req.Header.Set(k, v) } - uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) - o := &Object{ - PutURL: putURL, - DeleteURL: deleteURL, - uploader: newMD5Uploader(uploadCtx, pw), - } - - if metrics { - objectStorageUploadsOpen.Inc() + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("PUT request %q: %v", mask.URL(o.putURL), err) } + defer resp.Body.Close() - go func() { - // wait for the upload to finish - <-o.ctx.Done() - if metrics { - objectStorageUploadTime.Observe(time.Since(started).Seconds()) - } - - // wait for provided context to finish before performing cleanup - <-ctx.Done() - o.delete() - }() - - go func() { - defer cancelFn() - if metrics { - defer objectStorageUploadsOpen.Dec() - } - defer func() { - // This will be returned as error to the next write operation on the pipe - pr.CloseWithError(o.uploadError) - }() - - req = req.WithContext(o.ctx) - - resp, err := httpClient.Do(req) - if err != nil { - if metrics { - objectStorageUploadRequestsRequestFailed.Inc() - } - o.uploadError = fmt.Errorf("PUT request %q: %v", mask.URL(o.PutURL), err) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - if metrics { - objectStorageUploadRequestsInvalidStatus.Inc() - } - o.uploadError = StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.PutURL), resp.Status)) - return + if resp.StatusCode != http.StatusOK { + if o.metrics { + objectStorageUploadRequestsInvalidStatus.Inc() } + return StatusCodeError(fmt.Errorf("PUT request %v returned: %s", mask.URL(o.putURL), resp.Status)) + } - o.extractETag(resp.Header.Get("ETag")) - o.uploadError = compareMD5(o.md5Sum(), o.etag) - }() + o.etag = extractETag(resp.Header.Get("ETag")) - return o, nil + return nil } -func (o *Object) delete() { - o.syncAndDelete(o.DeleteURL) +func (o *Object) ETag() string { + return o.etag } -func compareMD5(local, remote string) error { - if !strings.EqualFold(local, remote) { - return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote) - } +func (o *Object) Abort() { + o.Delete() +} - return nil +func (o *Object) Delete() { + deleteURL(o.deleteURL) } diff --git a/internal/objectstore/s3_object.go b/internal/objectstore/s3_object.go index 5ea0773f833d3f666b664689897cea79367caf53..7444283bbc7d60cd399958c5d56d77eafabe761a 100644 --- a/internal/objectstore/s3_object.go +++ b/internal/objectstore/s3_object.go @@ -5,23 +5,36 @@ import ( "io" "time" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - "gitlab.com/gitlab-org/labkit/log" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" ) type S3Object struct { credentials config.S3Credentials config config.S3Config objectName string + uploaded bool + uploader } +func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) { + o := &S3Object{ + credentials: s3Credentials, + config: s3Config, + objectName: objectName, + } + + o.uploader = newUploader(o) + o.Execute(ctx, deadline) + + return o, nil +} + func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config) { if s3Config.ServerSideEncryption != "" { input.ServerSideEncryption = aws.String(s3Config.ServerSideEncryption) @@ -32,88 +45,48 @@ func setEncryptionOptions(input *s3manager.UploadInput, s3Config config.S3Config } } -func NewS3Object(ctx context.Context, objectName string, s3Credentials config.S3Credentials, s3Config config.S3Config, deadline time.Time) (*S3Object, error) { - pr, pw := io.Pipe() - objectStorageUploadsOpen.Inc() - uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) - - o := &S3Object{ - uploader: newUploader(uploadCtx, pw), - credentials: s3Credentials, - config: s3Config, +func (s *S3Object) Upload(ctx context.Context, r io.Reader) error { + sess, err := setupS3Session(s.credentials, s.config) + if err != nil { + log.WithError(err).Error("error creating S3 session") + return err } - go o.trackUploadTime() - go o.cleanup(ctx) - - go func() { - defer cancelFn() - defer objectStorageUploadsOpen.Dec() - defer func() { - // This will be returned as error to the next write operation on the pipe - pr.CloseWithError(o.uploadError) - }() - - sess, err := setupS3Session(s3Credentials, s3Config) - if err != nil { - o.uploadError = err - log.WithError(err).Error("error creating S3 session") - return - } + uploader := s3manager.NewUploader(sess) - o.objectName = objectName - uploader := s3manager.NewUploader(sess) + input := &s3manager.UploadInput{ + Bucket: aws.String(s.config.Bucket), + Key: aws.String(s.objectName), + Body: r, + } - input := &s3manager.UploadInput{ - Bucket: aws.String(s3Config.Bucket), - Key: aws.String(objectName), - Body: pr, - } + setEncryptionOptions(input, s.config) - setEncryptionOptions(input, s3Config) + _, err = uploader.UploadWithContext(ctx, input) + if err != nil { + log.WithError(err).Error("error uploading S3 session") + return err + } - _, err = uploader.UploadWithContext(uploadCtx, input) - if err != nil { - o.uploadError = err - objectStorageUploadRequestsRequestFailed.Inc() - log.WithError(err).Error("error uploading S3 session") - return - } - }() + s.uploaded = true - return o, nil + return nil } -func (o *S3Object) trackUploadTime() { - started := time.Now() - <-o.ctx.Done() - objectStorageUploadTime.Observe(time.Since(started).Seconds()) +func (s *S3Object) ETag() string { + return "" } -func (o *S3Object) cleanup(ctx context.Context) { - // wait for the upload to finish - <-o.ctx.Done() - - if o.uploadError != nil { - objectStorageUploadRequestsRequestFailed.Inc() - o.delete() - return - } - - // We have now successfully uploaded the file to object storage. Another - // goroutine will hand off the object to gitlab-rails. - <-ctx.Done() - - // gitlab-rails is now done with the object so it's time to delete it. - o.delete() +func (s *S3Object) Abort() { + s.Delete() } -func (o *S3Object) delete() { - if o.objectName == "" { +func (s *S3Object) Delete() { + if !s.uploaded { return } - session, err := setupS3Session(o.credentials, o.config) + session, err := setupS3Session(s.credentials, s.config) if err != nil { log.WithError(err).Error("error setting up S3 session in delete") return @@ -121,8 +94,8 @@ func (o *S3Object) delete() { svc := s3.New(session) input := &s3.DeleteObjectInput{ - Bucket: aws.String(o.config.Bucket), - Key: aws.String(o.objectName), + Bucket: aws.String(s.config.Bucket), + Key: aws.String(s.objectName), } // Note we can't use the request context because in a successful diff --git a/internal/objectstore/upload_strategy.go b/internal/objectstore/upload_strategy.go new file mode 100644 index 0000000000000000000000000000000000000000..5707ba5f24eaef9cd900b5bcfc821aa96b575cb4 --- /dev/null +++ b/internal/objectstore/upload_strategy.go @@ -0,0 +1,46 @@ +package objectstore + +import ( + "context" + "io" + "net/http" + + "gitlab.com/gitlab-org/labkit/log" + "gitlab.com/gitlab-org/labkit/mask" +) + +type uploadStrategy interface { + Upload(ctx context.Context, r io.Reader) error + ETag() string + Abort() + Delete() +} + +func deleteURL(url string) { + if url == "" { + return + } + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") + return + } + // TODO: consider adding the context to the outgoing request for better instrumentation + + // here we are not using u.ctx because we must perform cleanup regardless of parent context + resp, err := httpClient.Do(req) + if err != nil { + log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") + return + } + resp.Body.Close() +} + +func extractETag(rawETag string) string { + if rawETag != "" && rawETag[0] == '"' { + rawETag = rawETag[1 : len(rawETag)-1] + } + + return rawETag +} diff --git a/internal/objectstore/uploader.go b/internal/objectstore/uploader.go index a8e52e22ffdf75eb0fbd2dbc1febfad0257f1e2a..0d8d24b4739f87edf7511e16db1364c28f4d9f7b 100644 --- a/internal/objectstore/uploader.go +++ b/internal/objectstore/uploader.go @@ -4,12 +4,11 @@ import ( "context" "crypto/md5" "encoding/hex" + "fmt" "hash" "io" - "net/http" - - "gitlab.com/gitlab-org/labkit/log" - "gitlab.com/gitlab-org/labkit/mask" + "strings" + "time" ) // Upload represents an upload to an ObjectStorage provider @@ -33,16 +32,23 @@ type uploader struct { uploadError error // ctx is the internal context bound to the upload request ctx context.Context + + pr *io.PipeReader + pw *io.PipeWriter + strategy uploadStrategy + metrics bool } -func newUploader(ctx context.Context, w io.WriteCloser) uploader { - return uploader{w: w, c: w, ctx: ctx} +func newUploader(strategy uploadStrategy) uploader { + pr, pw := io.Pipe() + return uploader{w: pw, c: pw, pr: pr, pw: pw, strategy: strategy, metrics: true} } -func newMD5Uploader(ctx context.Context, w io.WriteCloser) uploader { +func newMD5Uploader(strategy uploadStrategy, metrics bool) uploader { + pr, pw := io.Pipe() hasher := md5.New() - mw := io.MultiWriter(w, hasher) - return uploader{w: mw, c: w, md5: hasher, ctx: ctx} + mw := io.MultiWriter(pw, hasher) + return uploader{w: mw, c: pw, pr: pr, pw: pw, md5: hasher, strategy: strategy, metrics: metrics} } // Close implements the standard io.Closer interface: it closes the http client request. @@ -65,50 +71,100 @@ func (u *uploader) Write(p []byte) (int, error) { return u.w.Write(p) } -// syncAndDelete wait for Context to be Done and then performs the requested HTTP call -func (u *uploader) syncAndDelete(url string) { - if url == "" { - return +func (u *uploader) md5Sum() string { + if u.md5 == nil { + return "" } + checksum := u.md5.Sum(nil) + return hex.EncodeToString(checksum) +} + +// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header. +// This method will wait until upload context is done before returning. +func (u *uploader) ETag() string { <-u.ctx.Done() - req, err := http.NewRequest("DELETE", url, nil) - if err != nil { - log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") - return + return u.etag +} + +func (u *uploader) Execute(ctx context.Context, deadline time.Time) { + if u.metrics { + objectStorageUploadsOpen.Inc() } - // TODO: consider adding the context to the outgoing request for better instrumentation + uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) + u.ctx = uploadCtx - // here we are not using u.ctx because we must perform cleanup regardless of parent context - resp, err := httpClient.Do(req) - if err != nil { - log.WithError(err).WithField("object", mask.URL(url)).Warning("Delete failed") - return + if u.metrics { + go u.trackUploadTime() } - resp.Body.Close() + go u.cleanup(ctx) + go func() { + defer cancelFn() + if u.metrics { + defer objectStorageUploadsOpen.Dec() + } + defer func() { + // This will be returned as error to the next write operation on the pipe + u.pr.CloseWithError(u.uploadError) + }() + + err := u.strategy.Upload(uploadCtx, u.pr) + if err != nil { + u.uploadError = err + if u.metrics { + objectStorageUploadRequestsRequestFailed.Inc() + } + return + } + + u.etag = u.strategy.ETag() + + if u.md5 != nil { + err := compareMD5(u.md5Sum(), u.etag) + if err != nil { + u.uploadError = err + if u.metrics { + objectStorageUploadRequestsRequestFailed.Inc() + } + } + } + }() } -func (u *uploader) extractETag(rawETag string) { - if rawETag != "" && rawETag[0] == '"' { - rawETag = rawETag[1 : len(rawETag)-1] +func (u *uploader) trackUploadTime() { + started := time.Now() + <-u.ctx.Done() + + if u.metrics { + objectStorageUploadTime.Observe(time.Since(started).Seconds()) } - u.etag = rawETag } -func (u *uploader) md5Sum() string { - if u.md5 == nil { - return "" +func (u *uploader) cleanup(ctx context.Context) { + // wait for the upload to finish + <-u.ctx.Done() + + if u.uploadError != nil { + if u.metrics { + objectStorageUploadRequestsRequestFailed.Inc() + } + u.strategy.Abort() + return } - checksum := u.md5.Sum(nil) - return hex.EncodeToString(checksum) + // We have now successfully uploaded the file to object storage. Another + // goroutine will hand off the object to gitlab-rails. + <-ctx.Done() + + // gitlab-rails is now done with the object so it's time to delete it. + u.strategy.Delete() } -// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header. -// This method will wait until upload context is done before returning. -func (u *uploader) ETag() string { - <-u.ctx.Done() +func compareMD5(local, remote string) error { + if !strings.EqualFold(local, remote) { + return fmt.Errorf("ETag mismatch. expected %q got %q", local, remote) + } - return u.etag + return nil }