项目目录优化,任务模块后端代码

This commit is contained in:
PandaGoAdmin
2021-12-23 17:23:27 +08:00
parent 0caf81660c
commit 21ff92a93c
67 changed files with 802 additions and 206 deletions

151
apps/job/api/job.go Normal file
View File

@@ -0,0 +1,151 @@
package api
import (
"pandax/apps/job/entity"
"pandax/apps/job/jobs"
"pandax/apps/job/services"
"pandax/base/biz"
"pandax/base/ctx"
"pandax/base/ginx"
"pandax/base/utils"
)
type JobApi struct {
JobApp services.JobModel
}
// @Summary 添加任务
// @Description 获取JSON
// @Tags 任务
// @Accept application/json
// @Product application/json
// @Param data body entity.SysJob true "data"
// @Success 200 {string} string "{"code": 200, "message": "添加成功"}"
// @Success 200 {string} string "{"code": 400, "message": "添加失败"}"
// @Router /job/job [post]
// @Security X-TOKEN
func (j *JobApi) CreateJob(rc *ctx.ReqCtx) {
var job entity.SysJob
ginx.BindJsonAndValid(rc.GinCtx, &job)
job.CreateBy = rc.LoginAccount.UserName
j.JobApp.Insert(job)
}
// @Summary job列表
// @Description 获取JSON
// @Tags 任务
// @Param status query string false "status"
// @Param jobName query string false "jobName"
// @Param jobGroup query string false "jobGroup"
// @Param pageSize query int false "页条数"
// @Param pageNum query int false "页码"
// @Success 200 {string} string "{"code": 200, "data": [...]}"
// @Router /job/list [get]
// @Security
func (j *JobApi) GetJobList(rc *ctx.ReqCtx) {
pageNum := ginx.QueryInt(rc.GinCtx, "pageNum", 1)
pageSize := ginx.QueryInt(rc.GinCtx, "pageSize", 10)
jobName := rc.GinCtx.Query("jobName")
jobGroup := rc.GinCtx.Query("jobGroup")
status := rc.GinCtx.Query("status")
list, total := j.JobApp.FindListPage(pageNum, pageSize, entity.SysJob{JobName: jobName, JobGroup: jobGroup, Status: status})
rc.ResData = map[string]interface{}{
"data": list,
"total": total,
"pageNum": pageNum,
"pageSize": pageSize,
}
}
// @Summary 获取一个job
// @Description 获取JSON
// @Tags 任务
// @Param jobId path int true "jobId"
// @Success 200 {string} string "{"code": 200, "data": [...]}"
// @Router /job/{jobId} [get]
// @Security
func (j *JobApi) GetJob(rc *ctx.ReqCtx) {
jobId := ginx.PathParamInt(rc.GinCtx, rc.GinCtx.Param("jobId"))
rc.ResData = j.JobApp.FindOne(int64(jobId))
}
// @Summary 修改JOB
// @Description 获取JSON
// @Tags 任务
// @Accept application/json
// @Product application/json
// @Param data body entity.SysJob true "body"
// @Success 200 {string} string "{"code": 200, "message": "添加成功"}"
// @Success 200 {string} string "{"code": 400, "message": "添加失败"}"
// @Router /job [put]
// @Security X-TOKEN
func (l *JobApi) UpdateJob(rc *ctx.ReqCtx) {
var job entity.SysJob
ginx.BindJsonAndValid(rc.GinCtx, &job)
l.JobApp.Update(job)
}
// @Summary 批量删除JOB
// @Description 删除数据
// @Tags 任务
// @Param infoId path string true "以逗号(,分割的infoId"
// @Success 200 {string} string "{"code": 200, "message": "删除成功"}"
// @Success 200 {string} string "{"code": 400, "message": "删除失败"}"
// @Router /job/{jobId} [delete]
func (l *JobApi) DeleteJob(rc *ctx.ReqCtx) {
jobIds := rc.GinCtx.Param("jobId")
group := utils.IdsStrToIdsIntGroup(jobIds)
l.JobApp.Delete(group)
}
// @Summary 停止JOB
// @Description 停止Job
// @Tags 任务
// @Param jobId path int true "jobId"
// @Success 200 {string} string "{"code": 200, "message": "删除成功"}"
// @Success 200 {string} string "{"code": 400, "message": "删除失败"}"
// @Router /job/stop/{jobId} [get]
func (l *JobApi) StopJobForService(rc *ctx.ReqCtx) {
jobId := ginx.PathParamInt(rc.GinCtx, "jobId")
job := l.JobApp.FindOne(int64(jobId))
jobs.Remove(jobs.Crontab, job.EntryId)
}
// @Summary 开始JOB
// @Description 开始Job
// @Tags 任务
// @Success 200 {string} string "{"code": 200, "message": "删除成功"}"
// @Success 200 {string} string "{"code": 400, "message": "删除失败"}"
// @Router /job/stop/{jobId} [get]
func (l *JobApi) StartJobForService(rc *ctx.ReqCtx) {
jobId := ginx.PathParamInt(rc.GinCtx, "jobId")
job := l.JobApp.FindOne(int64(jobId))
biz.IsTrue(job.Status == "0", "以关闭的任务不能开启")
biz.IsTrue(job.EntryId == 0, "任务不能重复启动")
var err error
if job.JobType == "1" {
var j = &jobs.HttpJob{}
j.InvokeTarget = job.InvokeTarget
j.CronExpression = job.CronExpression
j.JobId = job.JobId
j.Name = job.JobName
j.MisfirePolicy = job.MisfirePolicy
job.EntryId, err = jobs.AddJob(jobs.Crontab, j)
biz.ErrIsNil(err, "添加JOB失败")
} else {
var j = &jobs.ExecJob{}
j.InvokeTarget = job.InvokeTarget
j.CronExpression = job.CronExpression
j.JobId = job.JobId
j.Name = job.JobName
j.Args = job.Args
j.MisfirePolicy = job.MisfirePolicy
job.EntryId, err = jobs.AddJob(jobs.Crontab, j)
biz.ErrIsNil(err, "添加JOB失败")
}
l.JobApp.Update(*job)
}

