Browse Source

Use global lock instead of status pool for cron lock (#35507)

pull/35544/head
Lunny Xiao 3 months ago committed by GitHub
parent
commit
8106d95577
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 57
      modules/sync/status_pool.go
  2. 31
      modules/sync/status_pool_test.go
  3. 2
      routers/init.go
  4. 8
      services/cron/cron.go
  5. 19
      services/cron/tasks.go

57
modules/sync/status_pool.go

@ -1,57 +0,0 @@ @@ -1,57 +0,0 @@
// Copyright 2016 The Gogs Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package sync
import (
"sync"
"code.gitea.io/gitea/modules/container"
)
// StatusTable is a table maintains true/false values.
//
// This table is particularly useful for un/marking and checking values
// in different goroutines.
type StatusTable struct {
lock sync.RWMutex
pool container.Set[string]
}
// NewStatusTable initializes and returns a new StatusTable object.
func NewStatusTable() *StatusTable {
return &StatusTable{
pool: make(container.Set[string]),
}
}
// StartIfNotRunning sets value of given name to true if not already in pool.
// Returns whether set value was set to true
func (p *StatusTable) StartIfNotRunning(name string) bool {
p.lock.Lock()
added := p.pool.Add(name)
p.lock.Unlock()
return added
}
// Start sets value of given name to true in the pool.
func (p *StatusTable) Start(name string) {
p.lock.Lock()
p.pool.Add(name)
p.lock.Unlock()
}
// Stop sets value of given name to false in the pool.
func (p *StatusTable) Stop(name string) {
p.lock.Lock()
p.pool.Remove(name)
p.lock.Unlock()
}
// IsRunning checks if value of given name is set to true in the pool.
func (p *StatusTable) IsRunning(name string) bool {
p.lock.RLock()
exists := p.pool.Contains(name)
p.lock.RUnlock()
return exists
}

31
modules/sync/status_pool_test.go

@ -1,31 +0,0 @@ @@ -1,31 +0,0 @@
// Copyright 2017 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package sync
import (
"testing"
"github.com/stretchr/testify/assert"
)
func Test_StatusTable(t *testing.T) {
table := NewStatusTable()
assert.False(t, table.IsRunning("xyz"))
table.Start("xyz")
assert.True(t, table.IsRunning("xyz"))
assert.False(t, table.StartIfNotRunning("xyz"))
assert.True(t, table.IsRunning("xyz"))
table.Stop("xyz")
assert.False(t, table.IsRunning("xyz"))
assert.True(t, table.StartIfNotRunning("xyz"))
assert.True(t, table.IsRunning("xyz"))
table.Stop("xyz")
assert.False(t, table.IsRunning("xyz"))
}

2
routers/init.go

@ -177,7 +177,7 @@ func InitWebInstalled(ctx context.Context) { @@ -177,7 +177,7 @@ func InitWebInstalled(ctx context.Context) {
mustInit(repo_service.InitLicenseClassifier)
// Finally start up the cron
cron.NewContext(ctx)
cron.Init(ctx)
}
// NormalRoutes represents non install routes

8
services/cron/cron.go

@ -11,7 +11,6 @@ import ( @@ -11,7 +11,6 @@ import (
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/translation"
"github.com/go-co-op/gocron"
@ -19,13 +18,10 @@ import ( @@ -19,13 +18,10 @@ import (
var scheduler = gocron.NewScheduler(time.Local)
// Prevent duplicate running tasks.
var taskStatusTable = sync.NewStatusTable()
// NewContext begins cron tasks
// Init begins cron tasks
// Each cron task is run within the shutdown context as a running server
// AtShutdown the cron server is stopped
func NewContext(original context.Context) {
func Init(original context.Context) {
defer pprof.SetGoroutineLabels(original)
_, _, finished := process.GetManager().AddTypedContext(graceful.GetManager().ShutdownContext(), "Service: Cron", process.SystemProcessType, true)
initBasicTasks()

19
services/cron/tasks.go

@ -14,6 +14,7 @@ import ( @@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/models/db"
system_model "code.gitea.io/gitea/models/system"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
@ -71,20 +72,30 @@ func (t *Task) Run() { @@ -71,20 +72,30 @@ func (t *Task) Run() {
}, t.config)
}
func getCronTaskLockKey(name string) string {
return "cron_task:" + name
}
// RunWithUser will run the task incrementing the cron counter at the time with User
func (t *Task) RunWithUser(doer *user_model.User, config Config) {
if !taskStatusTable.StartIfNotRunning(t.Name) {
locked, releaser, err := globallock.TryLock(graceful.GetManager().ShutdownContext(), getCronTaskLockKey(t.Name))
if err != nil {
log.Error("Failed to acquire lock for cron task %q: %v", t.Name, err)
return
}
if !locked {
log.Trace("a cron task %q is already running", t.Name)
return
}
defer releaser()
t.lock.Lock()
if config == nil {
config = t.config
}
t.ExecTimes++
t.lock.Unlock()
defer func() {
taskStatusTable.Stop(t.Name)
}()
graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) {
defer func() {
if err := recover(); err != nil {

Loading…
Cancel
Save