Files
PandaX/apps/job/jobs/jobbase.go
2023-09-19 10:18:50 +08:00

125 lines
2.7 KiB
Go

package jobs
import (
"fmt"
"pandax/apps/job/entity"
"pandax/apps/job/services"
"pandax/pkg/global"
"pandax/pkg/tool"
logEntity "pandax/apps/job/entity"
logServices "pandax/apps/job/services"
"sync"
"time"
"github.com/robfig/cron/v3"
)
// Package-level state shared by the job runner.
var (
	// timeFormat is the layout used for timestamps in log lines.
	timeFormat = "2006-01-02 15:04:05"

	// retryCount is not referenced in this part of the file.
	// NOTE(review): presumably a retry budget for failed jobs; confirm usage.
	retryCount = 3

	// jobList maps an invoke-target name to the handler that executes it.
	// It is populated by InitJob.
	jobList map[string]JobsExec

	// lock is not referenced in this part of the file.
	// NOTE(review): presumably meant to guard jobList; confirm usage.
	lock sync.Mutex

	// Crontab is the package-wide scheduler; Setup replaces it with a
	// seconds-aware instance built by NewWithSeconds.
	Crontab = new(cron.Cron)
)
func InitJob() {
jobList = map[string]JobsExec{
"cronDevice": CronDeviceHandle{},
"cronProduct": CronProductHandle{},
}
// 启动调度任务
Setup()
}
// JobCore holds the descriptive and scheduling data of a single job. It is
// embedded in ExecJob, whose Run method reads these fields.
type JobCore struct {
	// InvokeTarget is the key used to look up the handler in jobList.
	InvokeTarget string
	// Name is copied into the job log written by Run.
	Name string
	JobId string
	// OrgId and Owner are copied into the job log written by Run.
	OrgId int64
	Owner string
	// EntryId is the scheduler entry id assigned when the job is added.
	EntryId int
	CronExpression string // cron expression used to schedule the job
	// MisfirePolicy "1" makes Run remove the job after a single execution.
	MisfirePolicy string
	// Args and Content are passed through to the handler by Run.
	Args string
	Content string
}
// ExecJob is a schedulable job: the JobCore data plus the scheduler it was
// registered with. The cron field is set by addJob and used by Run to remove
// the job from that scheduler.
type ExecJob struct {
	cron *cron.Cron
	JobCore
}
// Run executes the job once. ExecJob is registered with the scheduler through
// cron.AddJob (see addJob), so this is the method the scheduler invokes.
//
// It looks up the handler registered under e.InvokeTarget in jobList, calls it
// through CallExec with e.Args and e.Content, and inserts a JobLog record
// describing the outcome. The job is removed from the scheduler exactly once
// when the handler is not registered, when the handler returns an error, or
// when MisfirePolicy is "1" (run a single time).
func (e *ExecJob) Run() {
	startTime := time.Now()

	// NOTE(review): Status stays "0" on every path, including failures;
	// confirm whether a failed run should record a different status.
	jobLog := logEntity.JobLog{Name: e.Name, EntryId: e.EntryId, TargetInvoke: e.InvokeTarget, Status: "0"}
	jobLog.Id = tool.GenerateID()
	jobLog.OrgId = e.OrgId
	jobLog.Owner = e.Owner

	// A missing map key yields a nil interface value. Asserting or calling
	// through it would panic, so treat an unknown target as a failed run.
	var err error
	if handler := jobList[e.InvokeTarget]; handler != nil {
		err = CallExec(handler, e.Args, e.Content)
	} else {
		err = fmt.Errorf("invoke target %q is not registered", e.InvokeTarget)
	}

	// A run-once policy always removes the job; a failure removes it too.
	removeJob := e.MisfirePolicy == "1"
	if err != nil {
		jobLog.LogInfo = fmt.Sprintf("任务运行错误: %s", err.Error())
		removeJob = true
	} else {
		jobLog.LogInfo = fmt.Sprintf("任务运行成功,总耗时 %f", time.Since(startTime).Seconds())
	}

	// Persist the execution record.
	logServices.JobLogModelDao.Insert(jobLog)

	// Remove at most once, even when the job both failed and is run-once.
	if removeJob {
		Remove(e.cron, e.EntryId)
	}
}
// Setup replaces Crontab with a seconds-aware scheduler, asks the job DAO to
// remove all stored entry ids, starts the scheduler, and then blocks the
// calling goroutine on an empty select.
func Setup() {
	Crontab = NewWithSeconds()

	// A failure here is only logged; the scheduler is started regardless.
	if err := services.JobModelDao.RemoveAllEntryID(); err != nil {
		global.Log.Info(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
	}

	// Start the scheduler.
	Crontab.Start()
	global.Log.Info(time.Now().Format(timeFormat), " [INFO] JobCore start success.")

	// Stop the scheduler when Setup unwinds.
	// NOTE(review): the empty select below never returns, so this deferred
	// call looks unreachable in practice; confirm that is intended.
	defer Crontab.Stop()

	select {}
}
// AddJob registers job with the scheduler c and returns the entry id reported
// by the job's addJob method. A nil job is ignored: the result is 0 with a nil
// error.
func AddJob(c *cron.Cron, job Job) (int, error) {
	if job != nil {
		return job.addJob(c)
	}
	return 0, nil
}
// addJob schedules e on c using e.CronExpression. On success it remembers the
// scheduler and the assigned entry id on e and returns that id; on failure it
// returns 0 and the scheduler's error, leaving e unchanged.
func (e *ExecJob) addJob(c *cron.Cron) (int, error) {
	entryID, err := c.AddJob(e.CronExpression, e)
	if err != nil {
		return 0, err
	}

	e.cron = c
	e.EntryId = int(entryID)
	return e.EntryId, nil
}
// Remove stops a job: it removes the entry identified by entryID from the
// scheduler c and then asks the job DAO to remove that entry id.
func Remove(c *cron.Cron, entryID int) {
	id := cron.EntryID(entryID)
	c.Remove(id)

	// NOTE(review): this record is filled in but never read afterwards;
	// confirm whether it was meant to be passed to the DAO.
	var record entity.SysJob
	record.EntryId = entryID

	services.JobModelDao.RemoveEntryID(entryID)
}
// NewWithSeconds returns a scheduler whose parser accepts a leading seconds
// field, an optional day-of-week field, and descriptors, and which is
// configured with an empty job-wrapper chain.
func NewWithSeconds() *cron.Cron {
	fields := cron.Second | cron.Minute | cron.Hour |
		cron.Dom | cron.Month | cron.DowOptional | cron.Descriptor

	return cron.New(
		cron.WithParser(cron.NewParser(fields)),
		cron.WithChain(),
	)
}