20
apps/job/entity/job.go Normal file
View File

@@ -0,0 +1,20 @@
package entity
import "pandax/base/model"
type SysJob struct {
JobId int64 `json:"jobId" gorm:"primaryKey;autoIncrement"` // 编码
JobName string `json:"jobName" gorm:"type:varchar(255);"` // 名称
JobGroup string `json:"jobGroup" gorm:"type:varchar(255);"` // 任务分组
JobType string `json:"jobType" gorm:"type:varchar(1);"` // 任务类型
CronExpression string `json:"cronExpression" gorm:"type:varchar(255);"` // cron表达式
InvokeTarget string `json:"invokeTarget" gorm:"type:varchar(255);"` // 调用目标
Args string `json:"args" gorm:"type:varchar(255);"` // 目标参数
MisfirePolicy string `json:"misfirePolicy" gorm:"type:varchar(1);"` // 执行策略
Concurrent string `json:"concurrent" gorm:"type:varchar(1);"` // 是否并发
Status string `json:"status" gorm:"type:varchar(1);"` // 状态
EntryId int `json:"entry_id" gorm:"type:int(11);"` // job启动时返回的id
CreateBy string `json:"createBy" gorm:"type:varchar(128);comment:创建人"`
UpdateBy string `json:"updateBy" gorm:"type:varchar(128);comment:更新者"`
model.BaseModel
}

38
apps/job/jobs/examples.go Normal file
View File

@@ -0,0 +1,38 @@
package jobs
import (
"fmt"
"time"
)
// 需要将定义的struct 添加到字典中;
// 字典 key 可以配置到 自动任务 调用目标 中;
func InitJob() {
jobList = map[string]JobsExec{
"ExamplesOne": ExamplesOne{},
// ...
}
}
// 新添加的job 必须按照以下格式定义并实现Exec函数
type ExamplesOne struct {
}
func (t ExamplesOne) Exec(arg interface{}) error {
str := time.Now().Format(timeFormat) + " [INFO] JobCore ExamplesOne exec success"
// TODO: 这里需要注意 Examples 传入参数是 string 所以 arg.(string);请根据对应的类型进行转化;
switch arg.(type) {
case string:
if arg.(string) != "" {
fmt.Println("string", arg.(string))
fmt.Println(str, arg.(string))
} else {
fmt.Println("arg is nil")
fmt.Println(str, "arg is nil")
}
break
}
return nil
}

