diff --git a/README.md b/README.md index 012b6dd128e835762d917000b67c1feef30a02bc..047cd3773d5468a12c2736a919e7eb584417b4f1 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,12 @@ gitlab-workhorse'][brief-history-blog]. gitlab-workhorse [OPTIONS] Options: + -apiLimit uint + Number of API requests allowed at single time + -apiQueueDuration duration + Maximum queueing duration of requests (default 30s) + -apiQueueLimit uint + Number of API requests allowed to be queued -authBackend string Authentication/authorization backend (default "http://localhost:8080") -authSocket string diff --git a/internal/queueing/queue.go b/internal/queueing/queue.go index 5a5c567300e51353c876aad5f1ff7de5e4076f21..2b7d58ec125e09161e1184a9feb7b463c193d660 100644 --- a/internal/queueing/queue.go +++ b/internal/queueing/queue.go @@ -23,7 +23,7 @@ type Queue struct { func NewQueue(limit, queueLimit uint) *Queue { return &Queue{ busyCh: make(chan struct{}, limit), - waitingCh: make(chan struct{}, limit+queueLimit), + waitingCh: make(chan struct{}, queueLimit), } } @@ -41,9 +41,7 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) { } defer func() { - if err != nil { - <-s.waitingCh - } + <-s.waitingCh }() // fast path: push item to current processed items (non-blocking) @@ -71,6 +69,5 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) { // It triggers next request to be processed if it's in queue func (s *Queue) Release() { // dequeue from queue to allow next request to be processed - <-s.waitingCh <-s.busyCh } diff --git a/internal/queueing/requests_test.go b/internal/queueing/requests_test.go index 607be81edd95c567ef499ec1c8af1f03480b7814..22384685a6a8007368470b3e74213e3c6e1872d5 100644 --- a/internal/queueing/requests_test.go +++ b/internal/queueing/requests_test.go @@ -12,14 +12,14 @@ var httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) fmt.Fprintln(w, "OK") }) -func slowHttpHandler(closeCh chan struct{}) http.Handler { +func pausedHttpHandler(pauseCh chan struct{}) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - <-closeCh + <-pauseCh fmt.Fprintln(w, "OK") }) } -func TestQueueRequests(t *testing.T) { +func TestNormalRequestProcessing(t *testing.T) { w := httptest.NewRecorder() h := QueueRequests(httpHandler, 1, 1, time.Second) h.ServeHTTP(w, nil) @@ -28,28 +28,34 @@ func TestQueueRequests(t *testing.T) { } } +// testSlowRequestProcessing creates a new queue, +// then it runs a number of requests that are going through queue, +// we return the response of first finished request, +// where status of request can be 200, 429 or 503 func testSlowRequestProcessing(count, limit, queueLimit uint, queueTimeout time.Duration) *httptest.ResponseRecorder { - closeCh := make(chan struct{}) - defer close(closeCh) + pauseCh := make(chan struct{}) + defer close(pauseCh) - handler := QueueRequests(slowHttpHandler(closeCh), limit, queueLimit, queueTimeout) + handler := QueueRequests(pausedHttpHandler(pauseCh), limit, queueLimit, queueTimeout) respCh := make(chan *httptest.ResponseRecorder, count) // queue requests to use up the queue - for count > 0 { + for i := 0; i < count; i++ { go func() { w := httptest.NewRecorder() handler.ServeHTTP(w, nil) respCh <- w }() - count-- } // dequeue first request return <-respCh } +// TestQueueingTimeout performs 2 requests +// the queue limit and length is 1, +// the second request gets timed-out func TestQueueingTimeout(t *testing.T) { w := testSlowRequestProcessing(2, 1, 1, time.Microsecond) @@ -58,7 +64,10 @@ func TestQueueingTimeout(t *testing.T) { } } -func TestQueuedRequests(t *testing.T) { +// TestQueueingTooManyRequests performs 3 requests +// the queue limit and length is 1, +// so the third request has to be rejected with 429 +func TestQueueingTooManyRequests(t *testing.T) { w := testSlowRequestProcessing(3, 1, 1, time.Minute) if w.Code != 429 {