diff --git a/internal/api/api.go b/internal/api/api.go index d8bd99f599375dbc7df9d4be0c9c8873f3a4450d..6f1f7079645bdce37d6c8481f2cdab9a92c74dd1 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -85,6 +85,10 @@ type RemoteObject struct { DeleteURL string // StoreURL is the temporary presigned S3 PutObject URL to which upload the first found file StoreURL string + // Boolean to indicate whether to use headers included in PutHeaders + CustomPutHeaders bool + // PutHeaders are HTTP headers (e.g. Content-Type) to be sent with StoreURL + PutHeaders map[string]string // ID is a unique identifier of object storage upload ID string // Timeout is a number that represents timeout in seconds for sending data to StoreURL diff --git a/internal/filestore/file_handler.go b/internal/filestore/file_handler.go index e8c78642564071ca6a1c9c3058945f967fdb4978..5fae3fe9b8d85c91de0d4e8b5fb70b6f3659cd4c 100644 --- a/internal/filestore/file_handler.go +++ b/internal/filestore/file_handler.go @@ -100,14 +100,14 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts }() if opts.IsMultipart() { - remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.Deadline, opts.PartSize) + remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize) if err != nil { return nil, err } writers = append(writers, remoteWriter) } else if opts.IsRemote() { - remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Deadline, size) + remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, size) if err != nil { return nil, err } diff --git a/internal/filestore/save_file_opts.go b/internal/filestore/save_file_opts.go index a3729c59201760a6fa771fdb42d7d8ee97a4ca22..733f6829924b41235d4ca84216225b4a851af753 100644 --- a/internal/filestore/save_file_opts.go +++ b/internal/filestore/save_file_opts.go @@ -23,6 +23,9 @@ type SaveFileOpts struct { PresignedPut string // PresignedDelete is a presigned S3 DeleteObject compatible URL. PresignedDelete string + // HTTP headers to be sent along with PUT request + PutHeaders map[string]string + // Deadline it the S3 operation deadline, the upload will be aborted if not completed in time Deadline time.Time @@ -65,9 +68,17 @@ func GetOpts(apiResponse *api.Response) *SaveFileOpts { RemoteURL: apiResponse.RemoteObject.GetURL, PresignedPut: apiResponse.RemoteObject.StoreURL, PresignedDelete: apiResponse.RemoteObject.DeleteURL, + PutHeaders: apiResponse.RemoteObject.PutHeaders, Deadline: time.Now().Add(timeout), } + // Backwards compatibility to ensure API servers that do not include the + // CustomPutHeaders flag will default to the original content type. + if !apiResponse.RemoteObject.CustomPutHeaders { + opts.PutHeaders = make(map[string]string) + opts.PutHeaders["Content-Type"] = "application/octet-stream" + } + if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil { opts.PartSize = multiParams.PartSize opts.PresignedCompleteMultipart = multiParams.CompleteURL diff --git a/internal/filestore/save_file_opts_test.go b/internal/filestore/save_file_opts_test.go index d114e8591100bed5925b316998e3abdfd514b069..35abde2f0fd279f41cb091a137a641f16f264313 100644 --- a/internal/filestore/save_file_opts_test.go +++ b/internal/filestore/save_file_opts_test.go @@ -76,8 +76,10 @@ func TestSaveFileOptsLocalAndRemote(t *testing.T) { func TestGetOpts(t *testing.T) { tests := []struct { - name string - multipart *api.MultipartUploadParams + name string + multipart *api.MultipartUploadParams + customPutHeaders bool + putHeaders map[string]string }{ { name: "Single upload", @@ -90,6 +92,21 @@ func TestGetOpts(t *testing.T) { PartURLs: []string{"http://part1", "http://part2"}, }, }, + { + name: "Single upload with custom content type", + customPutHeaders: true, + putHeaders: map[string]string{"Content-Type": "image/jpeg"}, + }, { + name: "Multipart upload with custom content type", + multipart: &api.MultipartUploadParams{ + PartSize: 10, + CompleteURL: "http://complete", + AbortURL: "http://abort", + PartURLs: []string{"http://part1", "http://part2"}, + }, + customPutHeaders: true, + putHeaders: map[string]string{"Content-Type": "image/jpeg"}, + }, } for _, test := range tests { @@ -99,12 +116,14 @@ func TestGetOpts(t *testing.T) { apiResponse := &api.Response{ TempPath: "/tmp", RemoteObject: api.RemoteObject{ - Timeout: 10, - ID: "id", - GetURL: "http://get", - StoreURL: "http://store", - DeleteURL: "http://delete", - MultipartUpload: test.multipart, + Timeout: 10, + ID: "id", + GetURL: "http://get", + StoreURL: "http://store", + DeleteURL: "http://delete", + MultipartUpload: test.multipart, + CustomPutHeaders: test.customPutHeaders, + PutHeaders: test.putHeaders, }, } deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) @@ -116,6 +135,12 @@ func TestGetOpts(t *testing.T) { assert.Equal(apiResponse.RemoteObject.GetURL, opts.RemoteURL) assert.Equal(apiResponse.RemoteObject.StoreURL, opts.PresignedPut) assert.Equal(apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete) + if test.customPutHeaders { + assert.Equal(opts.PutHeaders, apiResponse.RemoteObject.PutHeaders) + } else { + assert.Equal(opts.PutHeaders, map[string]string{"Content-Type": "application/octet-stream"}) + } + if test.multipart == nil { assert.False(opts.IsMultipart()) assert.Empty(opts.PresignedCompleteMultipart) diff --git a/internal/objectstore/multipart.go b/internal/objectstore/multipart.go index b656c589843d993a32326da6d03e055416373ec6..aba123951cd5ee328c4a6410b8baf3aac5f83e2d 100644 --- a/internal/objectstore/multipart.go +++ b/internal/objectstore/multipart.go @@ -36,7 +36,7 @@ type Multipart struct { // NewMultipart provides Multipart pointer that can be used for uploading. Data written will be split buffered on disk up to size bytes // then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent. // In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources -func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, deadline time.Time, partSize int64) (*Multipart, error) { +func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, deadline time.Time, partSize int64) (*Multipart, error) { pr, pw := io.Pipe() uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) m := &Multipart{ @@ -62,7 +62,7 @@ func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, cmu := &CompleteMultipartUpload{} for i, partURL := range partURLs { src := io.LimitReader(pr, partSize) - part, err := m.readAndUploadOnePart(partURL, src, i+1) + part, err := m.readAndUploadOnePart(partURL, putHeaders, src, i+1) if err != nil { m.uploadError = err return @@ -175,7 +175,7 @@ func (m *Multipart) verifyETag(cmu *CompleteMultipartUpload) error { return nil } -func (m *Multipart) readAndUploadOnePart(partURL string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { +func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { file, err := ioutil.TempFile("", "part-buffer") if err != nil { return nil, fmt.Errorf("Unable to create a temporary file for buffering: %v", err) @@ -198,20 +198,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, src io.Reader, partNumb return nil, fmt.Errorf("Cannot rewind part %d temporary dump : %v", partNumber, err) } - etag, err := m.uploadPart(partURL, file, n) + etag, err := m.uploadPart(partURL, putHeaders, file, n) if err != nil { return nil, fmt.Errorf("Cannot upload part %d: %v", partNumber, err) } return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil } -func (m *Multipart) uploadPart(url string, body io.Reader, size int64) (string, error) { +func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Reader, size int64) (string, error) { deadline, ok := m.ctx.Deadline() if !ok { return "", fmt.Errorf("Missing deadline") } - part, err := newObject(m.ctx, url, "", deadline, size, false) + part, err := newObject(m.ctx, url, "", headers, deadline, size, false) if err != nil { return "", err } diff --git a/internal/objectstore/object.go b/internal/objectstore/object.go index e0c3c3919a6c8db679d8e766eebaf875690832ad..28f3daf31cca8db838d16ab7a768c513cdf68c30 100644 --- a/internal/objectstore/object.go +++ b/internal/objectstore/object.go @@ -47,11 +47,11 @@ type Object struct { } // NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading. -func NewObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64) (*Object, error) { - return newObject(ctx, putURL, deleteURL, deadline, size, true) +func NewObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64) (*Object, error) { + return newObject(ctx, putURL, deleteURL, putHeaders, deadline, size, true) } -func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64, metrics bool) (*Object, error) { +func newObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64, metrics bool) (*Object, error) { started := time.Now() pr, pw := io.Pipe() // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err) @@ -63,7 +63,10 @@ func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(putURL), err) } req.ContentLength = size - req.Header.Set("Content-Type", "application/octet-stream") + + for k, v := range putHeaders { + req.Header.Set(k, v) + } uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) o := &Object{ diff --git a/internal/objectstore/object_test.go b/internal/objectstore/object_test.go index 3665a6b760fd32c0eadb248d893d5d8366827503..e5f9a562caf1c428f0bfc55cfe8e1184544e605e 100644 --- a/internal/objectstore/object_test.go +++ b/internal/objectstore/object_test.go @@ -18,7 +18,7 @@ import ( const testTimeout = 10 * time.Second -func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) { +func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool, contentType string) { assert := assert.New(t) osStub, ts := test.StartObjectStore() @@ -30,11 +30,13 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) { deleteURL = objectURL } + putHeaders := map[string]string{"Content-Type": contentType} + ctx, cancel := context.WithCancel(context.Background()) defer cancel() deadline := time.Now().Add(testTimeout) - object, err := objectstore.NewObject(ctx, objectURL, deleteURL, deadline, test.ObjectSize) + object, err := objectstore.NewObject(ctx, objectURL, deleteURL, putHeaders, deadline, test.ObjectSize) require.NoError(t, err) // copy data @@ -46,6 +48,8 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) { err = object.Close() assert.NoError(err) + assert.Equal(contentType, osStub.GetHeader(test.ObjectPath, "Content-Type")) + // Checking MD5 extraction assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.ETag()) @@ -73,8 +77,9 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) { } func TestObjectUpload(t *testing.T) { - t.Run("with delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, true) }) - t.Run("without delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, false) }) + t.Run("with delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, true, "application/octet-stream") }) + t.Run("without delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, false, "application/octet-stream") }) + t.Run("with custom content type", func(t *testing.T) { testObjectUploadNoErrors(t, false, "image/jpeg") }) } func TestObjectUpload404(t *testing.T) { @@ -89,7 +94,7 @@ func TestObjectUpload404(t *testing.T) { deadline := time.Now().Add(testTimeout) objectURL := ts.URL + test.ObjectPath - object, err := objectstore.NewObject(ctx, objectURL, "", deadline, test.ObjectSize) + object, err := objectstore.NewObject(ctx, objectURL, "", map[string]string{}, deadline, test.ObjectSize) require.NoError(err) _, err = io.Copy(object, strings.NewReader(test.ObjectContent)) @@ -134,7 +139,7 @@ func TestObjectUploadBrokenConnection(t *testing.T) { deadline := time.Now().Add(testTimeout) objectURL := ts.URL + test.ObjectPath - object, err := objectstore.NewObject(ctx, objectURL, "", deadline, -1) + object, err := objectstore.NewObject(ctx, objectURL, "", map[string]string{}, deadline, -1) require.NoError(t, err) _, copyErr := io.Copy(object, &endlessReader{}) diff --git a/internal/objectstore/test/objectstore_stub.go b/internal/objectstore/test/objectstore_stub.go index c8bd57536aa1cae2f9121e65b0d58fb05ce4d7b9..cd100e6724907423bb4208ce503aae2dbd581162 100644 --- a/internal/objectstore/test/objectstore_stub.go +++ b/internal/objectstore/test/objectstore_stub.go @@ -27,6 +27,8 @@ type ObjectstoreStub struct { overwriteMD5 map[string]string // multipart is a map of MultipartUploads multipart map[string]partsEtagMap + // HTTP header sent along request + headers map[string]*http.Header puts int deletes int @@ -45,6 +47,7 @@ func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStu bucket: make(map[string]string), multipart: make(map[string]partsEtagMap), overwriteMD5: make(map[string]string), + headers: make(map[string]*http.Header), } for k, v := range md5Hashes { @@ -79,6 +82,18 @@ func (o *ObjectstoreStub) GetObjectMD5(path string) string { return o.bucket[path] } +// GetHeader returns a given HTTP header of the object uploaded to the path +func (o *ObjectstoreStub) GetHeader(path, key string) string { + o.m.Lock() + defer o.m.Unlock() + + if val, ok := o.headers[path]; ok { + return val.Get(key) + } + + return "" +} + // InitiateMultipartUpload prepare the ObjectstoreStob to receive a MultipartUpload on path // It will return an error if a MultipartUpload is already in progress on that path // InitiateMultipartUpload is only used during test setup. @@ -146,6 +161,7 @@ func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) { etag = hex.EncodeToString(checksum) } + o.headers[objectPath] = &r.Header o.puts++ if o.isMultipartUpload(objectPath) { pNumber := r.URL.Query().Get("partNumber")