195
apps/job/jobs/jobbase.go Normal file
View File

@@ -0,0 +1,195 @@
package jobs
import (
"fmt"
"pandax/apps/job/entity"
"pandax/apps/job/services"
"pandax/base/global"
"pandax/base/httpclient"
"sync"
"time"
"github.com/robfig/cron/v3"
)
var timeFormat = "2006-01-02 15:04:05"
var retryCount = 3
var jobList map[string]JobsExec
var lock sync.Mutex
var Crontab = new(cron.Cron)
type JobCore struct {
InvokeTarget string
Name string
JobId int64
EntryId int
CronExpression string // 任务表达式
MisfirePolicy string // 错误执行策略
Args string
}
// 任务类型 http
type HttpJob struct {
cron *cron.Cron
JobCore
}
type ExecJob struct {
cron *cron.Cron
JobCore
}
func (e *ExecJob) Run() {
startTime := time.Now()
var obj = jobList[e.InvokeTarget]
if obj == nil {
global.Log.Warn("[Job] ExecJob Run job nil")
if e.MisfirePolicy == "2" {
Remove(e.cron, e.EntryId)
}
return
}
err := CallExec(obj.(JobsExec), e.Args)
if err != nil {
// 如果失败暂停一段时间重试
fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
if e.MisfirePolicy == "2" {
Remove(e.cron, e.EntryId)
return
}
}
// 结束时间
endTime := time.Now()
// 执行时间
latencyTime := endTime.Sub(startTime)
//TODO: 待完善部分
//str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
//ws.SendAll(str)
global.Log.Info("[Job] JobCore %s exec success , spend :%v", e.Name, latencyTime)
// 执行一次
if e.MisfirePolicy == "1" {
Remove(e.cron, e.EntryId)
}
return
}
//http 任务接口
func (h *HttpJob) Run() {
startTime := time.Now()
var count = 0
LOOP:
if count < retryCount {
_, err := httpclient.NewRequest(h.InvokeTarget).Get().BodyToString()
if err != nil {
time.Sleep(time.Duration(count+1) * 5 * time.Second)
count = count + 1
goto LOOP
}
}
// 结束时间
endTime := time.Now()
// 执行时间
latencyTime := endTime.Sub(startTime)
global.Log.Infof("[Job] JobCore %s exec success , spend :%v", h.Name, latencyTime)
if h.MisfirePolicy == "1" {
Remove(h.cron, h.EntryId)
}
return
}
func Setup() {
Crontab = NewWithSeconds()
// 获取系统job 0是默认1是系统
jl := services.JobModelDao.FindList(entity.SysJob{JobGroup: "1"})
jobList := *jl
if len(jobList) == 0 {
global.Log.Info(time.Now().Format(timeFormat), " [INFO] JobCore total:0")
return
}
err := services.JobModelDao.RemoveAllEntryID()
if err != nil {
global.Log.Info(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
}
sysJob := entity.SysJob{}
for i := 0; i < len(jobList); i++ {
if jobList[i].Status != "0" && jobList[i].EntryId > 0 {
continue
}
if jobList[i].JobType == "1" {
j := &HttpJob{}
j.InvokeTarget = jobList[i].InvokeTarget
j.CronExpression = jobList[i].CronExpression
j.JobId = jobList[i].JobId
j.Name = jobList[i].JobName
j.MisfirePolicy = jobList[i].MisfirePolicy
sysJob.EntryId, err = AddJob(Crontab, j)
} else if jobList[i].JobType == "2" {
j := &ExecJob{}
j.InvokeTarget = jobList[i].InvokeTarget
j.CronExpression = jobList[i].CronExpression
j.JobId = jobList[i].JobId
j.Name = jobList[i].JobName
j.Args = jobList[i].Args
j.MisfirePolicy = jobList[i].MisfirePolicy
sysJob.EntryId, err = AddJob(Crontab, j)
}
sysJob.JobId = jobList[i].JobId
services.JobModelDao.Update(sysJob)
}
// 其中任务
Crontab.Start()
global.Log.Info(time.Now().Format(timeFormat), " [INFO] JobCore start success.")
// 关闭任务
defer Crontab.Stop()
select {}
}
// 添加任务 AddJob(invokeTarget string, jobId int, jobName string, cronExpression string)
func AddJob(c *cron.Cron, job Job) (int, error) {
if job == nil {
return 0, nil
}
return job.addJob(c)
}
func (h *HttpJob) addJob(c *cron.Cron) (int, error) {
id, err := c.AddJob(h.CronExpression, h)
if err != nil {
return 0, err
}
h.cron = c
EntryId := int(id)
h.EntryId = EntryId
return EntryId, nil
}
func (h *ExecJob) addJob(c *cron.Cron) (int, error) {
id, err := c.AddJob(h.CronExpression, h)
if err != nil {
return 0, err
}
h.cron = c
EntryId := int(id)
h.EntryId = EntryId
return EntryId, nil
}
// 移除任务(停止任务)
func Remove(c *cron.Cron, entryID int) {
c.Remove(cron.EntryID(entryID))
var job entity.SysJob
job.EntryId = entryID
services.JobModelDao.RemoveEntryID(entryID)
}
func NewWithSeconds() *cron.Cron {
secondParser := cron.NewParser(cron.Second | cron.Minute |
cron.Hour | cron.Dom | cron.Month | cron.DowOptional | cron.Descriptor)
return cron.New(cron.WithParser(secondParser), cron.WithChain())
}

16
apps/job/jobs/type.go Normal file
View File

@@ -0,0 +1,16 @@
package jobs
import "github.com/robfig/cron/v3"
type Job interface {
Run()
addJob(*cron.Cron) (int, error)
}
type JobsExec interface {
Exec(arg interface{}) error
}
func CallExec(e JobsExec, arg interface{}) error {
return e.Exec(arg)
}

51
apps/job/router/job.go Normal file
View File

@@ -0,0 +1,51 @@
package router
import (
"github.com/gin-gonic/gin"
"pandax/apps/job/api"
"pandax/apps/job/services"
"pandax/base/ctx"
)
func InitJobRouter(router *gin.RouterGroup) {
// 登录日志
jobApi := &api.JobApi{
JobApp: services.JobModelDao,
}
job := router.Group("")
jobListLog := ctx.NewLogInfo("获取Job列表")
job.GET("list", func(c *gin.Context) {
ctx.NewReqCtxWithGin(c).WithLog(jobListLog).Handle(jobApi.GetJobList)
})
getJobLog := ctx.NewLogInfo("获取Job信息")
job.GET(":jobId", func(c *gin.Context) {
ctx.NewReqCtxWithGin(c).WithLog(getJobLog).Handle(jobApi.GetJob)
})
insertJobLog := ctx.NewLogInfo("添加Job信息")
job.POST("", func(c *gin.Context) {
ctx.NewReqCtxWithGin(c).WithLog(insertJobLog).Handle(jobApi.CreateJob)
})
updateJobLog := ctx.NewLogInfo("修改Job信息")
job.PUT("", func(c *gin.Context) {
ctx.NewReqCtxWithGin(c).WithLog(updateJobLog).Handle(jobApi.UpdateJob)
})
deleteJobLog := ctx.NewLogInfo("删除Job信息")
job.DELETE(":jobId", func(c *gin.Context) {
ctx.NewReqCtxWithGin(c).WithLog(deleteJobLog).Handle(jobApi.DeleteJob)
})
stopJobLog := ctx.NewLogInfo("停止一个job")
job.DELETE(":jobId", func(c *gin.Context) {
ctx.NewReqCtxWithGin(c).WithLog(stopJobLog).Handle(jobApi.StopJobForService)
})
starteJobLog := ctx.NewLogInfo("开启一个job")
job.DELETE(":jobId", func(c *gin.Context) {
ctx.NewReqCtxWithGin(c).WithLog(starteJobLog).Handle(jobApi.StartJobForService)
})
}

115
apps/job/services/job.go Normal file
View File

@@ -0,0 +1,115 @@
package services
import (
"pandax/apps/job/entity"
"pandax/base/biz"
"pandax/base/global"
)
type (
JobModel interface {
Insert(data entity.SysJob) *entity.SysJob
FindOne(jobId int64) *entity.SysJob
FindListPage(page, pageSize int, data entity.SysJob) (*[]entity.SysJob, int64)
FindList(data entity.SysJob) *[]entity.SysJob
Update(data entity.SysJob) *entity.SysJob
Delete(jobId []int64)
FindByEntryId(entryId int64) *entity.SysJob
RemoveAllEntryID() error
RemoveEntryID(EntryID int) error
}
jobModelImpl struct {
table string
}
)
var JobModelDao JobModel = &jobModelImpl{
table: `sys_jobs`,
}
func (m *jobModelImpl) Insert(data entity.SysJob) *entity.SysJob {
global.Db.Table(m.table).Create(&data)
return &data
}
func (m *jobModelImpl) FindOne(jobId int64) *entity.SysJob {
resData := new(entity.SysJob)
err := global.Db.Table(m.table).Where("`job_id` = ?", jobId).First(resData).Error
biz.ErrIsNil(err, "查询任务信息失败")
return resData
}
func (m *jobModelImpl) FindListPage(page, pageSize int, data entity.SysJob) (*[]entity.SysJob, int64) {
list := make([]entity.SysJob, 0)
var total int64 = 0
offset := pageSize * (page - 1)
db := global.Db.Table(m.table)
// 此处填写 where参数判断
if data.JobName != "" {
db = db.Where("job_name = ?", data.JobName)
}
if data.Status != "" {
db = db.Where("status = ?", data.Status)
}
if data.JobGroup != "" {
db = db.Where("job_group = ?", data.JobGroup)
}
err := db.Where("`delete_time` IS NULL").Count(&total).Error
err = db.Order("create_time desc").Limit(pageSize).Offset(offset).Find(&list).Error
biz.ErrIsNil(err, "查询任务分页信息失败")
return &list, total
}
func (m *jobModelImpl) FindList(data entity.SysJob) *[]entity.SysJob {
list := make([]entity.SysJob, 0)
db := global.Db.Table(m.table)
// 此处填写 where参数判断
if data.JobName != "" {
db = db.Where("job_name = ?", data.JobName)
}
if data.Status != "" {
db = db.Where("status = ?", data.Status)
}
if data.JobGroup != "" {
db = db.Where("job_group = ?", data.JobGroup)
}
err := db.Order("create_time desc").Find(&list).Error
if err != nil {
global.Log.Error("查询任务分页信息失败:" + err.Error())
}
return &list
}
func (m *jobModelImpl) Update(data entity.SysJob) *entity.SysJob {
biz.ErrIsNil(global.Db.Table(m.table).Updates(&data).Error, "修改任务失败")
return &data
}
func (m *jobModelImpl) Delete(jobIds []int64) {
err := global.Db.Table(m.table).Delete(&entity.SysJob{}, "`job_id` in (?)", jobIds).Error
biz.ErrIsNil(err, "删除操作日志信息失败")
return
}
func (m *jobModelImpl) FindByEntryId(entryId int64) *entity.SysJob {
resData := new(entity.SysJob)
err := global.Db.Table(m.table).Where("`entry_id` = ?", entryId).First(resData).Error
biz.ErrIsNil(err, "查询失败")
return resData
}
func (m *jobModelImpl) RemoveAllEntryID() error {
if err := global.Db.Table(m.table).Where("entry_id > ?", 0).Update("entry_id", 0).Error; err != nil {
return err
}
return nil
}
func (m *jobModelImpl) RemoveEntryID(EntryID int) error {
if err := global.Db.Table(m.table).Where("entry_id = ?", EntryID).Update("entry_id", 0).Error; err != nil {
return err
}
return nil
}