diff --git a/apps/flow/api/flow_work_classify.go b/apps/flow/api/flow_work_classify.go deleted file mode 100644 index f8efc11..0000000 --- a/apps/flow/api/flow_work_classify.go +++ /dev/null @@ -1,65 +0,0 @@ -package api - -// ========================================================================== -// 生成日期:2022-08-24 22:02:33 +0800 CST -// 生成路径: apps/flow/api/flow_work_classify.go -// 生成人:panda -// ========================================================================== -import ( - "github.com/XM-GO/PandaKit/model" - "github.com/XM-GO/PandaKit/restfulx" - - "github.com/XM-GO/PandaKit/utils" - "pandax/apps/flow/entity" - "pandax/apps/flow/services" -) - -type FlowWorkClassifyApi struct { - FlowWorkClassifyApp services.FlowWorkClassifyModel -} - -// GetFlowWorkClassifyList Classify列表数据 -func (p *FlowWorkClassifyApi) GetFlowWorkClassifyList(rc *restfulx.ReqCtx) { - data := entity.FlowWorkClassify{} - pageNum := restfulx.QueryInt(rc, "pageNum", 1) - pageSize := restfulx.QueryInt(rc, "pageSize", 10) - data.Name = restfulx.QueryParam(rc, "name") - - list, total := p.FlowWorkClassifyApp.FindListPage(pageNum, pageSize, data) - - rc.ResData = model.ResultPage{ - Total: total, - PageNum: int64(pageNum), - PageSize: int64(pageNum), - Data: list, - } -} - -// GetFlowWorkClassify 获取Classify -func (p *FlowWorkClassifyApi) GetFlowWorkClassify(rc *restfulx.ReqCtx) { - id := restfulx.PathParamInt(rc, "id") - p.FlowWorkClassifyApp.FindOne(int64(id)) -} - -// InsertFlowWorkClassify 添加Classify -func (p *FlowWorkClassifyApi) InsertFlowWorkClassify(rc *restfulx.ReqCtx) { - var data entity.FlowWorkClassify - restfulx.BindQuery(rc, &data) - - p.FlowWorkClassifyApp.Insert(data) -} - -// UpdateFlowWorkClassify 修改Classify -func (p *FlowWorkClassifyApi) UpdateFlowWorkClassify(rc *restfulx.ReqCtx) { - var data entity.FlowWorkClassify - restfulx.BindQuery(rc, &data) - - p.FlowWorkClassifyApp.Update(data) -} - -// DeleteFlowWorkClassify 删除Classify -func (p *FlowWorkClassifyApi) DeleteFlowWorkClassify(rc *restfulx.ReqCtx) { - id := restfulx.PathParam(rc, "id") - ids := utils.IdsStrToIdsIntGroup(id) - p.FlowWorkClassifyApp.Delete(ids) -} diff --git a/apps/flow/entity/work_classify.go b/apps/flow/entity/work_classify.go deleted file mode 100644 index d966c0b..0000000 --- a/apps/flow/entity/work_classify.go +++ /dev/null @@ -1,14 +0,0 @@ -package entity - -import "github.com/XM-GO/PandaKit/model" - -// FlowWorkClassify 工作流流程分类 -type FlowWorkClassify struct { - model.BaseAutoModel - Name string `gorm:"column:name; type: varchar(128)" json:"name"` // 分类名称 - Creator int `gorm:"column:creator; type: int(11)" json:"creator"` // 创建者 -} - -func (FlowWorkClassify) TableName() string { - return "flow_work_classify" -} diff --git a/apps/flow/entity/work_info.go b/apps/flow/entity/work_info.go deleted file mode 100644 index 6a83b9a..0000000 --- a/apps/flow/entity/work_info.go +++ /dev/null @@ -1,25 +0,0 @@ -package entity - -import ( - "encoding/json" - "github.com/XM-GO/PandaKit/model" -) - -// FlowWorkInfo 工作流信息 -type FlowWorkInfo struct { - model.BaseAutoModel - Name string `gorm:"column:name; type:varchar(128)" json:"name"` // 流程名称 - Icon string `gorm:"column:icon; type:varchar(128)" json:"icon" ` // 流程标签 - Structure json.RawMessage `gorm:"column:structure; type:json" json:"structure" ` // 流程结构 - Classify int `gorm:"column:classify; type:int(11)" json:"classify"` // 分类ID - Templates json.RawMessage `gorm:"column:templates; type:json" json:"templates"` // 模版 - Task json.RawMessage `gorm:"column:task; type:json" json:"task"` // 任务ID, array, 可执行多个任务,可以当成通知任务,每个节点都会去执行 - SubmitCount int `gorm:"column:submit_count; type:int(11); default:0" json:"submitCount"` // 提交统计 - Creator int `gorm:"column:creator; type:int(11)" json:"creator"` // 创建者 - Notice json.RawMessage `gorm:"column:notice; type:json" json:"notice"` // 绑定通知 - Remarks string `gorm:"column:remarks; type:varchar(1024)" json:"remarks"` // 流程备注 -} - -func (FlowWorkInfo) TableName() string { - return "flow_work_info" -} diff --git a/apps/flow/entity/work_order.go b/apps/flow/entity/work_order.go deleted file mode 100644 index bced08e..0000000 --- a/apps/flow/entity/work_order.go +++ /dev/null @@ -1,26 +0,0 @@ -package entity - -import ( - "encoding/json" - "github.com/XM-GO/PandaKit/model" -) - -// FlowWorkOrder 工作流工单 -type FlowWorkOrder struct { - model.BaseAutoModel - Title string `gorm:"column:title; type:varchar(128)" json:"title"` // 工单标题 - Priority int `gorm:"column:priority; type:int(11)" json:"priority"` // 工单优先级 1,正常 2,紧急 3,非常紧急 - Process int `gorm:"column:process; type:int(11)" json:"process"` // 流程ID - Classify int `gorm:"column:classify; type:int(11)" json:"classify"` // 分类ID - IsEnd int `gorm:"column:is_end; type:int(11); default:0" json:"is_end"` // 是否结束, 0 未结束,1 已结束 - IsDenied int `gorm:"column:is_denied; type:int(11); default:0" json:"is_denied"` // 是否被拒绝, 0 没有,1 有 - State json.RawMessage `gorm:"column:state; type:json" json:"state"` // 状态信息 - RelatedPerson json.RawMessage `gorm:"column:related_person; type:json" json:"related_person"` // 工单所有处理人 - Creator int `gorm:"column:creator; type:int(11)" json:"creator"` // 创建人 - UrgeCount int `gorm:"column:urge_count; type:int(11); default:0" json:"urge_count"` // 催办次数 - UrgeLastTime int `gorm:"column:urge_last_time; type:int(11); default:0" json:"urge_last_time"` // 上一次催促时间 -} - -func (FlowWorkOrder) TableName() string { - return "flow_work_order" -} diff --git a/apps/flow/entity/work_order_templates.go b/apps/flow/entity/work_order_templates.go deleted file mode 100644 index 92d9f73..0000000 --- a/apps/flow/entity/work_order_templates.go +++ /dev/null @@ -1,18 +0,0 @@ -package entity - -import ( - "encoding/json" - "github.com/XM-GO/PandaKit/model" -) - -// FlowWorkOrderTemplate 工单绑定模版数据 -type FlowWorkOrderTemplate struct { - model.BaseAutoModel - WorkOrder int `gorm:"column:work_order; type: int(11)" json:"work_order"` // 工单ID - FormStructure json.RawMessage `gorm:"column:form_structure; type: json" json:"form_structure"` // 表单结构 - FormData json.RawMessage `gorm:"column:form_data; type: json" json:"form_data"` // 表单数据 -} - -func (FlowWorkOrderTemplate) TableName() string { - return "flow_work_order_templates" -} diff --git a/apps/flow/entity/work_stage.go b/apps/flow/entity/work_stage.go deleted file mode 100644 index 2849dac..0000000 --- a/apps/flow/entity/work_stage.go +++ /dev/null @@ -1,25 +0,0 @@ -package entity - -import ( - "github.com/XM-GO/PandaKit/model" -) - -// FlowWorkStage 工作流工序(流转历史) -type FlowWorkStage struct { - model.BaseAutoModel - Title string `gorm:"column:title; type: varchar(128)" json:"title"` // 工单标题 - WorkOrder int `gorm:"column:work_order; type: int(11)" json:"work_order"` // 工单ID - State string `gorm:"column:state; type: varchar(128)" json:"state"` // 工单状态 - Source string `gorm:"column:source; type: varchar(128)" json:"source"` // 源节点ID - Target string `gorm:"column:target; type: varchar(128)" json:"target"` // 目标节点ID - Stage string `gorm:"column:stage; type: varchar(128)" json:"stage"` // 流转ID - Status int `gorm:"column:status; type: int(11)" json:"status"` // 流转状态 1 同意, 0 拒绝, 2 其他 - Processor string `gorm:"column:processor; type: varchar(45)" json:"processor"` // 处理人 - ProcessorId int `gorm:"column:processor_id; type: int(11)" json:"processor_id"` // 处理人ID - CostDuration int64 `gorm:"column:cost_duration; type: int(11)" json:"cost_duration"` // 处理时长 - Remarks string `gorm:"column:remarks; type: longtext" json:"remarks"` // 备注 -} - -func (FlowWorkStage) TableName() string { - return "flow_work_stage" -} diff --git a/apps/flow/entity/work_task.go b/apps/flow/entity/work_task.go deleted file mode 100644 index 0fc0bb5..0000000 --- a/apps/flow/entity/work_task.go +++ /dev/null @@ -1,33 +0,0 @@ -package entity - -import ( - "github.com/XM-GO/PandaKit/model" -) - -// FlowWorkTask 工作流任务 -type FlowWorkTask struct { - model.BaseAutoModel - Name string `gorm:"column:name; type: varchar(256)" json:"name"` // 任务名称 - TaskType string `gorm:"column:task_type; type: varchar(45)" json:"task_type"` // 任务类型 - Content string `gorm:"column:content; type: longtext" json:"content"` // 任务内容 - Creator int `gorm:"column:creator; type: int(11)" json:"creator"` // 创建者 - Remarks string `gorm:"column:remarks; type: longtext" json:"remarks"` // 备注 -} - -func (FlowWorkTask) TableName() string { - return "flow_work_task" -} - -// FlowWorkTaskHistory 工作流任务执行历史 -type FlowWorkTaskHistory struct { - model.BaseAutoModel - Task int `gorm:"column:task; type: int(11)" json:"task"` // 任务ID - Name string `gorm:"column:name; type: varchar(256)" json:"name"` // 任务名称 - TaskType int `gorm:"column:task_type; type: int(11)" json:"task_type"` // 任务类型, python, shell - ExecutionTime string `gorm:"column:execution_time; type: varchar(128)" json:"execution_time"` // 执行时间 - Result string `gorm:"column:result; type: longtext" json:"result"` // 任务返回 -} - -func (FlowWorkTaskHistory) TableName() string { - return "flow_work_task_history" -} diff --git a/apps/flow/entity/work_templates.go b/apps/flow/entity/work_templates.go deleted file mode 100644 index 409e3a0..0000000 --- a/apps/flow/entity/work_templates.go +++ /dev/null @@ -1,19 +0,0 @@ -package entity - -import ( - "encoding/json" - "github.com/XM-GO/PandaKit/model" -) - -// FlowWorkTemplates 工作流表单模板 -type FlowWorkTemplates struct { - model.BaseAutoModel - Name string `gorm:"column:name; type: varchar(128)" json:"name" binding:"required"` // 模板名称 - FormStructure json.RawMessage `gorm:"column:form_structure; type: json" json:"form_structure" binding:"required"` // 表单结构 - Creator int `gorm:"column:creator; type: int(11)" json:"creator"` // 创建者 - Remarks string `gorm:"column:remarks; type: longtext" json:"remarks"` // 备注 -} - -func (FlowWorkTemplates) TableName() string { - return "flow_work_templates" -} diff --git a/apps/flow/router/flow_work_classify.go b/apps/flow/router/flow_work_classify.go deleted file mode 100644 index 2fed179..0000000 --- a/apps/flow/router/flow_work_classify.go +++ /dev/null @@ -1,70 +0,0 @@ -// ========================================================================== -// 生成日期:2022-08-24 22:02:33 +0800 CST -// 生成路径: apps/flow/router/flow_work_classify.go -// 生成人:panda -// ========================================================================== -package router - -import ( - "github.com/XM-GO/PandaKit/model" - "github.com/XM-GO/PandaKit/restfulx" - "pandax/apps/flow/api" - "pandax/apps/flow/entity" - "pandax/apps/flow/services" - - restfulspec "github.com/emicklei/go-restful-openapi/v2" - "github.com/emicklei/go-restful/v3" -) - -func InitFlowWorkClassifyRouter(container *restful.Container) { - s := &api.FlowWorkClassifyApi{ - FlowWorkClassifyApp: services.FlowWorkClassifyModelDao, - } - - ws := new(restful.WebService) - ws.Path("/flow/workclassify").Produces(restful.MIME_JSON) - tags := []string{"workclassify"} - - ws.Route(ws.GET("/list").To(func(request *restful.Request, response *restful.Response) { - restfulx.NewReqCtx(request, response).WithLog("获取Classify分页列表").Handle(s.GetFlowWorkClassifyList) - }). - Doc("获取Classify分页列表"). - Param(ws.QueryParameter("pageNum", "页数").Required(true).DataType("int")). - Param(ws.QueryParameter("pageSize", "每页条数").Required(true).DataType("int")). - Metadata(restfulspec.KeyOpenAPITags, tags). - Writes(model.ResultPage{}). - Returns(200, "OK", model.ResultPage{})) - - ws.Route(ws.GET("/{id}").To(func(request *restful.Request, response *restful.Response) { - restfulx.NewReqCtx(request, response).WithLog("获取Classify信息").Handle(s.GetFlowWorkClassify) - }). - Doc("获取Classify信息"). - Param(ws.PathParameter("id", "Id").DataType("int")). - Metadata(restfulspec.KeyOpenAPITags, tags). - Writes(entity.FlowWorkClassify{}). // on the response - Returns(200, "OK", entity.FlowWorkClassify{}). - Returns(404, "Not Found", nil)) - - ws.Route(ws.POST("").To(func(request *restful.Request, response *restful.Response) { - restfulx.NewReqCtx(request, response).WithLog("添加Classify信息").Handle(s.InsertFlowWorkClassify) - }). - Doc("添加Classify信息"). - Metadata(restfulspec.KeyOpenAPITags, tags). - Reads(entity.FlowWorkClassify{})) - - ws.Route(ws.PUT("").To(func(request *restful.Request, response *restful.Response) { - restfulx.NewReqCtx(request, response).WithLog("修改Classify信息").Handle(s.UpdateFlowWorkClassify) - }). - Doc("修改Classify信息"). - Metadata(restfulspec.KeyOpenAPITags, tags). - Reads(entity.FlowWorkClassify{})) - - ws.Route(ws.DELETE("/{id}").To(func(request *restful.Request, response *restful.Response) { - restfulx.NewReqCtx(request, response).WithLog("删除Classify信息").Handle(s.DeleteFlowWorkClassify) - }). - Doc("删除Classify信息"). - Metadata(restfulspec.KeyOpenAPITags, tags). - Param(ws.PathParameter("id", "多id 1,2,3").DataType("string"))) - - container.Add(ws) -} diff --git a/apps/flow/services/flow_work_classify.go b/apps/flow/services/flow_work_classify.go deleted file mode 100644 index 93699d0..0000000 --- a/apps/flow/services/flow_work_classify.go +++ /dev/null @@ -1,87 +0,0 @@ -// ========================================================================== -// 生成日期:2022-08-24 22:02:33 +0800 CST -// 生成路径: apps/flow/services/flow_work_classify.go -// 生成人:panda -// ========================================================================== - -package services - -import ( - "github.com/XM-GO/PandaKit/biz" - "pandax/apps/flow/entity" - "pandax/pkg/global" -) - -type ( - FlowWorkClassifyModel interface { - Insert(data entity.FlowWorkClassify) *entity.FlowWorkClassify - FindOne(id int64) *entity.FlowWorkClassify - FindListPage(page, pageSize int, data entity.FlowWorkClassify) (*[]entity.FlowWorkClassify, int64) - FindList(data entity.FlowWorkClassify) *[]entity.FlowWorkClassify - Update(data entity.FlowWorkClassify) *entity.FlowWorkClassify - Delete(ids []int64) - } - - workclassifyModelImpl struct { - table string - } -) - -var FlowWorkClassifyModelDao FlowWorkClassifyModel = &workclassifyModelImpl{ - table: `flow_work_classify`, -} - -func (m *workclassifyModelImpl) Insert(data entity.FlowWorkClassify) *entity.FlowWorkClassify { - err := global.Db.Table(m.table).Create(&data).Error - biz.ErrIsNil(err, "添加工作流分类失败") - return &data -} - -func (m *workclassifyModelImpl) FindOne(id int64) *entity.FlowWorkClassify { - resData := new(entity.FlowWorkClassify) - db := global.Db.Table(m.table).Where("id = ?", id) - err := db.First(resData).Error - biz.ErrIsNil(err, "查询工作流分类失败") - return resData -} - -func (m *workclassifyModelImpl) FindListPage(page, pageSize int, data entity.FlowWorkClassify) (*[]entity.FlowWorkClassify, int64) { - list := make([]entity.FlowWorkClassify, 0) - var total int64 = 0 - offset := pageSize * (page - 1) - db := global.Db.Table(m.table) - // 此处填写 where参数判断 - if data.Name != "" { - db = db.Where("name like ?", "%"+data.Name+"%") - } - if data.Creator != 0 { - db = db.Where("creator = ?", data.Creator) - } - err := db.Count(&total).Error - err = db.Order("create_time").Limit(pageSize).Offset(offset).Find(&list).Error - biz.ErrIsNil(err, "查询工作流分类分页列表失败") - return &list, total -} - -func (m *workclassifyModelImpl) FindList(data entity.FlowWorkClassify) *[]entity.FlowWorkClassify { - list := make([]entity.FlowWorkClassify, 0) - db := global.Db.Table(m.table) - // 此处填写 where参数判断 - if data.Name != "" { - db = db.Where("name like ?", "%"+data.Name+"%") - } - if data.Creator != 0 { - db = db.Where("creator = ?", data.Creator) - } - biz.ErrIsNil(db.Order("create_time").Find(&list).Error, "查询工作流分类列表失败") - return &list -} - -func (m *workclassifyModelImpl) Update(data entity.FlowWorkClassify) *entity.FlowWorkClassify { - biz.ErrIsNil(global.Db.Table(m.table).Updates(&data).Error, "修改工作流分类失败") - return &data -} - -func (m *workclassifyModelImpl) Delete(ids []int64) { - biz.ErrIsNil(global.Db.Table(m.table).Delete(&entity.FlowWorkClassify{}, "id in (?)", ids).Error, "删除工作流分类失败") -} diff --git a/go.mod b/go.mod index f74e611..0052d77 100644 --- a/go.mod +++ b/go.mod @@ -28,11 +28,15 @@ require ( github.com/casbin/casbin/v2 v2.37.4 // indirect github.com/casbin/gorm-adapter/v3 v3.4.6 // indirect github.com/denisenkom/go-mssqldb v0.11.0 // indirect + github.com/dlclark/regexp2 v1.7.0 // indirect + github.com/dop251/goja v0.0.0-20230226152633-7c93113e17ac // indirect + github.com/eclipse/paho.mqtt.golang v1.4.2 // indirect github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/swag v0.19.15 // indirect github.com/go-redis/redis v6.15.9+incompatible // indirect + github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/go-sql-driver/mysql v1.6.0 // indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect @@ -55,6 +59,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/lib/pq v1.10.4 // indirect github.com/mailru/easyjson v0.7.6 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect diff --git a/go.sum b/go.sum index a0cb282..80f51d7 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,16 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/didip/tollbooth v4.0.2+incompatible h1:fVSa33JzSz0hoh2NxpwZtksAzAgd7zjmGO20HCZtF4M= github.com/didip/tollbooth v4.0.2+incompatible/go.mod h1:A9b0665CE6l1KmzpDws2++elm/CsuWBMa5Jv4WY0PEY= +github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= +github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo= +github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dop251/goja v0.0.0-20211022113120-dc8c55024d06/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk= +github.com/dop251/goja v0.0.0-20230226152633-7c93113e17ac h1:NGu46Adk2oPN3tinGFItahy4W9l+9uhEf03ZxbwmdVE= +github.com/dop251/goja v0.0.0-20230226152633-7c93113e17ac/go.mod h1:yRkwfj0CBpOGre+TwBsqPV0IH0Pk73e4PXJOeNDboGs= +github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= +github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM= +github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= +github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/emicklei/go-restful-openapi/v2 v2.9.0 h1:djsWqjhI0EVYfkLCCX6jZxUkLmYUq2q9tt09ZbixfyE= github.com/emicklei/go-restful-openapi/v2 v2.9.0/go.mod h1:VKNgZyYviM1hnyrjD9RDzP2RuE94xTXxV+u6MGN4v4k= github.com/emicklei/go-restful/v3 v3.7.3/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= @@ -85,6 +95,8 @@ github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl github.com/go-playground/validator/v10 v10.8.0/go.mod h1:9JhgTzTaE31GZDpH/HSvHiRJrJ3iKAgqqH0Bl/Ocjdk= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= +github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= @@ -226,6 +238,8 @@ github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -349,6 +363,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index cb7e1f1..022c5ba 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -15,8 +15,8 @@ type ruleChainInstance struct { nodes map[string]nodes.Node } -func newRuleChainInstance(data []byte) (*ruleChainInstance, []error) { - errors := []error{} +func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) { + errors := make([]error, 0) manifest, err := manifest.New(data) if err != nil { @@ -29,7 +29,7 @@ func newRuleChainInstance(data []byte) (*ruleChainInstance, []error) { // newWithManifest create rule chain by user's manifest file func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) { - errs := []error{} + errs := make([]error, 0) r := &ruleChainInstance{ firstRuleNodeId: m.FirstRuleNodeId, nodes: make(map[string]nodes.Node), @@ -49,7 +49,6 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) } r.nodes[n.Id] = node } - for _, edge := range m.Edges { originalNode, found := r.nodes[edge.SourceNodeId] if !found { @@ -63,7 +62,7 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) errs = append(errs, err) continue } - originalNode.AddLinkedNode(edge.Type, targetNode) + originalNode.AddLinkedNode(edge.Properties["type"].(string), targetNode) } for name, node := range r.nodes { targetNodes := node.GetLinkedNodes() diff --git a/pkg/rule_engine/instance_test.go b/pkg/rule_engine/instance_test.go new file mode 100644 index 0000000..0680858 --- /dev/null +++ b/pkg/rule_engine/instance_test.go @@ -0,0 +1,17 @@ +package rule_engine + +import ( + "io/ioutil" + "testing" +) + +func TestNewRuleChainInstance(t *testing.T) { + buf, err := ioutil.ReadFile("./manifest/manifest_sample.json") + if err != nil { + t.Error(err) + } + _, errs := NewRuleChainInstance(buf) + if len(errs) > 0 { + t.Error(errs[0]) + } +} diff --git a/pkg/rule_engine/manifest/manifest.go b/pkg/rule_engine/manifest/manifest.go index fa32a41..21b9349 100644 --- a/pkg/rule_engine/manifest/manifest.go +++ b/pkg/rule_engine/manifest/manifest.go @@ -12,9 +12,10 @@ type Node struct { } type Edge struct { - SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"` - TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"` - Type string `json:"type" yaml:"type"` //success or fail + SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"` + TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"` + Type string `json:"type" yaml:"type"` //success or fail + Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode } type Manifest struct { @@ -26,12 +27,13 @@ type Manifest struct { func New(data []byte) (*Manifest, error) { firstRuleNodeId := "" manifest := make(map[string]interface{}) - if err := json.Unmarshal(data, manifest); err != nil { + if err := json.Unmarshal(data, &manifest); err != nil { logrus.WithError(err).Errorf("invalid node chain manifest file") return nil, err } nodes := make([]Node, 0) - for _, node := range manifest["nodes"].([]map[string]interface{}) { + for _, mn := range manifest["nodes"].([]interface{}) { + node := mn.(map[string]interface{}) if node["type"].(string) == "InputNode" { firstRuleNodeId = node["id"].(string) } @@ -42,9 +44,11 @@ func New(data []byte) (*Manifest, error) { }) } edges := make([]Edge, 0) - for _, edge := range manifest["edges"].([]map[string]interface{}) { + for _, en := range manifest["edges"].([]interface{}) { + edge := en.(map[string]interface{}) edges = append(edges, Edge{ Type: edge["type"].(string), + Properties: edge["properties"].(map[string]interface{}), SourceNodeId: edge["sourceNodeId"].(string), TargetNodeId: edge["targetNodeId"].(string), }) diff --git a/pkg/rule_engine/manifest/manifest_sample.json b/pkg/rule_engine/manifest/manifest_sample.json index a6dcf80..b80e944 100644 --- a/pkg/rule_engine/manifest/manifest_sample.json +++ b/pkg/rule_engine/manifest/manifest_sample.json @@ -7,8 +7,7 @@ "y": 280, "properties": { "icon": "/src/assets/icon_module/svg/start.svg", - "debugMode": false, - "status": false + "debugMode": false }, "zIndex": 1002, "text": { @@ -24,8 +23,7 @@ "y": 160, "properties": { "icon": "/src/assets/icon_module/svg/function.svg", - "debugMode": false, - "status": false + "debugMode": false }, "zIndex": 1004, "text": { @@ -42,7 +40,7 @@ "properties": { "icon": "/src/assets/icon_module/svg/switch.svg", "debugMode": false, - "status": false + "scripts": "return {\n msg: msg,\n metadata: metadata,\n msgType: msgType\n};" }, "zIndex": 1006, "text": { @@ -66,7 +64,9 @@ "x": 540, "y": 160 }, - "properties": {}, + "properties": { + "type": "Success" + }, "zIndex": 1007, "pointsList": [ { @@ -100,7 +100,9 @@ "x": 540, "y": 460 }, - "properties": {}, + "properties": { + "type": "Failure" + }, "zIndex": 1008, "pointsList": [ { diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index 493b931..18c3250 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -1,17 +1,28 @@ package message +import "github.com/sirupsen/logrus" + // Message ... type Message interface { GetOriginator() string GetType() string - GetPayload() []byte + GetData() []byte + GetMetadata() Metadata SetType(string) - SetPayload([]byte) + SetData([]byte) SetOriginator(string) + SetMetadata(Metadata) MarshalBinary() ([]byte, error) UnmarshalBinary(b []byte) error } +// Metadata ... +type Metadata interface { + Keys() []string + GetKeyValue(key string) interface{} + SetKeyValue(key string, val interface{}) +} + // Predefined message types const ( MessageTypePostAttributesRequest = "Post attributes" @@ -25,31 +36,75 @@ const ( // NewMessage ... func NewMessage() Message { return &defaultMessage{ - payload: []byte{}, + data: []byte{}, } } type defaultMessage struct { - originator string //数据发布者 - messageType string //数据类型,数据来源 - payload []byte //二进制数据 + id string //uuid + ts int64 //时间戳 + msgType string //消息类型,数据来源 + originator string //数据发布者 + customerId string //客户Id UUID + entityId string //实体Id UUID + data []byte //数据 + dataType string //数据类型 JSON + metadata Metadata //数据的元数据 } // NewMessageWithDetail ... -func NewMessageWithDetail(originator string, messageType string, payload []byte) Message { +func NewMessageWithDetail(originator string, messageType string, msg []byte) Message { return &defaultMessage{ - originator: originator, - messageType: messageType, - payload: payload, + originator: originator, + msgType: messageType, + data: msg, } } func (t *defaultMessage) GetOriginator() string { return t.originator } -func (t *defaultMessage) GetType() string { return t.messageType } -func (t *defaultMessage) GetPayload() []byte { return t.payload } -func (t *defaultMessage) SetType(messageType string) { t.messageType = messageType } -func (t *defaultMessage) SetPayload(payload []byte) { t.payload = payload } +func (t *defaultMessage) GetType() string { return t.msgType } +func (t *defaultMessage) GetData() []byte { return t.data } +func (t *defaultMessage) GetMetadata() Metadata { return t.metadata } +func (t *defaultMessage) SetType(msgType string) { t.msgType = msgType } +func (t *defaultMessage) SetData(data []byte) { t.data = data } func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator } +func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata } func (t *defaultMessage) MarshalBinary() ([]byte, error) { return nil, nil } func (t *defaultMessage) UnmarshalBinary(b []byte) error { return nil } + +// NewMetadata ... +func NewMetadata() Metadata { + return &defaultMetadata{ + values: make(map[string]interface{}), + } +} + +type defaultMetadata struct { + values map[string]interface{} +} + +func newDefaultMetadata(vals map[string]interface{}) Metadata { + return &defaultMetadata{ + values: vals, + } +} + +func (t *defaultMetadata) Keys() []string { + keys := make([]string, 0) + for key := range t.values { + keys = append(keys, key) + } + return keys +} + +func (t *defaultMetadata) GetKeyValue(key string) interface{} { + if _, found := t.values[key]; !found { + logrus.Fatalf("no key '%s' in metadata", key) + } + return t.values[key] +} + +func (t *defaultMetadata) SetKeyValue(key string, val interface{}) { + t.values[key] = val +} diff --git a/pkg/rule_engine/nodes/action_assign_to_customer_node.go b/pkg/rule_engine/nodes/action_assign_to_customer_node.go new file mode 100644 index 0000000..38ac240 --- /dev/null +++ b/pkg/rule_engine/nodes/action_assign_to_customer_node.go @@ -0,0 +1,13 @@ +package nodes + +type assignToCustomerNode struct { + bareNode +} + +type assignToCustomerFactory struct{} + +func (f assignToCustomerFactory) Name() string { return "AssignCustomerFactoryNode" } +func (f assignToCustomerFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f assignToCustomerFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/action_clear_alarm_node.go b/pkg/rule_engine/nodes/action_clear_alarm_node.go new file mode 100644 index 0000000..6212e72 --- /dev/null +++ b/pkg/rule_engine/nodes/action_clear_alarm_node.go @@ -0,0 +1,37 @@ +package nodes + +import ( + "pandax/pkg/rule_engine/message" + "time" + + "github.com/sirupsen/logrus" +) + +const ClearAlarmNodeName = "ClearAlarmNode" + +type clearAlarmNodeFactory struct{} + +type clearAlarmNode struct { + bareNode + DetailBuilderScript string `json:"detailBuilderScript" yaml:"detailBuilderScript"` + AlarmType string `json:"alarmType" yaml:"alarmType"` + AlarmSeverity string `json:"alarmSeverity" yaml:"alarmSeverity"` + Propagate string `json:"propagate" yaml:"propagate"` + AlarmStartTime *time.Time `json:"alarmStartTime" yaml:"alarmStartTime"` + AlarmEndTime *time.Time `json:"alarmEndTime" yaml:"alarmEndTime"` +} + +func (f clearAlarmNodeFactory) Name() string { return ClearAlarmNodeName } +func (f clearAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Created", "Updated", "Failure"} + node := &clearAlarmNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *clearAlarmNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + return nil +} diff --git a/pkg/rule_engine/nodes/action_create_alarm_node.go b/pkg/rule_engine/nodes/action_create_alarm_node.go new file mode 100644 index 0000000..627cbf7 --- /dev/null +++ b/pkg/rule_engine/nodes/action_create_alarm_node.go @@ -0,0 +1,43 @@ +package nodes + +import ( + "fmt" + "pandax/pkg/rule_engine/message" + + "github.com/sirupsen/logrus" +) + +type createAlarmNode struct { + bareNode + DetailBuilderScript string `json:"detailBuilderScript" yaml:"detailBuilderScript"` + AlarmType string `json:"alarmType" yaml:"alarmType"` + AlarmSeverity string `json:"alarmSeverity" yaml:"alarmSeverity"` + Propagate string `json:"propagate" yaml:"propagate"` + AlarmStartTime string `json:"alarmStartTime" yaml:"alarmStartTime"` + AlarmEndTime string `json:"alarmEndTime" yaml:"alarmEndTime"` +} + +type createAlarmNodeFactory struct{} + +func (f createAlarmNodeFactory) Name() string { return "CreateAlarmNode" } +func (f createAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Created", "Updated", "Failure"} + node := &createAlarmNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *createAlarmNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + node1 := n.GetLinkedNode("Created") + node2 := n.GetLinkedNode("Updated") + node3 := n.GetLinkedNode("Failure") + if node1 == nil || node2 == nil || node3 == nil { + return fmt.Errorf("no valid label linked node in %s", n.Name()) + } + + return nil +} diff --git a/pkg/rule_engine/nodes/action_create_relation_node.go b/pkg/rule_engine/nodes/action_create_relation_node.go new file mode 100644 index 0000000..dc05056 --- /dev/null +++ b/pkg/rule_engine/nodes/action_create_relation_node.go @@ -0,0 +1,34 @@ +package nodes + +import ( + "pandax/pkg/rule_engine/message" +) + +type createRelationNode struct { + bareNode + Direction string + RelationType string + EntityType string + EntityNamePattern string + EntityTypePattern string + EntityCacheExpiration int64 + CreateEntityIfNotExists bool + ChangeOriginatorToRelatedEntity bool + RemoveCurrentRelations bool +} + +type createRelationNodeFactory struct{} + +func (f createRelationNodeFactory) Name() string { return "CreateRelationNode" } +func (f createRelationNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f createRelationNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &createRelationNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *createRelationNode) Handle(msg message.Message) error { + return nil +} diff --git a/pkg/rule_engine/nodes/action_delay_node.go b/pkg/rule_engine/nodes/action_delay_node.go index 802d104..9ab5aff 100644 --- a/pkg/rule_engine/nodes/action_delay_node.go +++ b/pkg/rule_engine/nodes/action_delay_node.go @@ -1,3 +1,14 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use p file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. package nodes import ( diff --git a/pkg/rule_engine/nodes/action_delete_relation_node.go b/pkg/rule_engine/nodes/action_delete_relation_node.go new file mode 100644 index 0000000..7ed5a9f --- /dev/null +++ b/pkg/rule_engine/nodes/action_delete_relation_node.go @@ -0,0 +1,9 @@ +package nodes + +type deleteRelationNodeFactory struct{} + +func (f deleteRelationNodeFactory) Name() string { return "DeleteRelationNode" } +func (f deleteRelationNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f deleteRelationNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/action_generator_node.go b/pkg/rule_engine/nodes/action_generator_node.go new file mode 100644 index 0000000..2b8a23f --- /dev/null +++ b/pkg/rule_engine/nodes/action_generator_node.go @@ -0,0 +1,38 @@ +package nodes + +import ( + "fmt" + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +type messageGeneratorNode struct { + bareNode + DetailBuilderScript string `json:"detail_builder_script" yaml:"detail_builder_script"` + FrequenceInSecond int32 `json:"frequency" yaml:"frequency"` +} + +type messageGeneratorNodeFactory struct{} + +func (f messageGeneratorNodeFactory) Name() string { return "MessageGeneratorNode" } +func (f messageGeneratorNodeFactory) Category() string { return NODE_CATEGORY_ACTION } + +func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Created", "Updated"} + node := &messageGeneratorNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *messageGeneratorNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + createdLabelNode := n.GetLinkedNode("Created") + updatedLabelNode := n.GetLinkedNode("Updated") + if createdLabelNode == nil || updatedLabelNode == nil { + return fmt.Errorf("no valid label linked node in %s", n.Name()) + } + + return nil +} diff --git a/pkg/rule_engine/nodes/action_log_node.go b/pkg/rule_engine/nodes/action_log_node.go new file mode 100644 index 0000000..6712388 --- /dev/null +++ b/pkg/rule_engine/nodes/action_log_node.go @@ -0,0 +1,41 @@ +package nodes + +import ( + "fmt" + "log" + "pandax/pkg/rule_engine/message" +) + +type logNode struct { + bareNode + Script string +} + +type logNodeFactory struct{} + +func (f logNodeFactory) Name() string { return "LogNode" } +func (f logNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f logNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &logNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *logNode) Handle(msg message.Message) error { + successLableNode := n.GetLinkedNode("Success") + failureLableNode := n.GetLinkedNode("Failure") + + scriptEngine := NewScriptEngine() + logMessage, err := scriptEngine.ScriptToString(msg, n.Script) + + if successLableNode == nil || failureLableNode == nil { + return fmt.Errorf("no valid label linked node in %s", n.Name()) + } + if err != nil { + return failureLableNode.Handle(msg) + } + log.Println(logMessage) + return successLableNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/action_rpc_call_reply_node.go b/pkg/rule_engine/nodes/action_rpc_call_reply_node.go new file mode 100644 index 0000000..64b5c07 --- /dev/null +++ b/pkg/rule_engine/nodes/action_rpc_call_reply_node.go @@ -0,0 +1,9 @@ +package nodes + +type rpcCallReplyNodeFactory struct{} + +func (f rpcCallReplyNodeFactory) Name() string { return "RPCCallReplyNode" } +func (f rpcCallReplyNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f rpcCallReplyNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/action_rpc_call_request_node.go b/pkg/rule_engine/nodes/action_rpc_call_request_node.go new file mode 100644 index 0000000..e162009 --- /dev/null +++ b/pkg/rule_engine/nodes/action_rpc_call_request_node.go @@ -0,0 +1,14 @@ +package nodes + +type rPCCallRequestNode struct { + bareNode + TimeoutInSeconds int +} + +type rpcCallRequestNodeFactory struct{} + +func (f rpcCallRequestNodeFactory) Name() string { return "RPCCallRequestNode" } +func (f rpcCallRequestNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f rpcCallRequestNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/action_save_attributes_node.go b/pkg/rule_engine/nodes/action_save_attributes_node.go new file mode 100644 index 0000000..5947ad8 --- /dev/null +++ b/pkg/rule_engine/nodes/action_save_attributes_node.go @@ -0,0 +1,35 @@ +package nodes + +import ( + "fmt" + "pandax/pkg/rule_engine/message" +) + +type SaveAttributesNode struct { + bareNode +} + +type saveAttributesNodeFactory struct{} + +func (f saveAttributesNodeFactory) Name() string { return "SaveAttributesNode" } +func (f saveAttributesNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &SaveAttributesNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *SaveAttributesNode) Handle(msg message.Message) error { + successLableNode := n.GetLinkedNode("Success") + failureLableNode := n.GetLinkedNode("Failure") + if successLableNode == nil || failureLableNode == nil { + return fmt.Errorf("no valid label linked node in %s", n.Name()) + } + if msg.GetType() != "POST_ATTRIBUTES_REQUEST" { + return failureLableNode.Handle(msg) + } + + return nil +} diff --git a/pkg/rule_engine/nodes/action_save_timeseries_node.go b/pkg/rule_engine/nodes/action_save_timeseries_node.go new file mode 100644 index 0000000..bfb96b8 --- /dev/null +++ b/pkg/rule_engine/nodes/action_save_timeseries_node.go @@ -0,0 +1,9 @@ +package nodes + +type saveTimeSeriesNodeFactory struct{} + +func (f saveTimeSeriesNodeFactory) Name() string { return "SaveTimeSeriesNode" } +func (f saveTimeSeriesNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/action_unassign_from_customer_node.go b/pkg/rule_engine/nodes/action_unassign_from_customer_node.go new file mode 100644 index 0000000..7576d27 --- /dev/null +++ b/pkg/rule_engine/nodes/action_unassign_from_customer_node.go @@ -0,0 +1,9 @@ +package nodes + +type unassignFromCustomerNodeFactory struct{} + +func (f unassignFromCustomerNodeFactory) Name() string { return "UnassignFromCustomerNode" } +func (f unassignFromCustomerNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f unassignFromCustomerNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/enrichment_customer_attr_node.go b/pkg/rule_engine/nodes/enrichment_customer_attr_node.go new file mode 100644 index 0000000..0c9145a --- /dev/null +++ b/pkg/rule_engine/nodes/enrichment_customer_attr_node.go @@ -0,0 +1,23 @@ +package nodes + +import "pandax/pkg/rule_engine/message" + +type enrichmentCustomerNode struct { + bareNode +} + +type enrichmentCustomerAttrNodeFactory struct{} + +func (f enrichmentCustomerAttrNodeFactory) Name() string { return "EnrichmentCustomerNode" } +func (f enrichmentCustomerAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } +func (f enrichmentCustomerAttrNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &enrichmentCustomerNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *enrichmentCustomerNode) Handle(msg message.Message) error { + return nil +} diff --git a/pkg/rule_engine/nodes/enrichment_device_attr_node.go b/pkg/rule_engine/nodes/enrichment_device_attr_node.go new file mode 100644 index 0000000..35d414e --- /dev/null +++ b/pkg/rule_engine/nodes/enrichment_device_attr_node.go @@ -0,0 +1,9 @@ +package nodes + +type enrichmentDeviceAttrNodeFactory struct{} + +func (f enrichmentDeviceAttrNodeFactory) Name() string { return "EnrichmentDeviceAttrbute" } +func (f enrichmentDeviceAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } +func (f enrichmentDeviceAttrNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/enrichment_originator_attr_node.go b/pkg/rule_engine/nodes/enrichment_originator_attr_node.go new file mode 100644 index 0000000..046d552 --- /dev/null +++ b/pkg/rule_engine/nodes/enrichment_originator_attr_node.go @@ -0,0 +1,9 @@ +package nodes + +type enrichmentOriginatorAttrNodeFactory struct{} + +func (f enrichmentOriginatorAttrNodeFactory) Name() string { return "EnrichmentOriginatorAttribute" } +func (f enrichmentOriginatorAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } +func (f enrichmentOriginatorAttrNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/enrichment_originator_fields_node.go b/pkg/rule_engine/nodes/enrichment_originator_fields_node.go new file mode 100644 index 0000000..0b36b1f --- /dev/null +++ b/pkg/rule_engine/nodes/enrichment_originator_fields_node.go @@ -0,0 +1,9 @@ +package nodes + +type enrichmentOriginatorFieldsNodeFactory struct{} + +func (f enrichmentOriginatorFieldsNodeFactory) Name() string { return "EnrichmentOriginatorFieldsNode" } +func (f enrichmentOriginatorFieldsNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } +func (f enrichmentOriginatorFieldsNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/enrichment_originator_telemetry_node.go b/pkg/rule_engine/nodes/enrichment_originator_telemetry_node.go new file mode 100644 index 0000000..a95ec00 --- /dev/null +++ b/pkg/rule_engine/nodes/enrichment_originator_telemetry_node.go @@ -0,0 +1,11 @@ +package nodes + +type enrichmentOriginatorTelemetryNodeFactory struct{} + +func (f enrichmentOriginatorTelemetryNodeFactory) Name() string { + return "EnrichmentOriginatorTelemetryNode" +} +func (f enrichmentOriginatorTelemetryNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } +func (f enrichmentOriginatorTelemetryNodeFactory) Create(id string, meta Metadata) (Node, error) { + return nil, nil +} diff --git a/pkg/rule_engine/nodes/enrichment_tenant_attr.go b/pkg/rule_engine/nodes/enrichment_tenant_attr.go new file mode 100644 index 0000000..30bde52 --- /dev/null +++ b/pkg/rule_engine/nodes/enrichment_tenant_attr.go @@ -0,0 +1,17 @@ +package nodes + +type enrichmentTenantNode struct { + bareNode +} + +type enrichmentTenantNodeFactory struct{} + +func (f enrichmentTenantNodeFactory) Name() string { return "EnrichmentTenantNode" } +func (f enrichmentTenantNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } +func (f enrichmentTenantNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &enrichmentTenantNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} diff --git a/pkg/rule_engine/nodes/external_mqtt_node.go b/pkg/rule_engine/nodes/external_mqtt_node.go new file mode 100644 index 0000000..5c66ff1 --- /dev/null +++ b/pkg/rule_engine/nodes/external_mqtt_node.go @@ -0,0 +1,64 @@ +package nodes + +import ( + "fmt" + "pandax/pkg/rule_engine/message" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/sirupsen/logrus" +) + +type externalMqttNode struct { + bareNode + TopicPattern string + Host string + Port string + ConnectTimeoutSec int + ClientId string + CleanSession bool + Ssl bool + MqttCli mqtt.Client +} + +type externalMqttNodeFactory struct{} + +func (f externalMqttNodeFactory) Name() string { return "ExternalMqttNode" } +func (f externalMqttNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + + node := &externalMqttNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + broker := fmt.Sprintf("tcp://%s:%s", node.Host, node.Port) + opts := mqtt.NewClientOptions().AddBroker(broker) + opts.SetClientID(node.ClientId) + opts.SetCleanSession(node.CleanSession) + opts.SetConnectTimeout(time.Duration(node.ConnectTimeoutSec) * time.Second) + node.MqttCli = mqtt.NewClient(opts) + + if token := node.MqttCli.Connect(); token.Wait() && token.Error() != nil { + logrus.WithError(token.Error()) + return nil, token.Error() + } + return decodePath(meta, node) +} + +func (n *externalMqttNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + topic := n.TopicPattern //need fix add msg.metadata in it + sendmqttmsg, err := msg.MarshalBinary() + if err != nil { + return nil + } + token := n.MqttCli.Publish(topic, 1, false, sendmqttmsg) + if token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + return failureLabelNode.Handle(msg) + } + return successLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/external_restapi_node.go b/pkg/rule_engine/nodes/external_restapi_node.go new file mode 100644 index 0000000..8457b7e --- /dev/null +++ b/pkg/rule_engine/nodes/external_restapi_node.go @@ -0,0 +1,26 @@ +package nodes + +type externalRestapiNode struct { + bareNode + RestEndpointUrlPattern string + RequestMethod string + headers map[string]string + UseSimpleClientHttpFactory bool + ReadTimeoutMs int + MaxParallelRequestsCount int + UseRedisQueueForMsgPersistence bool + trimQueue bool + MaxQueueSize int +} + +type externalRestapiNodeFactory struct{} + +func (f externalRestapiNodeFactory) Name() string { return "ExternalRestapiNode" } +func (f externalRestapiNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"True", "False"} + node := &externalRestapiNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} diff --git a/pkg/rule_engine/nodes/factory.go b/pkg/rule_engine/nodes/factory.go index b607f7c..3997e35 100644 --- a/pkg/rule_engine/nodes/factory.go +++ b/pkg/rule_engine/nodes/factory.go @@ -11,6 +11,7 @@ const ( NODE_CATEGORY_TRANSFORM = "transform" NODE_CATEGORY_EXTERNAL = "external" NODE_CATEGORY_OTHERS = "others" + NODE_CATEGORY_FLOWS = "flows" ) // Factory is node's factory to create node based on metadata diff --git a/pkg/rule_engine/nodes/filter_check_existence_fields_node.go b/pkg/rule_engine/nodes/filter_check_existence_fields_node.go new file mode 100644 index 0000000..a737654 --- /dev/null +++ b/pkg/rule_engine/nodes/filter_check_existence_fields_node.go @@ -0,0 +1,34 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +//检查关联关系 +//该消息来自与哪个实体或到那个实体 +type checkExistenceFieldsNode struct { + bareNode +} + +type checkExistenceFieldsNodeFactory struct{} + +func (f checkExistenceFieldsNodeFactory) Name() string { return "CheckExistenceFieldsNode" } +func (f checkExistenceFieldsNodeFactory) Category() string { return NODE_CATEGORY_FILTER } + +func (f checkExistenceFieldsNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"True", "False"} + node := &checkExistenceFieldsNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *checkExistenceFieldsNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + //trueLabelNode := n.GetLinkedNode("True") + //falseLabelNode := n.GetLinkedNode("False") + + return nil +} diff --git a/pkg/rule_engine/nodes/filter_check_relation_node.go b/pkg/rule_engine/nodes/filter_check_relation_node.go new file mode 100644 index 0000000..e33ecee --- /dev/null +++ b/pkg/rule_engine/nodes/filter_check_relation_node.go @@ -0,0 +1,67 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +const ( + RelationTypeContains = "Contains" + RelationTypeNotContains = "NotContains" +) + +//检查关联关系 +//该消息来自与哪个实体或到那个实体 +type checkRelationFilterNode struct { + bareNode + Direction string `json:"direction" yaml:"direction"` + RelationType string `json:"relationType" yaml:"relationType"` + InstanceType string `json:"instanceType" yaml:"instanceType"` + Values []string `json:"values" yaml:"values"` +} + +type checkRelationFilterNodeFactory struct{} + +func (f checkRelationFilterNodeFactory) Name() string { return "CheckRelationFilterNode" } +func (f checkRelationFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } + +func (f checkRelationFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"True", "False"} + node := &checkRelationFilterNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + Values: []string{}, + } + return decodePath(meta, node) +} + +func (n *checkRelationFilterNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + trueLabelNode := n.GetLinkedNode("True") + falseLabelNode := n.GetLinkedNode("False") + + //direction := msg.GetDirection() + attr := msg.GetMetadata().GetKeyValue(n.InstanceType) + switch n.RelationType { + case RelationTypeContains: + for _, val := range n.Values { + // specified attribute exist in names + if attr == val { + return trueLabelNode.Handle(msg) + } + } + // not found + return falseLabelNode.Handle(msg) + + case RelationTypeNotContains: + for _, val := range n.Values { + // specified attribute exist in names + if attr == val { + return falseLabelNode.Handle(msg) + } + } + // not found + return trueLabelNode.Handle(msg) + } + return nil +} diff --git a/pkg/rule_engine/nodes/filter_message_type_node.go b/pkg/rule_engine/nodes/filter_message_type_node.go new file mode 100644 index 0000000..a61ff92 --- /dev/null +++ b/pkg/rule_engine/nodes/filter_message_type_node.go @@ -0,0 +1,47 @@ +package nodes + +import ( + "fmt" + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +type messageTypeFilterNode struct { + bareNode + MessageTypes []string `json:"messageTypes" yaml:"messageTypes"` +} + +type messageTypeFilterNodeFactory struct{} + +func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeFilterNode" } +func (f messageTypeFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } + +func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"True", "False"} + node := &messageTypeFilterNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + MessageTypes: []string{}, + } + return decodePath(meta, node) +} + +func (n *messageTypeFilterNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + trueLabelNode := n.GetLinkedNode("True") + falseLabelNode := n.GetLinkedNode("False") + if trueLabelNode == nil || falseLabelNode == nil { + return fmt.Errorf("no true or false label linked node in %s", n.Name()) + } + messageType := msg.GetType() + + // TODO: how to resolve user customized message type dynamically + //userMessageType := msg.GetMetadata().GetKeyValue(n.Metadata().MessageTypeKey) + userMessageType := "TODO" + for _, filterType := range n.MessageTypes { + if filterType == messageType || filterType == userMessageType { + return trueLabelNode.Handle(msg) + } + } + return falseLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/filter_message_type_switch_node.go b/pkg/rule_engine/nodes/filter_message_type_switch_node.go new file mode 100644 index 0000000..d462180 --- /dev/null +++ b/pkg/rule_engine/nodes/filter_message_type_switch_node.go @@ -0,0 +1,46 @@ +package nodes + +import ( + "fmt" + "pandax/pkg/rule_engine/message" + + "github.com/sirupsen/logrus" +) + +type messageTypeSwitchNode struct { + bareNode +} +type messageTypeSwitchNodeFactory struct{} + +func (f messageTypeSwitchNodeFactory) Name() string { return "MessageTypeSwitchNode" } +func (f messageTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER } + +func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"True", "False"} + node := &messageTypeSwitchNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *messageTypeSwitchNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + nodes := n.GetLinkedNodes() + messageType := msg.GetType() + messageTypeKey, _ := n.Metadata().Value(NODE_CONFIG_MESSAGE_TYPE_KEY) + userMessageType := msg.GetMetadata().GetKeyValue(messageTypeKey.(string)) + + for label, node := range nodes { + if messageType == label || userMessageType == label { + return node.Handle(msg) + } + } + // if other label exist + if node := n.GetLinkedNode("Other"); node != nil { + return node.Handle(msg) + } + + // not found + return fmt.Errorf("%s no label to handle message", n.Name()) +} diff --git a/pkg/rule_engine/nodes/filter_originator_type_node.go b/pkg/rule_engine/nodes/filter_originator_type_node.go new file mode 100644 index 0000000..cb4b1f5 --- /dev/null +++ b/pkg/rule_engine/nodes/filter_originator_type_node.go @@ -0,0 +1,43 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +type originatorTypeFilterNode struct { + bareNode + Filters []string `json:"filters" yaml:"filters"` +} + +type originatorFilterNodeFactory struct{} + +func (f originatorFilterNodeFactory) Name() string { return "OriginatorFilterNode" } +func (f originatorFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } + +func (f originatorFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"True", "False"} + node := &originatorTypeFilterNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + Filters: []string{}, + } + return decodePath(meta, node) +} + +func (n *originatorTypeFilterNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + trueLabelNode := n.GetLinkedNode("True") + falseLabelNode := n.GetLinkedNode("False") + + //links := n.GetLinks() + originatorType := msg.GetOriginator() + + for _, filter := range n.Filters { + if originatorType == filter { + return trueLabelNode.Handle(msg) + } + } + // not found + return falseLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/filter_originator_type_switch_node.go b/pkg/rule_engine/nodes/filter_originator_type_switch_node.go new file mode 100644 index 0000000..d71191b --- /dev/null +++ b/pkg/rule_engine/nodes/filter_originator_type_switch_node.go @@ -0,0 +1,39 @@ +package nodes + +import ( + "fmt" + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +type originatorTypeSwitchNode struct { + bareNode +} + +type originatorTypeSwitchNodeFactory struct{} + +func (f originatorTypeSwitchNodeFactory) Name() string { return "OriginatorTypeSwitchNode" } +func (f originatorTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER } + +func (f originatorTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{} + node := &originatorTypeSwitchNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *originatorTypeSwitchNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + nodes := n.GetLinkedNodes() + originatorType := msg.GetOriginator() + + for label, node := range nodes { + if originatorType == label { + return node.Handle(msg) + } + } + // not found + return fmt.Errorf("%s no label to handle message", n.Name()) +} diff --git a/pkg/rule_engine/nodes/filter_script_node.go b/pkg/rule_engine/nodes/filter_script_node.go new file mode 100644 index 0000000..48516ab --- /dev/null +++ b/pkg/rule_engine/nodes/filter_script_node.go @@ -0,0 +1,39 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +const ScriptFilterNodeName = "ScriptFilterNode" + +type scriptFilterNode struct { + bareNode + Scripts string `json:"scripts" yaml:"scripts"` +} + +type scriptFilterNodeFactory struct{} + +func (f scriptFilterNodeFactory) Name() string { return "ScriptFilterNode" } +func (f scriptFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } + +func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"True", "False"} + node := &scriptFilterNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *scriptFilterNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + trueLabelNode := n.GetLinkedNode("True") + falseLabelNode := n.GetLinkedNode("False") + scriptEngine := NewScriptEngine() + isTrue, error := scriptEngine.ScriptOnFilter(msg, n.Scripts) + if isTrue == true && error == nil { + return trueLabelNode.Handle(msg) + } + return falseLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/filter_switch_node.go b/pkg/rule_engine/nodes/filter_switch_node.go index 722e76c..efe04a1 100644 --- a/pkg/rule_engine/nodes/filter_switch_node.go +++ b/pkg/rule_engine/nodes/filter_switch_node.go @@ -1,3 +1,14 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use p file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. package nodes import ( @@ -26,7 +37,7 @@ func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) func (n *switchFilterNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - /*scriptEngine := NewScriptEngine() + scriptEngine := NewScriptEngine() SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts) if err != nil { return nil @@ -38,6 +49,6 @@ func (n *switchFilterNode) Handle(msg message.Message) error { return node.Handle(msg) } } - }*/ + } return nil } diff --git a/pkg/rule_engine/nodes/init.go b/pkg/rule_engine/nodes/init.go index d8d4d7b..bbf47a2 100644 --- a/pkg/rule_engine/nodes/init.go +++ b/pkg/rule_engine/nodes/init.go @@ -3,4 +3,6 @@ package nodes // init register all node's factory func init() { RegisterFactory(inputNodeFactory{}) + RegisterFactory(switchFilterNodeFactory{}) + RegisterFactory(delayNodeFactory{}) } diff --git a/pkg/rule_engine/nodes/metadata.go b/pkg/rule_engine/nodes/metadata.go index 1af89c4..84c38c6 100644 --- a/pkg/rule_engine/nodes/metadata.go +++ b/pkg/rule_engine/nodes/metadata.go @@ -2,7 +2,7 @@ package nodes import ( "fmt" - "github.com/XM-GO/PandaKit/utils" + "github.com/mitchellh/mapstructure" ) const ( @@ -58,5 +58,6 @@ func (c *nodeMetadata) With(key string, val interface{}) Metadata { } func (c *nodeMetadata) DecodePath(rawVal interface{}) error { - return utils.Map2Struct(c.keypairs, rawVal) + //return utils.Map2Struct(c.keypairs, rawVal) + return mapstructure.Decode(c.keypairs, rawVal) } diff --git a/pkg/rule_engine/nodes/script_engine.go b/pkg/rule_engine/nodes/script_engine.go new file mode 100644 index 0000000..9bd29b0 --- /dev/null +++ b/pkg/rule_engine/nodes/script_engine.go @@ -0,0 +1,39 @@ +package nodes + +import "pandax/pkg/rule_engine/message" + +type ScriptEngine interface { + ScriptOnMessage(msg message.Message, script string) (message.Message, error) + //used by filter_switch_node + ScriptOnSwitch(msg message.Message, script string) ([]string, error) + //used by filter_script_node + ScriptOnFilter(msg message.Message, script string) (bool, error) + ScriptToString(msg message.Message, script string) (string, error) +} + +type baseScriptEngine struct { +} + +func NewScriptEngine() ScriptEngine { + return &baseScriptEngine{} +} + +func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) (message.Message, error) { + + return nil, nil +} + +func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) ([]string, error) { + + return nil, nil +} + +func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) (bool, error) { + + return false, nil +} + +func (bse *baseScriptEngine) ScriptToString(msg message.Message, script string) (string, error) { + + return "", nil +} diff --git a/pkg/rule_engine/nodes/transform_change_originator_node.go b/pkg/rule_engine/nodes/transform_change_originator_node.go new file mode 100644 index 0000000..c7aefc4 --- /dev/null +++ b/pkg/rule_engine/nodes/transform_change_originator_node.go @@ -0,0 +1,43 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +type transformChangeOriginatorNode struct { + bareNode + OriginatorSource string `json:"originatorSource" yaml:"originatorSource"` + Direction string `json:"direction" yaml:"direction"` + MaxRelationLevel int `json:"maxRelationLevel" yaml:"maxRelationLevel"` + //RelationFilters []runtime.RelationFilter `json:"relationFilters" yaml:"relationFilters"` +} + +type transformChangeOriginatorNodeFactory struct{} + +func (f transformChangeOriginatorNodeFactory) Name() string { return "TransformChangeOriginatorNode" } +func (f transformChangeOriginatorNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } + +func (f transformChangeOriginatorNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &transformChangeOriginatorNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + //RelationFilters: []runtime.RelationFilter{}, + } + return decodePath(meta, node) +} + +func (n *transformChangeOriginatorNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + //successLabelNode := n.GetLinkedNode("Sucess") + failureLabelNode := n.GetLinkedNode("Failure") + //relationQuery := runtime.NewRelationQuery() + + //entities := relationQuery.QueryEntities(n.Direction, n.MaxRelationLevel, n.RelationFilters) + /*if len(entities) > 0 && entities[0] == msg.GetOriginator() { + msg.SetOriginator(entities[0]) + return successLabelNode.Handle(msg) + }*/ + return failureLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/transform_script_node.go b/pkg/rule_engine/nodes/transform_script_node.go new file mode 100644 index 0000000..134511f --- /dev/null +++ b/pkg/rule_engine/nodes/transform_script_node.go @@ -0,0 +1,38 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +type transformScriptNode struct { + bareNode + Script string `json:"script" yaml:"script"` +} + +type transformScriptNodeFactory struct{} + +func (f transformScriptNodeFactory) Name() string { return "TransformScriptNode" } +func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } + +func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &transformScriptNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *transformScriptNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + + scriptEngine := NewScriptEngine() + newMessage, err := scriptEngine.ScriptOnMessage(msg, n.Script) + if err != nil { + return failureLabelNode.Handle(msg) + } + return successLabelNode.Handle(newMessage) +} diff --git a/pkg/rule_engine/nodes/transform_to_email_node.go b/pkg/rule_engine/nodes/transform_to_email_node.go new file mode 100644 index 0000000..a5608ee --- /dev/null +++ b/pkg/rule_engine/nodes/transform_to_email_node.go @@ -0,0 +1,51 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +type transformToEmailNode struct { + bareNode + From string `json:"from" yaml:"from"` + To string `json:"to" yaml:"to"` + Cc string `json:"cc" yaml:"cc"` + Bcc string `json:"bcc" yaml:"bcc"` + Subject string `json:"subject" yaml:"subject"` + Body string `json:"body" yaml:"body"` +} + +type transformToEmailNodeFactory struct{} + +func (f transformToEmailNodeFactory) Name() string { return "TransformToEmailNode" } +func (f transformToEmailNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } + +func (f transformToEmailNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + + node := &transformToEmailNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *transformToEmailNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + //failureLabelNode := n.GetLinkedNode("Failure") + + /*dialer := runtime.NewDialer(runtime.EMAIL) + variables := map[string]string{ + "from": n.From, + "to": n.To, + "cc": n.Cc, + "bcc": n.Bcc, + "subject": n.Subject, + "body": n.Body, + } + if err := dialer.DialAndSend(msg.GetMetadata(), variables); err != nil { + return failureLabelNode.Handle(msg) + }*/ + return successLabelNode.Handle(msg) +}