mirror of https://github.com/go-gitea/gitea.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
259 lines
6.7 KiB
259 lines
6.7 KiB
// Copyright 2023 The Gitea Authors. All rights reserved. |
|
// SPDX-License-Identifier: MIT |
|
|
|
package queue |
|
|
|
import ( |
|
"fmt" |
|
"strconv" |
|
"sync" |
|
"testing" |
|
"time" |
|
|
|
"code.gitea.io/gitea/modules/log" |
|
|
|
"github.com/stretchr/testify/assert" |
|
) |
|
|
|
func TestPersistableChannelUniqueQueue(t *testing.T) { |
|
tmpDir := t.TempDir() |
|
fmt.Printf("TempDir %s\n", tmpDir) |
|
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`) |
|
|
|
// Common function to create the Queue |
|
newQueue := func(name string, handle func(data ...Data) []Data) Queue { |
|
q, err := NewPersistableChannelUniqueQueue(handle, |
|
PersistableChannelUniqueQueueConfiguration{ |
|
Name: name, |
|
DataDir: tmpDir, |
|
QueueLength: 200, |
|
MaxWorkers: 1, |
|
BlockTimeout: 1 * time.Second, |
|
BoostTimeout: 5 * time.Minute, |
|
BoostWorkers: 1, |
|
Workers: 0, |
|
}, "task-0") |
|
assert.NoError(t, err) |
|
return q |
|
} |
|
|
|
// runs the provided queue and provides some timer function |
|
type channels struct { |
|
readyForShutdown chan struct{} // closed when shutdown functions have been assigned |
|
readyForTerminate chan struct{} // closed when terminate functions have been assigned |
|
signalShutdown chan struct{} // Should close to signal shutdown |
|
doneShutdown chan struct{} // closed when shutdown function is done |
|
queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock |
|
} |
|
runQueue := func(q Queue, lock *sync.Mutex) *channels { |
|
chans := &channels{ |
|
readyForShutdown: make(chan struct{}), |
|
readyForTerminate: make(chan struct{}), |
|
signalShutdown: make(chan struct{}), |
|
doneShutdown: make(chan struct{}), |
|
} |
|
go q.Run(func(atShutdown func()) { |
|
go func() { |
|
lock.Lock() |
|
select { |
|
case <-chans.readyForShutdown: |
|
default: |
|
close(chans.readyForShutdown) |
|
} |
|
lock.Unlock() |
|
<-chans.signalShutdown |
|
atShutdown() |
|
close(chans.doneShutdown) |
|
}() |
|
}, func(atTerminate func()) { |
|
lock.Lock() |
|
defer lock.Unlock() |
|
select { |
|
case <-chans.readyForTerminate: |
|
default: |
|
close(chans.readyForTerminate) |
|
} |
|
chans.queueTerminate = append(chans.queueTerminate, atTerminate) |
|
}) |
|
|
|
return chans |
|
} |
|
|
|
// call to shutdown and terminate the queue associated with the channels |
|
doTerminate := func(chans *channels, lock *sync.Mutex) { |
|
<-chans.readyForTerminate |
|
|
|
lock.Lock() |
|
callbacks := []func(){} |
|
callbacks = append(callbacks, chans.queueTerminate...) |
|
lock.Unlock() |
|
|
|
for _, callback := range callbacks { |
|
callback() |
|
} |
|
} |
|
|
|
mapLock := sync.Mutex{} |
|
executedInitial := map[string][]string{} |
|
hasInitial := map[string][]string{} |
|
|
|
fillQueue := func(name string, done chan struct{}) { |
|
t.Run("Initial Filling: "+name, func(t *testing.T) { |
|
lock := sync.Mutex{} |
|
|
|
startAt100Queued := make(chan struct{}) |
|
stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item |
|
|
|
handle := func(data ...Data) []Data { |
|
<-startAt100Queued |
|
for _, datum := range data { |
|
s := datum.(string) |
|
mapLock.Lock() |
|
executedInitial[name] = append(executedInitial[name], s) |
|
mapLock.Unlock() |
|
if s == "task-20" { |
|
close(stopAt20Shutdown) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
q := newQueue(name, handle) |
|
|
|
// add 100 tasks to the queue |
|
for i := 0; i < 100; i++ { |
|
_ = q.Push("task-" + strconv.Itoa(i)) |
|
} |
|
close(startAt100Queued) |
|
|
|
chans := runQueue(q, &lock) |
|
|
|
<-chans.readyForShutdown |
|
<-stopAt20Shutdown |
|
close(chans.signalShutdown) |
|
<-chans.doneShutdown |
|
_ = q.Push("final") |
|
|
|
// check which tasks are still in the queue |
|
for i := 0; i < 100; i++ { |
|
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { |
|
mapLock.Lock() |
|
hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i)) |
|
mapLock.Unlock() |
|
} |
|
} |
|
if has, _ := q.(UniqueQueue).Has("final"); has { |
|
mapLock.Lock() |
|
hasInitial[name] = append(hasInitial[name], "final") |
|
mapLock.Unlock() |
|
} else { |
|
assert.Fail(t, "UnqueQueue %s should have \"final\"", name) |
|
} |
|
doTerminate(chans, &lock) |
|
mapLock.Lock() |
|
assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name])) |
|
mapLock.Unlock() |
|
}) |
|
close(done) |
|
} |
|
|
|
doneA := make(chan struct{}) |
|
doneB := make(chan struct{}) |
|
|
|
go fillQueue("QueueA", doneA) |
|
go fillQueue("QueueB", doneB) |
|
|
|
<-doneA |
|
<-doneB |
|
|
|
executedEmpty := map[string][]string{} |
|
hasEmpty := map[string][]string{} |
|
emptyQueue := func(name string, done chan struct{}) { |
|
t.Run("Empty Queue: "+name, func(t *testing.T) { |
|
lock := sync.Mutex{} |
|
stop := make(chan struct{}) |
|
|
|
// collect the tasks that have been executed |
|
handle := func(data ...Data) []Data { |
|
lock.Lock() |
|
for _, datum := range data { |
|
mapLock.Lock() |
|
executedEmpty[name] = append(executedEmpty[name], datum.(string)) |
|
mapLock.Unlock() |
|
if datum.(string) == "final" { |
|
close(stop) |
|
} |
|
} |
|
lock.Unlock() |
|
return nil |
|
} |
|
|
|
q := newQueue(name, handle) |
|
chans := runQueue(q, &lock) |
|
|
|
<-chans.readyForShutdown |
|
<-stop |
|
close(chans.signalShutdown) |
|
<-chans.doneShutdown |
|
|
|
// check which tasks are still in the queue |
|
for i := 0; i < 100; i++ { |
|
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has { |
|
mapLock.Lock() |
|
hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i)) |
|
mapLock.Unlock() |
|
} |
|
} |
|
doTerminate(chans, &lock) |
|
|
|
mapLock.Lock() |
|
assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name])) |
|
assert.Equal(t, 0, len(hasEmpty[name])) |
|
mapLock.Unlock() |
|
}) |
|
close(done) |
|
} |
|
|
|
doneA = make(chan struct{}) |
|
doneB = make(chan struct{}) |
|
|
|
go emptyQueue("QueueA", doneA) |
|
go emptyQueue("QueueB", doneB) |
|
|
|
<-doneA |
|
<-doneB |
|
|
|
mapLock.Lock() |
|
t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", |
|
len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) |
|
|
|
// reset and rerun |
|
executedInitial = map[string][]string{} |
|
hasInitial = map[string][]string{} |
|
executedEmpty = map[string][]string{} |
|
hasEmpty = map[string][]string{} |
|
mapLock.Unlock() |
|
|
|
doneA = make(chan struct{}) |
|
doneB = make(chan struct{}) |
|
|
|
go fillQueue("QueueA", doneA) |
|
go fillQueue("QueueB", doneB) |
|
|
|
<-doneA |
|
<-doneB |
|
|
|
doneA = make(chan struct{}) |
|
doneB = make(chan struct{}) |
|
|
|
go emptyQueue("QueueA", doneA) |
|
go emptyQueue("QueueB", doneB) |
|
|
|
<-doneA |
|
<-doneB |
|
|
|
mapLock.Lock() |
|
t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v", |
|
len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"])) |
|
mapLock.Unlock() |
|
}
|
|
|