hotgo/server/internal/crons/init.go

244 lines
4.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package crons
// @Link https://github.com/bufanyun/hotgo
// @Copyright Copyright (c) 2023 HotGo CLI
// @Author Ms <133814250@qq.com>
// @License https://github.com/bufanyun/hotgo/blob/master/LICENSE
//
package crons
import (
"context"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtime"
"hotgo/internal/consts"
"hotgo/internal/dao"
"hotgo/internal/model/entity"
"hotgo/utility/simple"
"strings"
"sync"
)
type cronStrategy interface {
GetName() string
Execute(ctx context.Context)
}
var (
// 添加新的任务时只需实现cronStrategy接口并加入到cronList即可
cronList []cronStrategy
inst = new(tasks)
)
type tasks struct {
list []*TaskItem
sync.RWMutex
}
type TaskItem struct {
Pattern string // 表达式参考https://goframe.org/pages/viewpage.action?pageId=30736411
Name string // 唯一的任务名称
Params string // 函数参数,多个用,隔开
Fun gcron.JobFunc // 执行的函数接口
Policy int64 // 策略 1并行 2单例 3单次 4多次
Count int // 执行次数仅Policy=4时有效
}
func LoadCronList() {
for _, cron := range cronList {
inst.Add(&TaskItem{
Name: cron.GetName(),
Fun: cron.Execute,
})
}
}
func StopALL() {
for _, v := range gcron.Entries() {
gcron.Remove(v.Name)
}
}
// StartALL 启动任务
func StartALL(sysCron []*entity.SysCron) error {
if len(inst.list) == 0 {
LoadCronList()
}
var (
err error
ct = gctx.New()
)
if len(sysCron) == 0 {
g.Log().Debug(ct, "no scheduled task is available.")
return nil
}
for _, cron := range sysCron {
f := inst.Get(cron.Name)
if f == nil {
return gerror.Newf("该任务没有加入任务列表:%v", cron.Name)
}
// 没有则添加
if gcron.Search(cron.Name) == nil {
var (
t *gcron.Entry
ctx = context.WithValue(gctx.New(), consts.ContextKeyCronArgs, strings.Split(cron.Params, consts.CronSplitStr))
)
switch cron.Policy {
case consts.CronPolicySame:
t, err = gcron.Add(ctx, cron.Pattern, f.Fun, cron.Name)
case consts.CronPolicySingle:
t, err = gcron.AddSingleton(ctx, cron.Pattern, f.Fun, cron.Name)
case consts.CronPolicyOnce:
t, err = gcron.AddOnce(ctx, cron.Pattern, f.Fun, cron.Name)
case consts.CronPolicyTimes:
if f.Count <= 0 {
f.Count = 1
}
t, err = gcron.AddTimes(ctx, cron.Pattern, int(cron.Count), f.Fun, cron.Name)
default:
return gerror.Newf("使用无效的策略, cron.Policy=%v", cron.Policy)
}
if err != nil {
return err
}
if t == nil {
return gerror.New("启动任务失败")
}
}
gcron.Start(cron.Name)
// 执行完毕,单次和多次执行的任务更新状态
if cron.Policy == consts.CronPolicyOnce || cron.Policy == consts.CronPolicyTimes {
_, err = dao.SysCron.Ctx(ct).Where("id", cron.Id).
Data(g.Map{"status": consts.StatusDisable, "updated_at": gtime.Now()}).
Update()
if err != nil {
err = gerror.Wrap(err, consts.ErrorORM)
return err
}
}
}
g.Log().Debug(ct, "load cron success..")
return nil
}
// RefreshStatus 刷新状态
func RefreshStatus(sysCron *entity.SysCron) (err error) {
if sysCron == nil {
return
}
if sysCron.Status == consts.StatusEnabled {
return Start(sysCron)
}
return Stop(sysCron)
}
// Stop 停止单个任务
func Stop(sysCron *entity.SysCron) (err error) {
cr := gcron.Search(sysCron.Name)
if cr == nil {
return
}
cr.Stop()
return
}
// Once 立即执行一次某个任务
func Once(ctx context.Context, sysCron *entity.SysCron) error {
for _, v := range cronList {
if v.GetName() == sysCron.Name {
simple.SafeGo(ctx, func(ctx context.Context) {
v.Execute(ctx)
})
return nil
}
}
return gerror.Newf("定时任务不存在:%+v", sysCron.Name)
}
// Delete 删除任务
func Delete(sysCron *entity.SysCron) (err error) {
if sysCron == nil {
return
}
for _, v := range gcron.Entries() {
if v.Name == sysCron.Name {
gcron.Remove(v.Name)
}
}
return
}
// Start 启动单个任务
func Start(sysCron *entity.SysCron) (err error) {
if sysCron == nil {
return
}
cr := gcron.Search(sysCron.Name)
if cr != nil {
cr.Start()
return
}
return StartALL([]*entity.SysCron{sysCron})
}
// Add 添加任务
func (t *tasks) Add(task *TaskItem) *tasks {
if task.Name == "" || task.Fun == nil {
return t
}
t.Lock()
defer t.Unlock()
t.list = append(t.list, task)
return t
}
// Get 找到任务
func (t *tasks) Get(name string) *TaskItem {
if len(t.list) == 0 {
return nil
}
for _, item := range t.list {
if item.Name == name {
return item
}
}
return nil
}
// Del 删除任务
func (t *tasks) Del(name string) (newList []*TaskItem) {
if len(t.list) == 0 {
return nil
}
t.Lock()
defer t.Unlock()
for _, item := range t.list {
if item.Name == name {
continue
}
newList = append(newList, item)
}
return newList
}