From aa4fa17837218e2d163e592d10fbfc43aa367eb7 Mon Sep 17 00:00:00 2001
From: Alessio Caiazza <acaiazza@gitlab.com>
Date: Fri, 18 May 2018 17:06:07 +0200
Subject: [PATCH] Fix objectstore error shadowing

When we are uploading big objects, remote server may close the connection
while we are still writing.

This patch allows to fetch the real error instead of io.ErrClosedPipe
---
 internal/objectstore/object.go      |  9 ++++--
 internal/objectstore/object_test.go | 50 +++++++++++++++++++++++++++--
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/internal/objectstore/object.go b/internal/objectstore/object.go
index c2b671a4262a1..7d906e951926e 100644
--- a/internal/objectstore/object.go
+++ b/internal/objectstore/object.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"net/http"
 	"net/url"
@@ -73,7 +74,8 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat
 	pr, pw := io.Pipe()
 	o.writeCloser = pw
 
-	req, err := http.NewRequest(http.MethodPut, o.PutURL, pr)
+	// we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
+	req, err := http.NewRequest(http.MethodPut, o.PutURL, ioutil.NopCloser(pr))
 	if err != nil {
 		objectStorageUploadRequestsRequestFailed.Inc()
 		return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(o.PutURL), err)
@@ -103,7 +105,10 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat
 	go func() {
 		defer cancelFn()
 		defer objectStorageUploadsOpen.Dec()
-		defer pr.Close()
+		defer func() {
+			// This will be returned as error to the next write operation on the pipe
+			pr.CloseWithError(o.uploadError)
+		}()
 
 		req = req.WithContext(o.ctx)
 
diff --git a/internal/objectstore/object_test.go b/internal/objectstore/object_test.go
index e7e7c03419834..f70c2c7527981 100644
--- a/internal/objectstore/object_test.go
+++ b/internal/objectstore/object_test.go
@@ -78,6 +78,7 @@ func TestObjectUpload(t *testing.T) {
 
 func TestObjectUpload404(t *testing.T) {
 	assert := assert.New(t)
+	require := require.New(t)
 
 	ts := httptest.NewServer(http.NotFoundHandler())
 	defer ts.Close()
@@ -87,13 +88,56 @@ func TestObjectUpload404(t *testing.T) {
 
 	objectURL := ts.URL + test.ObjectPath
 	object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, test.ObjectSize)
-	require.NoError(t, err)
+	require.NoError(err)
 	_, err = io.Copy(object, strings.NewReader(test.ObjectContent))
 
 	assert.NoError(err)
 	err = object.Close()
 	assert.Error(err)
 	_, isStatusCodeError := err.(objectstore.StatusCodeError)
-	assert.True(isStatusCodeError, "Should fail with StatusCodeError")
-	assert.Contains(err.Error(), "404")
+	require.True(isStatusCodeError, "Should fail with StatusCodeError")
+	require.Contains(err.Error(), "404")
+}
+
+type endlessReader struct{}
+
+func (e *endlessReader) Read(p []byte) (n int, err error) {
+	for i := 0; i < len(p); i++ {
+		p[i] = '*'
+	}
+
+	return len(p), nil
+}
+
+// TestObjectUploadBrokenConnection purpose is to ensure that errors caused by the upload destination get propagated back correctly.
+// This is important for troubleshooting in production.
+func TestObjectUploadBrokenConnection(t *testing.T) {
+	// This test server closes connection immediately
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		hj, ok := w.(http.Hijacker)
+		if !ok {
+			require.FailNow(t, "webserver doesn't support hijacking")
+		}
+		conn, _, err := hj.Hijack()
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		conn.Close()
+	}))
+	defer ts.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	objectURL := ts.URL + test.ObjectPath
+	object, err := objectstore.NewObject(ctx, objectURL, "", testTimeout, -1)
+	require.NoError(t, err)
+
+	_, copyErr := io.Copy(object, &endlessReader{})
+	require.Error(t, copyErr)
+	require.NotEqual(t, io.ErrClosedPipe, copyErr, "We are shadowing the real error")
+
+	closeErr := object.Close()
+	require.Equal(t, copyErr, closeErr)
 }
-- 
GitLab