From baffc0a4b623ff51ee0d40d2f9bfd13f1523391c Mon Sep 17 00:00:00 2001
From: Kamil Trzcinski <ayufan@ayufan.eu>
Date: Tue, 28 Feb 2017 11:01:51 +0100
Subject: [PATCH] Add apiCiLongPolling option to use a new reds queueing
 mechanism

---
 README.md                   |  2 +
 internal/builds/register.go | 94 +++++++++++++++++++++++++++++++++++++
 internal/config/config.go   |  1 +
 internal/upstream/routes.go |  6 ++-
 main.go                     |  2 +
 5 files changed, 103 insertions(+), 2 deletions(-)
 create mode 100644 internal/builds/register.go

diff --git a/README.md b/README.md
index a7e051f45fe00..ddb1d372c510e 100644
--- a/README.md
+++ b/README.md
@@ -36,6 +36,8 @@ gitlab-workhorse'][brief-history-blog].
   gitlab-workhorse [OPTIONS]
 
 Options:
+  -apiCiLongPolling duration
+        Long polling duration for job requesting for runners (default 0s - disabled)
   -apiLimit uint
         Number of API requests allowed at single time
   -apiQueueDuration duration
diff --git a/internal/builds/register.go b/internal/builds/register.go
new file mode 100644
index 0000000000000..ff7bc5626860b
--- /dev/null
+++ b/internal/builds/register.go
@@ -0,0 +1,94 @@
+package builds
+
+import (
+	"bytes"
+	"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
+	"gitlab.com/gitlab-org/gitlab-workhorse/internal/redis"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"time"
+)
+
+const MaxRegisterBodySize = 4 * 1024
+
+func readRunnerQueueKey(w http.ResponseWriter, r *http.Request) (string, error) {
+	limitedBody := http.MaxBytesReader(w, r.Body, MaxRegisterBodySize)
+	defer limitedBody.Close()
+
+	// Read body
+	var body bytes.Buffer
+	_, err := io.Copy(&body, limitedBody)
+	if err != nil {
+		return "", err
+	}
+
+	r.Body = ioutil.NopCloser(&body)
+
+	tmpReq := *r
+	tmpReq.Body = ioutil.NopCloser(bytes.NewReader(body.Bytes()))
+
+	err = tmpReq.ParseForm()
+	if err != nil {
+		return "", err
+	}
+
+	token := tmpReq.FormValue("token")
+	if token == "" {
+		return "", nil
+	}
+
+	key := "runner:build_queue:" + token
+	return key, nil
+}
+
+func RegisterHandler(h http.Handler, pollingDuration time.Duration) http.Handler {
+	if pollingDuration == 0 {
+		return h
+	}
+
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		lastUpdate := r.Header.Get("X-GitLab-Last-Update")
+		if lastUpdate != "" {
+			// We could have a fail-over implementation here, for old runners, that:
+			// Proxies the requests, if this is 204, we delay the response to client,
+			// By checking the response from handler, and reading `X-GitLab-Last-Update`,
+			// and then watching on a key
+			h.ServeHTTP(w, r)
+			return
+		}
+
+		queueKey, err := readRunnerQueueKey(w, r)
+		if err != nil {
+			helper.Fail500(w, r, err)
+			return
+		}
+
+		result, err := redis.WatchKey(queueKey, lastUpdate, pollingDuration)
+		if err != nil {
+			helper.Fail500(w, r, err)
+			return
+		}
+
+		switch result {
+		// It means that we detected a change before starting watching on change,
+		// We proxy request to Rails, to see whether we can receive the build
+		case redis.WatchKeyStatusAlreadyChanged:
+			h.ServeHTTP(w, r)
+
+		// It means that we detected a change after watching.
+		// We could potentially proxy request to Rails, but...
+		// We can end-up with unreliable responses,
+		// as don't really know whether ResponseWriter is still in a sane state,
+		// whether the connection is not dead
+		case redis.WatchKeyStatusSeenChange:
+			w.WriteHeader(204)
+
+		// When we receive one of these statuses, it means that we detected no change,
+		// so we return to runner 204, which means nothing got changed,
+		// and there's no new builds to process
+		case redis.WatchKeyStatusTimeout, redis.WatchKeyStatusNoChange:
+			w.WriteHeader(204)
+		}
+	})
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index af806f83f1f86..7ba5796b4a1b5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -38,6 +38,7 @@ type Config struct {
 	APILimit            uint          `toml:"-"`
 	APIQueueLimit       uint          `toml:"-"`
 	APIQueueTimeout     time.Duration `toml:"-"`
+	APICILongPolling    time.Duration `toml:"-"`
 }
 
 // LoadConfig from a file
diff --git a/internal/upstream/routes.go b/internal/upstream/routes.go
index d3e8eff3a1d06..f35d8459d6790 100644
--- a/internal/upstream/routes.go
+++ b/internal/upstream/routes.go
@@ -9,6 +9,7 @@ import (
 
 	apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/artifacts"
+	"gitlab.com/gitlab-org/gitlab-workhorse/internal/builds"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
 	"gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs"
@@ -118,6 +119,7 @@ func (u *Upstream) configureRoutes() {
 
 	uploadAccelerateProxy := upload.Accelerate(path.Join(u.DocumentRoot, "uploads/tmp"), proxy)
 	ciAPIProxyQueue := queueing.QueueRequests(uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
+	ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, u.APICILongPolling)
 
 	u.Routes = []routeEntry{
 		// Git Clone
@@ -132,8 +134,8 @@ func (u *Upstream) configureRoutes() {
 		// Terminal websocket
 		wsRoute(projectPattern+`environments/[0-9]+/terminal.ws\z`, terminal.Handler(api)),
 
-		// Limit capacity given to builds/register.json
-		route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPIProxyQueue),
+		// Long poll and limit capacity given to builds/register.json
+		route("", ciAPIPattern+`v1/builds/register.json\z`, ciAPILongPolling),
 
 		// Explicitly proxy API requests
 		route("", apiPattern, proxy),
diff --git a/main.go b/main.go
index ee2f595ac5bcb..4a3d0da1e3c8a 100644
--- a/main.go
+++ b/main.go
@@ -51,6 +51,7 @@ var secretPath = flag.String("secretPath", "./.gitlab_workhorse_secret", "File w
 var apiLimit = flag.Uint("apiLimit", 0, "Number of API requests allowed at single time")
 var apiQueueLimit = flag.Uint("apiQueueLimit", 0, "Number of API requests allowed to be queued")
 var apiQueueTimeout = flag.Duration("apiQueueDuration", queueing.DefaultTimeout, "Maximum queueing duration of requests")
+var apiCiLongPolling = flag.Duration("apiCiLongPolling", 0, "Long polling duration for job requesting for runners (default 0s - disabled)")
 var logFile = flag.String("logFile", "", "Log file to be used")
 var prometheusListenAddr = flag.String("prometheusListenAddr", "", "Prometheus listening address, e.g. ':9100'")
 
@@ -121,6 +122,7 @@ func main() {
 		APILimit:            *apiLimit,
 		APIQueueLimit:       *apiQueueLimit,
 		APIQueueTimeout:     *apiQueueTimeout,
+		APICILongPolling:    *apiCiLongPolling,
 	}
 
 	if *configFile != "" {
-- 
GitLab