diff --git a/internal/objectstore/object.go b/internal/objectstore/object.go index c2b671a4262a108438873c21eefd35e87a3e5125..7d906e951926e7575f78f0c613758dfc67fbb59f 100644 --- a/internal/objectstore/object.go +++ b/internal/objectstore/object.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "io/ioutil" "net" "net/http" "net/url" @@ -73,7 +74,8 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat pr, pw := io.Pipe() o.writeCloser = pw - req, err := http.NewRequest(http.MethodPut, o.PutURL, pr) + // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err) + req, err := http.NewRequest(http.MethodPut, o.PutURL, ioutil.NopCloser(pr)) if err != nil { objectStorageUploadRequestsRequestFailed.Inc() return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(o.PutURL), err) @@ -103,7 +105,10 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat go func() { defer cancelFn() defer objectStorageUploadsOpen.Dec() - defer pr.Close() + defer func() { + // This will be returned as error to the next write operation on the pipe + pr.CloseWithError(o.uploadError) + }() req = req.WithContext(o.ctx) diff --git a/internal/objectstore/object_test.go b/internal/objectstore/object_test.go index e7e7c03419834223f88f8fb14fc4f8ea76b1b4d1..f70c2c7527981f5acd5449400b1e546bd6a4eb6b 100644 --- a/internal/objectstore/object_test.go +++ b/internal/objectstore/object_test.go @@ -78,6 +78,7 @@ func TestObjectUpload(t *testing.T) { func TestObjectUpload404(t *testing.T) { assert := assert.New(t) + require := require.New(t) ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() @@ -87,13 +88,56 @@ func TestObjectUpload404(t *testing.T) { objectURL := ts.URL + test.ObjectPath object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, test.ObjectSize) - require.NoError(t, err) + require.NoError(err) _, err = io.Copy(object, strings.NewReader(test.ObjectContent)) assert.NoError(err) err = object.Close() assert.Error(err) _, isStatusCodeError := err.(objectstore.StatusCodeError) - assert.True(isStatusCodeError, "Should fail with StatusCodeError") - assert.Contains(err.Error(), "404") + require.True(isStatusCodeError, "Should fail with StatusCodeError") + require.Contains(err.Error(), "404") +} + +type endlessReader struct{} + +func (e *endlessReader) Read(p []byte) (n int, err error) { + for i := 0; i < len(p); i++ { + p[i] = '*' + } + + return len(p), nil +} + +// TestObjectUploadBrokenConnection purpose is to ensure that errors caused by the upload destination get propagated back correctly. +// This is important for troubleshooting in production. +func TestObjectUploadBrokenConnection(t *testing.T) { + // This test server closes connection immediately + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hj, ok := w.(http.Hijacker) + if !ok { + require.FailNow(t, "webserver doesn't support hijacking") + } + conn, _, err := hj.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + conn.Close() + })) + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + objectURL := ts.URL + test.ObjectPath + object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, -1) + require.NoError(t, err) + + _, copyErr := io.Copy(object, &endlessReader{}) + require.Error(t, copyErr) + require.NotEqual(t, io.ErrClosedPipe, copyErr, "We are shadowing the real error") + + closeErr := object.Close() + require.Equal(t, copyErr, closeErr) }