diff --git a/internal/redis/keywatcher_test.go b/internal/redis/keywatcher_test.go index 03380b56a8eaa6633371a19e664f92dfebe45d5e..68c459586e27446b92cdfe1a807bdbf47b9e7add 100644 --- a/internal/redis/keywatcher_test.go +++ b/internal/redis/keywatcher_test.go @@ -36,153 +36,128 @@ func createUnsubscribeMessage(key string) []interface{} { } } +func countWatchers(key string) int { + keyWatcherMutex.Lock() + defer keyWatcherMutex.Unlock() + return len(keyWatcher[key]) +} + +func deleteWatchers(key string) { + keyWatcherMutex.Lock() + defer keyWatcherMutex.Unlock() + delete(keyWatcher, key) +} + +// Forces a run of the `Process` loop against a mock PubSubConn. +func processMessages(numWatchers int, value string) { + psc := redigomock.NewConn() + + // Setup the initial subscription message + psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel)) + psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel)) + psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value)) + + // Wait for all the `WatchKey` calls to be registered + for countWatchers(runnerKey) != numWatchers { + time.Sleep(time.Millisecond) + } + + processInner(psc) +} + func TestWatchKeySeenChange(t *testing.T) { - mconn, td := setupMockPool() + conn, td := setupMockPool() defer td() - go Process(false) - // Setup the initial subscription message - mconn.Command("SUBSCRIBE", keySubChannel). - Expect(createSubscribeMessage(keySubChannel)) - mconn.Command("UNSUBSCRIBE", keySubChannel). - Expect(createUnsubscribeMessage(keySubChannel)) - mconn.Command("GET", runnerKey). - Expect("something"). - Expect("somethingelse") - mconn.ReceiveWait = true - - mconn.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"=somethingelse")) - - // ACTUALLY Fill the buffers - go func(mconn *redigomock.Conn) { - mconn.ReceiveNow <- true - mconn.ReceiveNow <- true - mconn.ReceiveNow <- true - }(mconn) - - val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second)) - assert.NoError(t, err, "Expected no error") - assert.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") + conn.Command("GET", runnerKey).Expect("something") + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + val, err := WatchKey(runnerKey, "something", time.Second) + assert.NoError(t, err, "Expected no error") + assert.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") + wg.Done() + }() + + processMessages(1, "somethingelse") + wg.Wait() } func TestWatchKeyNoChange(t *testing.T) { - mconn, td := setupMockPool() + conn, td := setupMockPool() defer td() - go Process(false) - // Setup the initial subscription message - mconn.Command("SUBSCRIBE", keySubChannel). - Expect(createSubscribeMessage(keySubChannel)) - mconn.Command("UNSUBSCRIBE", keySubChannel). - Expect(createUnsubscribeMessage(keySubChannel)) - mconn.Command("GET", runnerKey). - Expect("something"). - Expect("something") - mconn.ReceiveWait = true - - mconn.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"=something")) - - // ACTUALLY Fill the buffers - go func(mconn *redigomock.Conn) { - mconn.ReceiveNow <- true - mconn.ReceiveNow <- true - mconn.ReceiveNow <- true - }(mconn) - - val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second)) - assert.NoError(t, err, "Expected no error") - assert.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value") + conn.Command("GET", runnerKey).Expect("something") + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + val, err := WatchKey(runnerKey, "something", time.Second) + assert.NoError(t, err, "Expected no error") + assert.Equal(t, WatchKeyStatusNoChange, val, "Expected notification without change to value") + wg.Done() + }() + + processMessages(1, "something") + wg.Wait() + } func TestWatchKeyTimeout(t *testing.T) { - mconn, td := setupMockPool() + conn, td := setupMockPool() defer td() - go Process(false) - // Setup the initial subscription message - mconn.Command("SUBSCRIBE", keySubChannel). - Expect(createSubscribeMessage(keySubChannel)) - mconn.Command("UNSUBSCRIBE", keySubChannel). - Expect(createUnsubscribeMessage(keySubChannel)) - mconn.Command("GET", runnerKey). - Expect("something"). - Expect("something") - mconn.ReceiveWait = true - - // ACTUALLY Fill the buffers - go func(mconn *redigomock.Conn) { - mconn.ReceiveNow <- true - mconn.ReceiveNow <- true - mconn.ReceiveNow <- true - }(mconn) - - val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second)) + conn.Command("GET", runnerKey).Expect("something") + + val, err := WatchKey(runnerKey, "something", time.Millisecond) assert.NoError(t, err, "Expected no error") assert.Equal(t, WatchKeyStatusTimeout, val, "Expected value to not change") + + // Clean up watchers since Process isn't doing that for us (not running) + deleteWatchers(runnerKey) } func TestWatchKeyAlreadyChanged(t *testing.T) { - mconn, td := setupMockPool() + conn, td := setupMockPool() defer td() - go Process(false) - // Setup the initial subscription message - mconn.Command("SUBSCRIBE", keySubChannel). - Expect(createSubscribeMessage(keySubChannel)) - mconn.Command("UNSUBSCRIBE", keySubChannel). - Expect(createUnsubscribeMessage(keySubChannel)) - mconn.Command("GET", runnerKey). - Expect("somethingelse"). - Expect("somethingelse") - mconn.ReceiveWait = true - - // ACTUALLY Fill the buffers - go func(mconn *redigomock.Conn) { - mconn.ReceiveNow <- true - mconn.ReceiveNow <- true - mconn.ReceiveNow <- true - }(mconn) - - val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second)) + conn.Command("GET", runnerKey).Expect("somethingelse") + + val, err := WatchKey(runnerKey, "something", time.Second) assert.NoError(t, err, "Expected no error") assert.Equal(t, WatchKeyStatusAlreadyChanged, val, "Expected value to have already changed") + + // Clean up watchers since Process isn't doing that for us (not running) + deleteWatchers(runnerKey) } -func TestWatchKeyMassiveParallel(t *testing.T) { - mconn, td := setupMockPool() +func TestWatchKeyMassivelyParallel(t *testing.T) { + runTimes := 100 // 100 parallel watchers + + conn, td := setupMockPool() defer td() - go Process(false) - // Setup the initial subscription message - mconn.Command("SUBSCRIBE", keySubChannel). - Expect(createSubscribeMessage(keySubChannel)) - mconn.Command("UNSUBSCRIBE", keySubChannel). - Expect(createUnsubscribeMessage(keySubChannel)) - getCmd := mconn.Command("GET", runnerKey) - mconn.ReceiveWait = true - - const runTimes = 100 + wg := &sync.WaitGroup{} + wg.Add(runTimes) + + getCmd := conn.Command("GET", runnerKey) + for i := 0; i < runTimes; i++ { - mconn.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"=somethingelse")) getCmd = getCmd.Expect("something") } - wg := &sync.WaitGroup{} - // Race-conditions /o/ \o\ for i := 0; i < runTimes; i++ { - wg.Add(1) - go func(mconn *redigomock.Conn) { - defer wg.Done() - // ACTUALLY Fill the buffers - go func(mconn *redigomock.Conn) { - mconn.ReceiveNow <- true - }(mconn) - - val, err := WatchKey(runnerKey, "something", time.Duration(1*time.Second)) + go func() { + val, err := WatchKey(runnerKey, "something", time.Second) assert.NoError(t, err, "Expected no error") assert.Equal(t, WatchKeyStatusSeenChange, val, "Expected value to change") - }(mconn) + wg.Done() + }() } - wg.Wait() + processMessages(runTimes, "somethingelse") + wg.Wait() } diff --git a/internal/redis/redis.go b/internal/redis/redis.go index 051c4059cac2c06a5bde8548fe80ce60698cab1a..7e0825cd147313f79ae2c3e7b26c102b731e73aa 100644 --- a/internal/redis/redis.go +++ b/internal/redis/redis.go @@ -149,8 +149,7 @@ func GetString(key string) (string, error) { if conn == nil { return "", fmt.Errorf("Not connected to redis") } - defer func() { - conn.Close() - }() + defer conn.Close() + return redis.String(conn.Do("GET", key)) }