mirror of https://github.com/go-gitea/gitea.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
230 lines
6.1 KiB
230 lines
6.1 KiB
// Copyright 2020 The Gitea Authors. All rights reserved. |
|
// SPDX-License-Identifier: MIT |
|
|
|
package cron |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"reflect" |
|
"strings" |
|
"sync" |
|
"time" |
|
|
|
"code.gitea.io/gitea/models/db" |
|
system_model "code.gitea.io/gitea/models/system" |
|
user_model "code.gitea.io/gitea/models/user" |
|
"code.gitea.io/gitea/modules/graceful" |
|
"code.gitea.io/gitea/modules/log" |
|
"code.gitea.io/gitea/modules/process" |
|
"code.gitea.io/gitea/modules/setting" |
|
"code.gitea.io/gitea/modules/translation" |
|
) |
|
|
|
var ( |
|
lock = sync.Mutex{} |
|
started = false |
|
tasks = []*Task{} |
|
tasksMap = map[string]*Task{} |
|
) |
|
|
|
// Task represents a Cron task |
|
type Task struct { |
|
lock sync.Mutex |
|
Name string |
|
config Config |
|
fun func(context.Context, *user_model.User, Config) error |
|
Status string |
|
LastMessage string |
|
LastDoer string |
|
ExecTimes int64 |
|
// This stores the time of the last manual run of this task. |
|
LastRun time.Time |
|
} |
|
|
|
// DoRunAtStart returns if this task should run at the start |
|
func (t *Task) DoRunAtStart() bool { |
|
return t.config.DoRunAtStart() |
|
} |
|
|
|
// IsEnabled returns if this task is enabled as cron task |
|
func (t *Task) IsEnabled() bool { |
|
return t.config.IsEnabled() |
|
} |
|
|
|
// GetConfig will return a copy of the task's config |
|
func (t *Task) GetConfig() Config { |
|
if reflect.TypeOf(t.config).Kind() == reflect.Ptr { |
|
// Pointer: |
|
return reflect.New(reflect.ValueOf(t.config).Elem().Type()).Interface().(Config) |
|
} |
|
// Not pointer: |
|
return reflect.New(reflect.TypeOf(t.config)).Elem().Interface().(Config) |
|
} |
|
|
|
// Run will run the task incrementing the cron counter with no user defined |
|
func (t *Task) Run() { |
|
t.RunWithUser(&user_model.User{ |
|
ID: -1, |
|
Name: "(Cron)", |
|
LowerName: "(cron)", |
|
}, t.config) |
|
} |
|
|
|
// RunWithUser will run the task incrementing the cron counter at the time with User |
|
func (t *Task) RunWithUser(doer *user_model.User, config Config) { |
|
if !taskStatusTable.StartIfNotRunning(t.Name) { |
|
return |
|
} |
|
t.lock.Lock() |
|
if config == nil { |
|
config = t.config |
|
} |
|
t.ExecTimes++ |
|
t.lock.Unlock() |
|
defer func() { |
|
taskStatusTable.Stop(t.Name) |
|
}() |
|
graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) { |
|
defer func() { |
|
if err := recover(); err != nil { |
|
// Recover a panic within the execution of the task. |
|
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) |
|
log.Error("PANIC whilst running task: %s Value: %v", t.Name, combinedErr) |
|
} |
|
}() |
|
// Store the time of this run, before the function is executed, so it |
|
// matches the behavior of what the cron library does. |
|
t.lock.Lock() |
|
t.LastRun = time.Now() |
|
t.lock.Unlock() |
|
|
|
pm := process.GetManager() |
|
doerName := "" |
|
if doer != nil && doer.ID != -1 { |
|
doerName = doer.Name |
|
} |
|
|
|
ctx, _, finished := pm.AddContext(baseCtx, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "process", doerName)) |
|
defer finished() |
|
|
|
if err := t.fun(ctx, doer, config); err != nil { |
|
var message string |
|
var status string |
|
if db.IsErrCancelled(err) { |
|
status = "cancelled" |
|
message = err.(db.ErrCancelled).Message |
|
} else { |
|
status = "error" |
|
message = err.Error() |
|
} |
|
|
|
t.lock.Lock() |
|
t.LastMessage = message |
|
t.Status = status |
|
t.LastDoer = doerName |
|
t.lock.Unlock() |
|
|
|
if err := system_model.CreateNotice(ctx, system_model.NoticeTask, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "cancelled", doerName, message)); err != nil { |
|
log.Error("CreateNotice: %v", err) |
|
} |
|
return |
|
} |
|
|
|
t.lock.Lock() |
|
t.Status = "finished" |
|
t.LastMessage = "" |
|
t.LastDoer = doerName |
|
t.lock.Unlock() |
|
|
|
if config.DoNoticeOnSuccess() { |
|
if err := system_model.CreateNotice(ctx, system_model.NoticeTask, config.FormatMessage(translation.NewLocale("en-US"), t.Name, "finished", doerName)); err != nil { |
|
log.Error("CreateNotice: %v", err) |
|
} |
|
} |
|
}) |
|
} |
|
|
|
// GetTask gets the named task |
|
func GetTask(name string) *Task { |
|
lock.Lock() |
|
defer lock.Unlock() |
|
log.Info("Getting %s in %v", name, tasksMap[name]) |
|
|
|
return tasksMap[name] |
|
} |
|
|
|
// RegisterTask allows a task to be registered with the cron service |
|
func RegisterTask(name string, config Config, fun func(context.Context, *user_model.User, Config) error) error { |
|
log.Debug("Registering task: %s", name) |
|
|
|
i18nKey := "admin.dashboard." + name |
|
if value := translation.NewLocale("en-US").TrString(i18nKey); value == i18nKey { |
|
return fmt.Errorf("translation is missing for task %q, please add translation for %q", name, i18nKey) |
|
} |
|
|
|
_, err := setting.GetCronSettings(name, config) |
|
if err != nil { |
|
log.Error("Unable to register cron task with name: %s Error: %v", name, err) |
|
return err |
|
} |
|
|
|
task := &Task{ |
|
Name: name, |
|
config: config, |
|
fun: fun, |
|
} |
|
lock.Lock() |
|
locked := true |
|
defer func() { |
|
if locked { |
|
lock.Unlock() |
|
} |
|
}() |
|
if _, has := tasksMap[task.Name]; has { |
|
log.Error("A task with this name: %s has already been registered", name) |
|
return fmt.Errorf("duplicate task with name: %s", task.Name) |
|
} |
|
|
|
if config.IsEnabled() { |
|
// We cannot use the entry return as there is no way to lock it |
|
if err := addTaskToScheduler(task); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
tasks = append(tasks, task) |
|
tasksMap[task.Name] = task |
|
if started && config.IsEnabled() && config.DoRunAtStart() { |
|
lock.Unlock() |
|
locked = false |
|
task.Run() |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// RegisterTaskFatal will register a task but if there is an error log.Fatal |
|
func RegisterTaskFatal(name string, config Config, fun func(context.Context, *user_model.User, Config) error) { |
|
if err := RegisterTask(name, config, fun); err != nil { |
|
log.Fatal("Unable to register cron task %s Error: %v", name, err) |
|
} |
|
} |
|
|
|
func addTaskToScheduler(task *Task) error { |
|
tags := []string{task.Name, task.config.GetSchedule()} // name and schedule can't be get from job, so we add them as tag |
|
if scheduleHasSeconds(task.config.GetSchedule()) { |
|
scheduler = scheduler.CronWithSeconds(task.config.GetSchedule()) |
|
} else { |
|
scheduler = scheduler.Cron(task.config.GetSchedule()) |
|
} |
|
if _, err := scheduler.Tag(tags...).Do(task.Run); err != nil { |
|
log.Error("Unable to register cron task with name: %s Error: %v", task.Name, err) |
|
return err |
|
} |
|
return nil |
|
} |
|
|
|
// scheduleHasSeconds reports whether schedule consists of at least six
// whitespace-separated fields, which is treated as a cron expression that
// includes a seconds field.
func scheduleHasSeconds(schedule string) bool {
	const fieldsWithSeconds = 6
	fieldCount := len(strings.Fields(schedule))
	return fieldCount >= fieldsWithSeconds
}
|
|
|