// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"context"
	"runtime/pprof"
	"sync"
	"sync/atomic"
	"time"

	"code.gitea.io/gitea/modules/log"
)

var (
	infiniteTimerC         = make(chan time.Time)
	batchDebounceDuration  = 100 * time.Millisecond
	workerIdleDuration     = 1 * time.Second
	shutdownDefaultTimeout = 2 * time.Second

	unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
)

func init() {
	unhandledItemRequeueDuration.Store(int64(time.Second))
}
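
// Note: infiniteTimerC above is never written to, so receiving from it blocks forever;
// doRun uses it as the "disarmed" state of its batch-dispatch debounce timer.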

// workerGroup is a group of workers to work with a WorkerPoolQueue
type workerGroup[T any] struct {
	q  *WorkerPoolQueue[T]
	wg sync.WaitGroup

	ctxWorker       context.Context
	ctxWorkerCancel context.CancelFunc

	batchBuffer []T
	popItemChan chan []byte
	popItemErr  chan error
}

func (wg *workerGroup[T]) doPrepareWorkerContext() {
	wg.ctxWorker, wg.ctxWorkerCancel = context.WithCancel(wg.q.ctxRun)
}

// doDispatchBatchToWorker dispatches a batch of items to the worker's channel.
// If the channel is full, it tries to start a new worker if possible.
func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushChan chan flushType) {
	batch := wg.batchBuffer
	wg.batchBuffer = nil

	if len(batch) == 0 {
		return
	}

	full := false
	select {
	case q.batchChan <- batch:
	default:
		full = true
	}

	// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum".
	// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later,
	// so ideally it should check by some means whether there are enough workers, and start new workers if necessary.
	// This data-race is not serious: as long as a new worker is started soon to make sure there are enough workers,
	// there is no need for a huge refactor at the moment.
	q.workerNumMu.Lock()
	noWorker := q.workerNum == 0
	if full || noWorker {
		if q.workerNum < q.workerMaxNum || noWorker && q.workerMaxNum <= 0 {
			q.workerNum++
			q.doStartNewWorker(wg)
		}
	}
	q.workerNumMu.Unlock()

	if full {
		select {
		case q.batchChan <- batch:
		case flush := <-flushChan:
			q.doWorkerHandle(batch)
			q.doFlush(wg, flush)
		case <-q.ctxRun.Done():
			wg.batchBuffer = batch // return the batch to the buffer, the "doRun" function will handle it
		}
	}
}
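
// Note (illustrative): doDispatchBatchToWorker starts at most one new worker per call. If the batch
// channel is full and the pool is already at workerMaxNum, the second select above simply blocks until
// an existing worker drains the channel, a flush request arrives, or the run context is canceled.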

// doWorkerHandle calls the safeHandler to handle a batch of items, and it increases/decreases the active worker number.
// If the context has been canceled, it should not be called, because "Push" still needs the context; in such a case, call q.safeHandler directly.
func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
	q.workerNumMu.Lock()
	q.workerActiveNum++
	q.workerNumMu.Unlock()

	defer func() {
		q.workerNumMu.Lock()
		q.workerActiveNum--
		q.workerNumMu.Unlock()
	}()

	unhandled := q.safeHandler(batch...)
	// if none of the items were handled, it should back off for a few seconds;
	// in this case the handler (eg: document indexer) may have encountered some errors/failures
	if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
		if q.isFlushing.Load() {
			return // do not requeue items when flushing: since all the items failed, requeuing them would just keep failing.
		}
		log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
		// TODO: ideally it shouldn't "sleep" here (it blocks the worker, which then blocks flush).
		// It could debounce the requeue operation, and try to requeue the items in the future.
		select {
		case <-q.ctxRun.Done():
		case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
		}
	}
	for _, item := range unhandled {
		if err := q.Push(item); err != nil {
			if !q.basePushForShutdown(item) {
				log.Error("Failed to requeue item for queue %q when calling handler: %v", q.GetName(), err)
			}
		}
	}
}
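
// Note: the back-off in doWorkerHandle deliberately sleeps inside the worker, so a handler that keeps
// failing an entire batch throttles itself to roughly one attempt per unhandledItemRequeueDuration
// instead of spinning on immediate requeues.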

// basePushForShutdown tries to requeue items into the base queue when the WorkerPoolQueue is shutting down.
// If the queue is shutting down, it returns true and tries to push the items.
// Otherwise it does nothing and returns false.
func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
	shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
	if shutdownTimeout == 0 {
		return false
	}
	ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer ctxShutdownCancel()
	for _, item := range items {
		// if pushing still fails at this point, there is nothing more the queue can do, and the items are lost
		if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil {
			log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
		}
	}
	return true
}
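
// Note: a zero q.shutdownTimeout is treated as "not shutting down" here, so basePushForShutdown
// reports false and the caller falls back to its normal error handling for the failed Push.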

// resetIdleTicker resets the idle ticker to the given duration and drains any tick that may
// already be pending, so the idle countdown starts fresh after a batch has been handled.
func resetIdleTicker(t *time.Ticker, dur time.Duration) {
	t.Reset(dur)
	select {
	case <-t.C:
	default:
	}
}

// doStartNewWorker starts a new worker for the queue; the worker reads from the worker group's channel and handles the items.
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
	wp.wg.Go(func() {
		log.Debug("Queue %q starts new worker", q.GetName())
		defer log.Debug("Queue %q stops idle worker", q.GetName())

		t := time.NewTicker(workerIdleDuration)
		defer t.Stop()

		keepWorking := true
		stopWorking := func() {
			q.workerNumMu.Lock()
			keepWorking = false
			q.workerNum--
			q.workerNumMu.Unlock()
		}
		for keepWorking {
			select {
			case <-wp.ctxWorker.Done():
				stopWorking()
			case batch, ok := <-q.batchChan:
				if !ok {
					stopWorking()
					continue
				}
				q.doWorkerHandle(batch)
				// reset the idle ticker, and drain the tick after reset in case a tick was already triggered
				resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
			case <-t.C:
				q.workerNumMu.Lock()
				keepWorking = q.workerNum <= 1 // keep the last worker running
				if !keepWorking {
					q.workerNum--
				}
				q.workerNumMu.Unlock()
			}
		}
	})
}
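
// Note: on an idle tick a worker exits only if it is not the last one (workerNum > 1), so the pool
// keeps a single idle worker alive to react quickly when the next batch is dispatched.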

// doFlush flushes the queue: it tries to read all items from the queue and handles them.
// It is for testing purposes only. It's not designed to work for a cluster.
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
	q.isFlushing.Store(true)
	defer q.isFlushing.Store(false)

	log.Debug("Queue %q starts flushing", q.GetName())
	defer log.Debug("Queue %q finishes flushing", q.GetName())

	// stop all workers, and prepare a new worker context to start new workers
	wg.ctxWorkerCancel()
	wg.wg.Wait()

	defer func() {
		close(flush.c)
		wg.doPrepareWorkerContext()
	}()

	if flush.timeout < 0 {
		// discard everything
		wg.batchBuffer = nil
		for {
			select {
			case <-wg.popItemChan:
			case <-wg.popItemErr:
			case <-q.batchChan:
			case <-q.ctxRun.Done():
				return
			default:
				return
			}
		}
	}

	// drain the batch channel first
loop:
	for {
		select {
		case batch := <-q.batchChan:
			q.doWorkerHandle(batch)
		default:
			break loop
		}
	}

	// drain the popItem channel
	emptyCounter := 0
	for {
		select {
		case <-q.ctxRun.Done():
			log.Debug("Queue %q is shutting down", q.GetName())
			return
		case data, dataOk := <-wg.popItemChan:
			if !dataOk {
				return
			}
			emptyCounter = 0
			if v, jsonOk := q.unmarshal(data); !jsonOk {
				continue
			} else {
				q.doWorkerHandle([]T{v})
			}
		case err := <-wg.popItemErr:
			if !q.isCtxRunCanceled() {
				log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
			}
			return
		case <-time.After(20 * time.Millisecond):
			// There is no reliable way to make sure all queue items are consumed by the Flush: there might always be some items stored in buffers or temporary variables.
			// If Gitea runs in a cluster, we cannot even guarantee that all items are consumed on a deterministic instance.
			// Luckily, the "Flush" trick is only used in tests, so far so good.
			if cnt, _ := q.baseQueue.Len(q.ctxRun); cnt == 0 && len(wg.popItemChan) == 0 {
				emptyCounter++
			}
			if emptyCounter >= 2 {
				return
			}
		}
	}
}
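
// Note: doFlush only treats the queue as drained after the base queue and popItemChan look empty on
// two consecutive 20ms polls; as the comment above says, this is a best-effort heuristic for tests,
// not a hard guarantee.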

func (q *WorkerPoolQueue[T]) isCtxRunCanceled() bool {
	select {
	case <-q.ctxRun.Done():
		return true
	default:
		return false
	}
}

var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip reading other flush requests

// doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
func (q *WorkerPoolQueue[T]) doRun() {
	pprof.SetGoroutineLabels(q.ctxRun)

	log.Debug("Queue %q starts running", q.GetName())
	defer log.Debug("Queue %q stops running", q.GetName())

	wg := &workerGroup[T]{q: q}
	wg.doPrepareWorkerContext()
	wg.popItemChan, wg.popItemErr = popItemByChan(q.ctxRun, q.baseQueue.PopItem)

	defer func() {
		q.ctxRunCancel()

		// drain all data on the fly:
		// since the queue is shutting down, the items can't be dispatched to workers because the context is canceled,
		// and doWorkerHandle can't be called either, because there is no chance to push unhandled items back to the queue
		var unhandled []T
		close(q.batchChan)
		for batch := range q.batchChan {
			unhandled = append(unhandled, batch...)
		}
		unhandled = append(unhandled, wg.batchBuffer...)
		for data := range wg.popItemChan {
			if v, ok := q.unmarshal(data); ok {
				unhandled = append(unhandled, v)
			}
		}

		shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
		if shutdownTimeout != 0 {
			// if there is a shutdown context, try to push the items back to the base queue
			q.basePushForShutdown(unhandled...)
			workerDone := make(chan struct{})
			// this is the only way to wait for the workers, because the handlers do not have a context to wait for
			go func() { wg.wg.Wait(); close(workerDone) }()
			select {
			case <-workerDone:
			case <-time.After(shutdownTimeout):
				log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
			}
		} else {
			// if there is no shutdown context, just call the handler to try to handle the items; if the handler fails again, the items are lost
			q.safeHandler(unhandled...)
		}

		close(q.shutdownDone)
	}()

	var batchDispatchC <-chan time.Time = infiniteTimerC
	for {
		select {
		case flush := <-q.flushChan:
			// before flushing, it needs to try to dispatch the buffered batch to a worker first, in case there is no worker running;
			// after the dispatching, at least one worker is running, so "doFlush" can wait for the workers to finish.
			// Since we are already in a "flush" operation, the dispatching function shouldn't read the flush chan.
			q.doDispatchBatchToWorker(wg, skipFlushChan)
			q.doFlush(wg, flush)
		case <-q.ctxRun.Done():
			log.Debug("Queue %q is shutting down", q.GetName())
			return
		case data, dataOk := <-wg.popItemChan:
			if !dataOk {
				return
			}
			if v, jsonOk := q.unmarshal(data); !jsonOk {
				testRecorder.Record("pop:corrupted:%s", data) // in rare cases the levelqueue(leveldb) might be corrupted
				continue
			} else {
				wg.batchBuffer = append(wg.batchBuffer, v)
			}
			if len(wg.batchBuffer) >= q.batchLength {
				q.doDispatchBatchToWorker(wg, q.flushChan)
			} else if batchDispatchC == infiniteTimerC {
				batchDispatchC = time.After(batchDebounceDuration)
			} // else: batchDispatchC is already a debounce timer, it will be triggered soon
		case <-batchDispatchC:
			batchDispatchC = infiniteTimerC
			q.doDispatchBatchToWorker(wg, q.flushChan)
		case err := <-wg.popItemErr:
			if !q.isCtxRunCanceled() {
				log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
			}
			return
		}
	}
}