// Copyright 2023 The Gitea Authors. All rights reserved. // SPDX-License-Identifier: MIT package queue import ( "strconv" "sync" "testing" "time" "code.gitea.io/gitea/modules/log" "github.com/stretchr/testify/assert" ) func TestPersistableChannelUniqueQueue(t *testing.T) { tmpDir := t.TempDir() _ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) // Common function to create the Queue newQueue := func(name string, handle func(data ...Data) []Data) Queue { q, err := NewPersistableChannelUniqueQueue(handle, PersistableChannelUniqueQueueConfiguration{ Name: name, DataDir: tmpDir, QueueLength: 200, MaxWorkers: 1, BlockTimeout: 1 * time.Second, BoostTimeout: 5 * time.Minute, BoostWorkers: 1, Workers: 0, }, "task-0") assert.NoError(t, err) return q } // runs the provided queue and provides some timer function type channels struct { readyForShutdown chan struct{} // closed when shutdown functions have been assigned readyForTerminate chan struct{} // closed when terminate functions have been assigned signalShutdown chan struct{} // Should close to signal shutdown doneShutdown chan struct{} // closed when shutdown function is done queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock } runQueue := func(q Queue, lock *sync.Mutex) *channels { chans := &channels{ readyForShutdown: make(chan struct{}), readyForTerminate: make(chan struct{}), signalShutdown: make(chan struct{}), doneShutdown: make(chan struct{}), } go q.Run(func(atShutdown func()) { go func() { lock.Lock() select { case <-chans.readyForShutdown: default: close(chans.readyForShutdown) } lock.Unlock() <-chans.signalShutdown atShutdown() close(chans.doneShutdown) }() }, func(atTerminate func()) { lock.Lock() defer lock.Unlock() select { case <-chans.readyForTerminate: default: close(chans.readyForTerminate) } chans.queueTerminate = append(chans.queueTerminate, atTerminate) }) return chans } // call to shutdown and terminate the queue associated with the channels doTerminate := func(chans *channels, lock *sync.Mutex) { <-chans.readyForTerminate lock.Lock() callbacks := []func(){} callbacks = append(callbacks, chans.queueTerminate...) lock.Unlock() for _, callback := range callbacks { callback() } } mapLock := sync.Mutex{} executedInitial := map[string][]string{} hasInitial := map[string][]string{} fillQueue := func(name string, done chan struct{}) { t.Run("Initial Filling: "+name, func(t *testing.T) { lock := sync.Mutex{} startAt100Queued := make(chan struct{}) stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item handle := func(data ...Data) []Data { <-startAt100Queued for _, datum := range data { s := datum.(string) mapLock.Lock() executedInitial[name] = append(executedInitial[name], s) mapLock.Unlock() if s == "task-20" { close(stopAt20Shutdown) } } return nil } q := newQueue(name, handle) // add 100 tasks to the queue for i := 0; i < 100; i++ { _ = q.Push("task-" + strconv.Itoa(i)) } close(startAt100Queued) chans := runQueue(q, &lock) <-chans.readyForShutdown <-stopAt20Shutdown close(chans.signalShutdown) <-chans.doneShutdown _ = q.Push("final") // check which tasks are still in the queue for i := 0; i < 100; i++ { if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { mapLock.Lock() hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i)) mapLock.Unlock() } } if has, _ := q.(UniqueQueue).Has("final"); has { mapLock.Lock() hasInitial[name] = append(hasInitial[name], "final") mapLock.Unlock() } else { assert.Fail(t, "UnqueQueue %s should have \"final\"", name) } doTerminate(chans, &lock) mapLock.Lock() assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name])) mapLock.Unlock() }) close(done) } doneA := make(chan struct{}) doneB := make(chan struct{}) go fillQueue("QueueA", doneA) go fillQueue("QueueB", doneB) <-doneA <-doneB executedEmpty := map[string][]string{} hasEmpty := map[string][]string{} emptyQueue := func(name string, done chan struct{}) { t.Run("Empty Queue: "+name, func(t *testing.T) { lock := sync.Mutex{} stop := make(chan struct{}) // collect the tasks that have been executed handle := func(data ...Data) []Data { lock.Lock() for _, datum := range data { mapLock.Lock() executedEmpty[name] = append(executedEmpty[name], datum.(string)) mapLock.Unlock() if datum.(string) == "final" { close(stop) } } lock.Unlock() return nil } q := newQueue(name, handle) chans := runQueue(q, &lock) <-chans.readyForShutdown <-stop close(chans.signalShutdown) <-chans.doneShutdown // check which tasks are still in the queue for i := 0; i < 100; i++ { if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { mapLock.Lock() hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i)) mapLock.Unlock() } } doTerminate(chans, &lock) mapLock.Lock() assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name])) assert.Empty(t, hasEmpty[name]) mapLock.Unlock() }) close(done) } doneA = make(chan struct{}) doneB = make(chan struct{}) go emptyQueue("QueueA", doneA) go emptyQueue("QueueB", doneB) <-doneA <-doneB mapLock.Lock() t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) // reset and rerun executedInitial = map[string][]string{} hasInitial = map[string][]string{} executedEmpty = map[string][]string{} hasEmpty = map[string][]string{} mapLock.Unlock() doneA = make(chan struct{}) doneB = make(chan struct{}) go fillQueue("QueueA", doneA) go fillQueue("QueueB", doneB) <-doneA <-doneB doneA = make(chan struct{}) doneB = make(chan struct{}) go emptyQueue("QueueA", doneA) go emptyQueue("QueueB", doneB) <-doneA <-doneB mapLock.Lock() t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) mapLock.Unlock() }