[feat]添加规则引擎debug功能

This commit is contained in:
PandaX
2023-10-27 16:13:17 +08:00
parent 0fcb262519
commit 28e0bcbe1c
34 changed files with 298 additions and 255 deletions

1
.idea/vcs.xml generated
View File

@@ -2,6 +2,5 @@
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
<mapping directory="$PROJECT_DIR$/pkg/rule_engine" vcs="Git" />
</component>
</project>

View File

@@ -0,0 +1,33 @@
package rule_engine
import (
"pandax/pkg/rule_engine/message"
"pandax/pkg/rule_engine/nodes"
)
func GetCategory() []map[string]interface{} {
return nodes.GetCategory()
}
func GetDebugData(ruleId, nodeId string) []message.DebugData {
if data, ok := ruleChainDebugData.Data[ruleId]; ok {
return data.Get(nodeId).Items
}
return nil
}
func GetDebugDataPage(page, pageSize int, ruleId, nodeId string) (int64, []message.DebugData) {
if page < 1 {
page = 1
}
offset := pageSize * (page - 1)
if data, ok := ruleChainDebugData.Data[ruleId]; ok {
total := len(data.Get(nodeId).Items)
end := offset + pageSize
if end >= total {
end = total - 1
}
return int64(total), data.Get(nodeId).Items[offset:end]
}
return 0, nil
}

View File

@@ -3,17 +3,21 @@ package rule_engine
import (
"context"
"github.com/sirupsen/logrus"
"pandax/pkg/global"
"pandax/pkg/rule_engine/manifest"
"pandax/pkg/rule_engine/message"
"pandax/pkg/rule_engine/nodes"
)
var ruleChainDebugData = message.NewRuleChainDebugData(100)
type ruleChainInstance struct {
ruleId string
firstRuleNodeId string
nodes map[string]nodes.Node
}
func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
func NewRuleChainInstance(ruleId string, data []byte) (*ruleChainInstance, []error) {
errors := make([]error, 0)
manifest, err := manifest.New(data)
@@ -22,7 +26,12 @@ func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
logrus.WithError(err).Errorf("invalidi manifest file")
return nil, errors
}
return newInstanceWithManifest(manifest)
withManifest, errs := newInstanceWithManifest(manifest)
if len(errs) > 0 {
return nil, errs
}
withManifest.ruleId = ruleId
return withManifest, nil
}
// newWithManifest create rule chain by user's manifest file
@@ -42,8 +51,24 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error)
// StartRuleChain TODO 是否需要添加context
func (c *ruleChainInstance) StartRuleChain(context context.Context, message *message.Message) error {
// 处理debug的通道消息
go func() {
for {
select {
case debugMsg := <-message.DeBugChan:
// 保存到tdengine时序数据库中
ruleChainDebugData.Add(c.ruleId, debugMsg.NodeId, debugMsg)
case <-message.EndDeBugChan:
global.Log.Debug("规则链%s,执行结束", message.Id)
return
}
}
}()
if node, found := c.nodes[c.firstRuleNodeId]; found {
return node.Handle(message)
err := node.Handle(message)
message.EndDeBugChan <- struct{}{}
return err
}
return nil
}

View File

@@ -3,6 +3,7 @@ package message
import (
"encoding/json"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"time"
)
@@ -26,30 +27,59 @@ const (
MONITOR = "MONITOR" //监控
)
const (
DEBUGIN = "In"
DEBUGOUT = "Out"
)
type Msg map[string]interface{}
type Metadata map[string]interface{}
type Message struct {
Id string //uuid 消息Id
Ts time.Time //时间戳
MsgType string //消息类型, attributes参数telemetry遥测Connect连接事件
User string //客户 设备发布人 设备所有者
Msg Msg //数据 数据结构JSON 设备原始数据 msg
Metadata Metadata //消息的元数据 包括设备Id设备类型产品ID等
Id string //uuid 消息Id
Ts time.Time //时间戳
MsgType string //消息类型, attributes参数telemetry遥测Connect连接事件
User string //客户 设备发布人 设备所有者
Msg Msg //数据 数据结构JSON 设备原始数据 msg
Metadata Metadata //消息的元数据 包括设备Id设备类型产品ID等
DeBugChan chan DebugData
EndDeBugChan chan struct{}
}
// NewMessage ...
func NewMessage(user, messageType string, msg Msg, metadata Metadata) *Message {
return &Message{
Id: uuid.New().String(),
Ts: time.Now(),
User: user,
MsgType: messageType,
Msg: msg,
Metadata: metadata,
Id: uuid.New().String(),
Ts: time.Now(),
User: user,
MsgType: messageType,
Msg: msg,
Metadata: metadata,
DeBugChan: make(chan DebugData, 100),
EndDeBugChan: make(chan struct{}),
}
}
func (t *Message) Debug(nodeId, nodeName, debugType, error string) {
if debugType == DEBUGIN {
logrus.Infof("%s handle message '%s'", nodeName, t.MsgType)
}
debug := DebugData{
Ts: time.Now().Format("2006-01-02 15:04:05.000"),
NodeId: nodeId,
MsgId: t.Id,
DebugType: debugType,
MsgType: t.MsgType,
Msg: t.Msg,
Metadata: t.Metadata,
Error: error,
}
if deviceName, ok := t.Metadata.GetValue("deviceName").(string); ok {
debug.DeviceName = deviceName
}
t.DeBugChan <- debug
}
func (t *Message) GetAllMap() map[string]interface{} {
data := make(map[string]interface{})
for msgKey, msgValue := range t.Msg {

View File

@@ -2,7 +2,6 @@ package nodes
import (
"encoding/json"
"github.com/sirupsen/logrus"
"pandax/apps/device/services"
"pandax/pkg/global"
"pandax/pkg/rule_engine/message"
@@ -21,7 +20,7 @@ type clearAlarmNode struct {
func (f clearAlarmNodeFactory) Name() string { return ClearAlarmNodeName }
func (f clearAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f clearAlarmNodeFactory) Labels() []string { return []string{"Cleared", "Failure"} }
func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f clearAlarmNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &clearAlarmNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -29,28 +28,30 @@ func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
}
func (n *clearAlarmNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
cleared := n.GetLinkedNode("Cleared")
failure := n.GetLinkedNode("Failure")
alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0")
if alarm.DeviceId != "" {
var err error
alarm, err := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0")
if err == nil {
alarm.State = global.CLEARED
marshal, _ := json.Marshal(msg.Msg)
alarm.Details = string(marshal)
err := services.DeviceAlarmModelDao.Update(*alarm)
if err != nil {
if failure != nil {
return failure.Handle(msg)
}
} else {
err = services.DeviceAlarmModelDao.Update(*alarm)
if err == nil {
if cleared != nil {
n.Debug(msg, message.DEBUGOUT, "")
return cleared.Handle(msg)
}
}
} else {
}
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failure != nil {
return failure.Handle(msg)
} else {
return err
}
}
return nil

View File

@@ -2,7 +2,6 @@ package nodes
import (
"encoding/json"
"github.com/sirupsen/logrus"
"pandax/apps/device/entity"
"pandax/apps/device/services"
"pandax/pkg/global"
@@ -22,7 +21,7 @@ type createAlarmNodeFactory struct{}
func (f createAlarmNodeFactory) Name() string { return "CreateAlarmNode" }
func (f createAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f createAlarmNodeFactory) Labels() []string { return []string{"Created", "Updated", "Failure"} }
func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f createAlarmNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &createAlarmNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -30,23 +29,20 @@ func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
}
func (n *createAlarmNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
created := n.GetLinkedNode("Created")
updated := n.GetLinkedNode("Updated")
failure := n.GetLinkedNode("Failure")
alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0")
if alarm.DeviceId != "" {
var err error
alarm, err := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0")
if err == nil {
marshal, _ := json.Marshal(msg.Msg)
alarm.Details = string(marshal)
err := services.DeviceAlarmModelDao.Update(*alarm)
if err != nil {
if failure != nil {
return failure.Handle(msg)
}
} else {
err = services.DeviceAlarmModelDao.Update(*alarm)
if err == nil {
if updated != nil {
n.Debug(msg, message.DEBUGOUT, "")
return updated.Handle(msg)
}
}
@@ -64,16 +60,21 @@ func (n *createAlarmNode) Handle(msg *message.Message) error {
alarm.Owner = msg.Metadata.GetValue("owner").(string)
marshal, _ := json.Marshal(msg.Msg)
alarm.Details = string(marshal)
err := services.DeviceAlarmModelDao.Insert(*alarm)
if err != nil {
if failure != nil {
return failure.Handle(msg)
}
} else {
err = services.DeviceAlarmModelDao.Insert(*alarm)
if err == nil {
if created != nil {
n.Debug(msg, message.DEBUGOUT, "")
return created.Handle(msg)
}
}
}
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failure != nil {
return failure.Handle(msg)
} else {
return err
}
}
return nil
}

View File

@@ -5,8 +5,6 @@ import (
"pandax/pkg/rule_engine/message"
"sync"
"time"
"github.com/sirupsen/logrus"
)
const DelayNodeName = "DelayNode"
@@ -25,7 +23,7 @@ type delayNodeFactory struct{}
func (f delayNodeFactory) Name() string { return DelayNodeName }
func (f delayNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f delayNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f delayNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &delayNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
lock: sync.Mutex{},
@@ -36,8 +34,7 @@ func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) {
}
func (n *delayNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
if successLabelNode == nil || failureLabelNode == nil {

View File

@@ -1,63 +0,0 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
"time"
)
type messageGeneratorNode struct {
bareNode
Script string `json:"script" yaml:"script"`
PeriodSecond int64 `json:"periodSecond" yaml:"periodSecond"` //周期
MessageCount int64 `json:"messageCount" yaml:"messageCount"`
}
type messageGeneratorNodeFactory struct{}
func (f messageGeneratorNodeFactory) Name() string { return "MessageGeneratorNode" }
func (f messageGeneratorNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f messageGeneratorNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &messageGeneratorNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *messageGeneratorNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
ticker := time.NewTicker(time.Duration(n.PeriodSecond) * time.Second)
count := 0
go func() {
for {
<-ticker.C
count++
if int64(count) == n.MessageCount {
ticker.Stop()
return
}
scriptEngine := NewScriptEngine(*msg, "Generate", n.Script)
generate, err := scriptEngine.ScriptGenerate()
if err != nil {
if failureLabelNode != nil {
go failureLabelNode.Handle(msg)
}
return
}
msg.Msg = generate["msg"].(message.Msg)
msg.Metadata = generate["metadata"].(message.Metadata)
msg.MsgType = generate["msgType"].(string)
if successLabelNode != nil {
go successLabelNode.Handle(msg)
}
}
}()
return nil
}

View File

@@ -16,7 +16,7 @@ type logNodeFactory struct{}
func (f logNodeFactory) Name() string { return "LogNode" }
func (f logNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f logNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f logNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f logNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &logNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}

View File

@@ -2,7 +2,6 @@ package nodes
import (
"errors"
"github.com/sirupsen/logrus"
"pandax/iothub/client/mqttclient"
"pandax/iothub/client/tcpclient"
"pandax/pkg/global"
@@ -20,7 +19,7 @@ type rpcRequestFromDeviceFactory struct{}
func (f rpcRequestFromDeviceFactory) Name() string { return "RpcRequestFromDeviceNode" }
func (f rpcRequestFromDeviceFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f rpcRequestFromDeviceFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f rpcRequestFromDeviceFactory) Create(id string, meta Metadata) (Node, error) {
func (f rpcRequestFromDeviceFactory) Create(id string, meta Properties) (Node, error) {
node := &rpcRequestFromDeviceNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -28,7 +27,7 @@ func (f rpcRequestFromDeviceFactory) Create(id string, meta Metadata) (Node, err
}
func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLableNode := n.GetLinkedNode("Success")
failureLableNode := n.GetLinkedNode("Failure")
@@ -42,6 +41,7 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error {
result, err := rpcp.GetRequestResult()
if err != nil {
if failureLableNode != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
return failureLableNode.Handle(msg)
} else {
return err
@@ -71,6 +71,7 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error {
err = tcpclient.Send(deviceId, result)
}
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failureLableNode != nil {
return failureLableNode.Handle(msg)
} else {
@@ -78,6 +79,7 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error {
}
}
if successLableNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLableNode.Handle(msg)
}
return nil

View File

@@ -3,7 +3,6 @@ package nodes
import (
"encoding/json"
"errors"
"github.com/sirupsen/logrus"
"pandax/iothub/client/mqttclient"
"pandax/iothub/client/tcpclient"
"pandax/pkg/global"
@@ -21,7 +20,7 @@ type rpcRequestToDeviceNodeFactory struct{}
func (f rpcRequestToDeviceNodeFactory) Name() string { return "RpcRequestToDeviceNode" }
func (f rpcRequestToDeviceNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f rpcRequestToDeviceNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f rpcRequestToDeviceNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f rpcRequestToDeviceNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &rpcRequestToDeviceNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -29,7 +28,7 @@ func (f rpcRequestToDeviceNodeFactory) Create(id string, meta Metadata) (Node, e
}
func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLableNode := n.GetLinkedNode("Success")
failureLableNode := n.GetLinkedNode("Failure")
if msg.Msg.GetValue("method") == nil || msg.Msg.GetValue("params") == nil {
@@ -60,6 +59,7 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error {
err = tcpclient.Send(deviceId, string(payload))
}
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failureLableNode != nil {
return failureLableNode.Handle(msg)
} else {
@@ -67,6 +67,7 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error {
}
}
if successLableNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLableNode.Handle(msg)
}
return nil

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/global"
"pandax/pkg/rule_engine/message"
)
@@ -15,7 +14,7 @@ type saveAttributesNodeFactory struct{}
func (f saveAttributesNodeFactory) Name() string { return "SaveAttributesNode" }
func (f saveAttributesNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f saveAttributesNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f saveAttributesNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &saveAttributesNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -23,7 +22,7 @@ func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error
}
func (n *saveAttributesNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
/*if msg.MsgType != message.AttributesMes {
@@ -37,11 +36,15 @@ func (n *saveAttributesNode) Handle(msg *message.Message) error {
deviceName := msg.Metadata["deviceName"].(string)
err := global.TdDb.InsertDevice(deviceName+"_attributes", msg.Msg)
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
return err
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/global"
"pandax/pkg/rule_engine/message"
)
@@ -15,7 +14,7 @@ type saveTimeSeriesNodeFactory struct{}
func (f saveTimeSeriesNodeFactory) Name() string { return "SaveTimeSeriesNode" }
func (f saveTimeSeriesNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f saveTimeSeriesNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f saveTimeSeriesNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &saveTimeSeriesNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -23,7 +22,8 @@ func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error
}
func (n *saveTimeSeriesNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
/* if msg.MsgType != message.TelemetryMes && msg.MsgType != message.RowMes{
@@ -33,15 +33,18 @@ func (n *saveTimeSeriesNode) Handle(msg *message.Message) error {
return nil
}
}*/
//deviceId := msg.GetMetadata().GetValues()["deviceId"].(string)
deviceName := msg.Metadata["deviceName"].(string)
err := global.TdDb.InsertDevice(deviceName+"_telemetry", msg.Msg)
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
return err
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -7,7 +7,6 @@ import (
"encoding/json"
"fmt"
"github.com/PandaXGO/PandaKit/httpclient"
"github.com/sirupsen/logrus"
"net/url"
"pandax/pkg/rule_engine/message"
"time"
@@ -28,7 +27,7 @@ type externalDingNodeFactory struct{}
func (f externalDingNodeFactory) Name() string { return "DingNode" }
func (f externalDingNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalDingNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalDingNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalDingNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -36,7 +35,7 @@ func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error)
}
func (n *externalDingNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
@@ -63,6 +62,7 @@ func (n *externalDingNode) Handle(msg *message.Message) error {
postJson := httpclient.NewRequest(url).Header("Content-Type", "application/json").PostJson(string(marshal))
if postJson.StatusCode != 200 {
n.Debug(msg, message.DEBUGOUT, "钉钉机器人hook接口请求失败")
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
@@ -70,6 +70,7 @@ func (n *externalDingNode) Handle(msg *message.Message) error {
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -24,7 +24,7 @@ type externalKafkaNodeFactory struct{}
func (f externalKafkaNodeFactory) Name() string { return "KafkaNode" }
func (f externalKafkaNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalKafkaNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalKafkaNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalKafkaNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -46,7 +46,7 @@ func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error)
}
func (n *externalKafkaNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
defer n.KafkaCli.Close()
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
@@ -71,6 +71,7 @@ func (n *externalKafkaNode) Handle(msg *message.Message) error {
}
_, _, err := n.KafkaCli.SendMessage(kafkaM)
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
@@ -78,6 +79,7 @@ func (n *externalKafkaNode) Handle(msg *message.Message) error {
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}

View File

@@ -29,7 +29,7 @@ type externalMqttNodeFactory struct{}
func (f externalMqttNodeFactory) Name() string { return "MqttNode" }
func (f externalMqttNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalMqttNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalMqttNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalMqttNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -60,7 +60,7 @@ func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error)
}
func (n *externalMqttNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
defer n.MqttCli.Disconnect(1000)
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
@@ -71,6 +71,7 @@ func (n *externalMqttNode) Handle(msg *message.Message) error {
}
token := n.MqttCli.Publish(topic, 1, false, sendmqttmsg)
if token.Wait() && token.Error() != nil {
n.Debug(msg, message.DEBUGOUT, token.Error().Error())
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
@@ -78,6 +79,7 @@ func (n *externalMqttNode) Handle(msg *message.Message) error {
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -2,7 +2,6 @@ package nodes
import (
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -19,7 +18,7 @@ type externalNatsNodeFactory struct{}
func (f externalNatsNodeFactory) Name() string { return "NatsNode" }
func (f externalNatsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalNatsNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalNatsNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalNatsNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalNatsNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -36,7 +35,7 @@ func (f externalNatsNodeFactory) Create(id string, meta Metadata) (Node, error)
}
func (n *externalNatsNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
defer n.client.Close()
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
@@ -46,6 +45,7 @@ func (n *externalNatsNode) Handle(msg *message.Message) error {
}
err = n.client.Publish(n.Subject, []byte(template))
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
@@ -53,6 +53,7 @@ func (n *externalNatsNode) Handle(msg *message.Message) error {
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"errors"
"github.com/PandaXGO/PandaKit/httpclient"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -20,7 +19,7 @@ type externalRestapiNodeFactory struct{}
func (f externalRestapiNodeFactory) Name() string { return "RestapiNode" }
func (f externalRestapiNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalRestapiNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalRestapiNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalRestapiNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -28,7 +27,7 @@ func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, erro
}
func (n *externalRestapiNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLableNode := n.GetLinkedNode("Success")
failureLableNode := n.GetLinkedNode("Failure")
if n.RequestMethod == "GET" {
@@ -39,6 +38,7 @@ func (n *externalRestapiNode) Handle(msg *message.Message) error {
var response map[string]interface{}
err := json.Unmarshal(resp.Body, &response)
if err != nil && failureLableNode != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
return failureLableNode.Handle(msg)
} else {
if successLableNode != nil {
@@ -47,6 +47,7 @@ func (n *externalRestapiNode) Handle(msg *message.Message) error {
metadata.SetValue(key, value)
}
msg.Metadata = metadata
n.Debug(msg, message.DEBUGOUT, "")
return successLableNode.Handle(msg)
}
}
@@ -60,42 +61,15 @@ func (n *externalRestapiNode) Handle(msg *message.Message) error {
resp := req.PostJson(string(binary))
if resp.StatusCode != 200 {
if failureLableNode != nil {
n.Debug(msg, message.DEBUGOUT, "接口请求失败")
return failureLableNode.Handle(msg)
}
} else {
if successLableNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLableNode.Handle(msg)
}
}
}
/*if n.RequestMethod == "PUT" {
binary, _ := msg.MarshalBinary()
req := httpclient.NewRequest(n.RestEndpointUrlPattern)
for key,value := range n.Headers {
req.Header(key,value)
}
_, err := http.HttpPut(n.RestEndpointUrlPattern, n.Headers, nil, binary)
if err != nil {
if failureLableNode != nil {
return failureLableNode.Handle(msg)
}
} else {
if successLableNode != nil {
return successLableNode.Handle(msg)
}
}
}
if n.RequestMethod == "DELETE" {
_, err := http.HttpDelete(n.RestEndpointUrlPattern)
if err != nil {
if failureLableNode != nil {
return failureLableNode.Handle(msg)
}
} else {
if successLableNode != nil {
return successLableNode.Handle(msg)
}
}
}*/
return nil
}

View File

@@ -19,7 +19,7 @@ type externalRuleChainNodeFactory struct{}
func (f externalRuleChainNodeFactory) Name() string { return "RuleChainNode" }
func (f externalRuleChainNodeFactory) Category() string { return NODE_CATEGORY_FLOWS }
func (f externalRuleChainNodeFactory) Labels() []string { return []string{} }
func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalRuleChainNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalRuleChainNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}

View File

@@ -3,7 +3,6 @@ package nodes
import (
"crypto/tls"
"fmt"
"github.com/sirupsen/logrus"
"net/smtp"
"pandax/pkg/rule_engine/message"
"strings"
@@ -30,7 +29,7 @@ type externalSendEmailNodeFactory struct{}
func (f externalSendEmailNodeFactory) Name() string { return "SendEmailNode" }
func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalSendEmailNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalSendEmailNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
@@ -39,7 +38,7 @@ func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, er
}
func (n *externalSendEmailNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
@@ -50,6 +49,7 @@ func (n *externalSendEmailNode) Handle(msg *message.Message) error {
}
err := n.send(tos, *msg)
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
@@ -57,6 +57,7 @@ func (n *externalSendEmailNode) Handle(msg *message.Message) error {
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -20,7 +19,7 @@ type externalSendSmsNodeFactory struct{}
func (f externalSendSmsNodeFactory) Name() string { return "SendSmsNode" }
func (f externalSendSmsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalSendSmsNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalSendSmsNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalSendSmsNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -28,10 +27,11 @@ func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, erro
}
func (n *externalSendSmsNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
//failureLabelNode := n.GetLinkedNode("Failure")
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}

View File

@@ -3,7 +3,6 @@ package nodes
import (
"encoding/json"
"github.com/PandaXGO/PandaKit/httpclient"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -21,7 +20,7 @@ type externalWechatNodeFactory struct{}
func (f externalWechatNodeFactory) Name() string { return "WechatNode" }
func (f externalWechatNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalWechatNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalWechatNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &externalWechatNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -29,11 +28,12 @@ func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error
}
func (n *externalWechatNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
template, err := ParseTemplate(n.Content, msg.GetAllMap())
sendData := map[string]interface{}{
"msgtype": "text",
"text": map[string]interface{}{"content": template},
@@ -46,6 +46,7 @@ func (n *externalWechatNode) Handle(msg *message.Message) error {
marshal, _ := json.Marshal(sendData)
postJson := httpclient.NewRequest(n.WebHook).Header("Content-Type", "application/json").PostJson(string(marshal))
if postJson.StatusCode != 200 {
n.Debug(msg, message.DEBUGOUT, "请求微信机器人hook接口失败")
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
@@ -53,6 +54,7 @@ func (n *externalWechatNode) Handle(msg *message.Message) error {
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -18,7 +18,7 @@ type Factory interface {
Name() string
Category() string
Labels() []string
Create(id string, meta Metadata) (Node, error)
Create(id string, meta Properties) (Node, error)
}
var (
@@ -41,7 +41,7 @@ func RegisterFactory(f Factory) {
}
// NewNode is the only way to create a new node
func NewNode(nodeType string, id string, meta Metadata) (Node, error) {
func NewNode(nodeType string, id string, meta Properties) (Node, error) {
if f, found := allNodeFactories[nodeType]; found {
return f.Create(id, meta)
}

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -18,7 +17,7 @@ func (f deviceTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FI
func (f deviceTypeSwitchNodeFactory) Labels() []string {
return []string{message.DEVICE, message.GATEWAY}
}
func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f deviceTypeSwitchNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &deviceTypeSwitchNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -26,20 +25,23 @@ func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, err
}
func (n *deviceTypeSwitchNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
deviceLabelNode := n.GetLinkedNode(message.DEVICE)
gatewayLabelNode := n.GetLinkedNode(message.GATEWAY)
if msg.Metadata.GetValue("deviceType").(string) == message.DEVICE {
deviceType := msg.Metadata.GetValue("deviceType").(string)
if deviceType == message.DEVICE {
if deviceLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return deviceLabelNode.Handle(msg)
}
}
if msg.Metadata.GetValue("deviceType").(string) == message.GATEWAY {
} else if deviceType == message.GATEWAY {
if gatewayLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return gatewayLabelNode.Handle(msg)
}
}
n.Debug(msg, message.DEBUGOUT, "没有匹配的设备类型")
return nil
}

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -16,7 +15,7 @@ func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeNod
func (f messageTypeFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f messageTypeFilterNodeFactory) Labels() []string { return []string{"True", "False"} }
func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f messageTypeFilterNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &messageTypeFilterNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
MessageTypes: []string{},
@@ -25,19 +24,30 @@ func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, er
}
func (n *messageTypeFilterNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
trueLabelNode := n.GetLinkedNode("True")
falseLabelNode := n.GetLinkedNode("False")
messageType := msg.MsgType
for _, filterType := range n.MessageTypes {
if filterType == messageType && trueLabelNode != nil {
if n.containsType(msg.MsgType) {
if trueLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return trueLabelNode.Handle(msg)
}
}
if falseLabelNode != nil {
return falseLabelNode.Handle(msg)
} else {
if falseLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "不包含消息类型")
return falseLabelNode.Handle(msg)
}
}
return nil
}
func (n *messageTypeFilterNode) containsType(messageType string) bool {
for _, filterType := range n.MessageTypes {
if filterType == messageType {
return true
}
}
return false
}

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -26,7 +25,7 @@ func (f messageTypeSwitchNodeFactory) Labels() []string {
message.DisConnectMes,
}
}
func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f messageTypeSwitchNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &messageTypeSwitchNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -34,7 +33,7 @@ func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, er
}
func (n *messageTypeSwitchNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
nodes := n.GetLinkedNodes()
messageType := msg.MsgType
for label, node := range nodes {
@@ -42,5 +41,6 @@ func (n *messageTypeSwitchNode) Handle(msg *message.Message) error {
return node.Handle(msg)
}
}
n.Debug(msg, message.DEBUGOUT, "消息类型不正确")
return nil
}

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -16,8 +15,8 @@ type scriptFilterNodeFactory struct{}
func (f scriptFilterNodeFactory) Name() string { return ScriptFilterNodeName }
func (f scriptFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f scriptFilterNodeFactory) Labels() []string { return []string{"True", "False"} }
func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f scriptFilterNodeFactory) Labels() []string { return []string{"True", "False", "Failure"} }
func (f scriptFilterNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &scriptFilterNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -25,16 +24,26 @@ func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error)
}
func (n *scriptFilterNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
trueLabelNode := n.GetLinkedNode("True")
falseLabelNode := n.GetLinkedNode("False")
failureLabelNode := n.GetLinkedNode("Failure")
scriptEngine := NewScriptEngine(*msg, "Filter", n.Script)
isTrue, error := scriptEngine.ScriptOnFilter()
if isTrue == true && error == nil && trueLabelNode != nil {
isTrue, err := scriptEngine.ScriptOnFilter()
if err != nil {
if failureLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
return failureLabelNode.Handle(msg)
}
}
if isTrue == true && trueLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return trueLabelNode.Handle(msg)
} else {
if falseLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "Script脚本执行失败")
return falseLabelNode.Handle(msg)
}
}

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -15,20 +14,9 @@ type switchFilterNodeFactory struct{}
func (f switchFilterNodeFactory) Name() string { return "SwitchNode" }
func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f switchFilterNodeFactory) Labels() []string {
return []string{
"True", "False",
message.RowMes,
message.AttributesMes,
message.TelemetryMes,
message.RpcRequestFromDevice,
message.RpcRequestToDevice,
message.AlarmMes,
message.UpEventMes,
message.ConnectMes,
message.DisConnectMes,
}
return []string{"Failure"}
}
func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f switchFilterNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &switchFilterNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -36,12 +24,15 @@ func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error)
}
func (n *switchFilterNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
failureLabelNode := n.GetLinkedNode("Failure")
scriptEngine := NewScriptEngine(*msg, "Switch", n.Script)
SwitchResults, err := scriptEngine.ScriptOnSwitch()
if err != nil {
return err
n.Debug(msg, message.DEBUGOUT, err.Error())
return failureLabelNode.Handle(msg)
}
nodes := n.GetLinkedNodes()
for label, node := range nodes {
@@ -51,5 +42,6 @@ func (n *switchFilterNode) Handle(msg *message.Message) error {
}
}
}
n.Debug(msg, message.DEBUGOUT, "")
return nil
}

View File

@@ -16,7 +16,7 @@ type inputNodeFactory struct{}
func (f inputNodeFactory) Name() string { return "InputNode" }
func (f inputNodeFactory) Category() string { return NODE_CATEGORY_OTHERS }
func (f inputNodeFactory) Labels() []string { return []string{"True"} }
func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f inputNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &inputNode{
bareNode: newBareNode(InputNodeName, id, meta, f.Labels()),
}

View File

@@ -10,7 +10,7 @@ import (
type Node interface {
Name() string
Id() string
Metadata() Metadata
Properties() Properties
MustLabels() []string
Handle(*message.Message) error
@@ -23,11 +23,11 @@ type bareNode struct {
name string
id string
nodes map[string]Node
meta Metadata
meta Properties
labels []string
}
func newBareNode(name string, id string, meta Metadata, labels []string) bareNode {
func newBareNode(name string, id string, meta Properties, labels []string) bareNode {
return bareNode{
name: name,
id: id,
@@ -52,11 +52,23 @@ func (n *bareNode) GetLinkedNode(label string) Node {
func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes }
func (n *bareNode) Metadata() Metadata { return n.meta }
func (n *bareNode) Properties() Properties { return n.meta }
func (n *bareNode) Handle(*message.Message) error { return errors.New("not implemented") }
func decodePath(meta Metadata, n Node) (Node, error) {
func (n *bareNode) Debug(msg *message.Message, debugType, error string) {
value, err := n.meta.Value("debugMode")
if err != nil {
return
}
if debugMode, ok := value.(bool); ok {
if debugMode {
msg.Debug(n.id, n.name, debugType, error)
}
}
}
func decodePath(meta Properties, n Node) (Node, error) {
if err := meta.DecodePath(n); err != nil {
return n, err
}
@@ -67,8 +79,8 @@ func GetNodes(m *manifest.Manifest) (map[string]Node, error) {
nodes := make(map[string]Node)
// Create All nodes
for _, n := range m.Nodes {
metadata := NewMetadataWithValues(n.Properties)
node, err := NewNode(n.Type, n.Id, metadata)
propertie := NewPropertiesWithValues(n.Properties)
node, err := NewNode(n.Type, n.Id, propertie)
if err != nil {
logrus.Errorf("new node '%s' failure", n.Id)
continue

View File

@@ -10,35 +10,35 @@ const (
NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey"
)
// Metadata 前端 参数 Properties
type Metadata interface {
// Properties 前端 参数 Properties
type Properties interface {
Keys() []string
With(key string, val interface{}) Metadata
With(key string, val interface{}) Properties
Value(key string) (interface{}, error)
DecodePath(rawVal interface{}) error
}
type nodeMetadata struct {
type nodeProperties struct {
keypairs map[string]interface{}
}
func NewMetadata() Metadata {
return &nodeMetadata{
func NewProperties() Properties {
return &nodeProperties{
keypairs: make(map[string]interface{}),
}
}
func NewMetadataWithString(vals string) Metadata {
return &nodeMetadata{}
func NewPropertiesWithString(vals string) Properties {
return &nodeProperties{}
}
func NewMetadataWithValues(vals map[string]interface{}) Metadata {
return &nodeMetadata{
func NewPropertiesWithValues(vals map[string]interface{}) Properties {
return &nodeProperties{
keypairs: vals,
}
}
func (c *nodeMetadata) Keys() []string {
func (c *nodeProperties) Keys() []string {
keys := []string{}
for key, _ := range c.keypairs {
keys = append(keys, key)
@@ -46,19 +46,19 @@ func (c *nodeMetadata) Keys() []string {
return keys
}
func (c *nodeMetadata) Value(key string) (interface{}, error) {
func (c *nodeProperties) Value(key string) (interface{}, error) {
if val, found := c.keypairs[key]; found {
return val, nil
}
return nil, fmt.Errorf("key '%s' not found", key)
}
func (c *nodeMetadata) With(key string, val interface{}) Metadata {
func (c *nodeProperties) With(key string, val interface{}) Properties {
c.keypairs[key] = val
return c
}
func (c *nodeMetadata) DecodePath(rawVal interface{}) error {
func (c *nodeProperties) DecodePath(rawVal interface{}) error {
//return utils.Map2Struct(c.keypairs, rawVal)
return mapstructure.Decode(c.keypairs, rawVal)
}

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
"strings"
)
@@ -16,7 +15,7 @@ type transformDeleteKeyNodeFactory struct{}
func (f transformDeleteKeyNodeFactory) Name() string { return "DeleteKeyNode" }
func (f transformDeleteKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformDeleteKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f transformDeleteKeyNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &transformDeleteKeyNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -24,7 +23,7 @@ func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, e
}
func (n *transformDeleteKeyNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
@@ -47,10 +46,12 @@ func (n *transformDeleteKeyNode) Handle(msg *message.Message) error {
}
} else {
if failureLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "未识别FormType")
return failureLabelNode.Handle(msg)
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -19,7 +18,7 @@ type transformRenameKeyNodeFactory struct{}
func (f transformRenameKeyNodeFactory) Name() string { return "RenameKeyNode" }
func (f transformRenameKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformRenameKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f transformRenameKeyNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &transformRenameKeyNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -27,7 +26,7 @@ func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, e
}
func (n *transformRenameKeyNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
@@ -51,10 +50,12 @@ func (n *transformRenameKeyNode) Handle(msg *message.Message) error {
}
} else {
if failureLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "未识别FormType")
return failureLabelNode.Handle(msg)
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(msg)
}
return nil

View File

@@ -1,7 +1,6 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
@@ -15,7 +14,7 @@ type transformScriptNodeFactory struct{}
func (f transformScriptNodeFactory) Name() string { return "ScriptKeyNode" }
func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f transformScriptNodeFactory) Create(id string, meta Properties) (Node, error) {
node := &transformScriptNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
@@ -23,7 +22,7 @@ func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, erro
}
func (n *transformScriptNode) Handle(msg *message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
n.Debug(msg, message.DEBUGIN, "")
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
@@ -31,6 +30,7 @@ func (n *transformScriptNode) Handle(msg *message.Message) error {
scriptEngine := NewScriptEngine(*msg, "Transform", n.Script)
newMessage, err := scriptEngine.ScriptOnMessage()
if err != nil {
n.Debug(msg, message.DEBUGOUT, err.Error())
if failureLabelNode != nil {
return failureLabelNode.Handle(msg)
} else {
@@ -38,6 +38,7 @@ func (n *transformScriptNode) Handle(msg *message.Message) error {
}
}
if successLabelNode != nil {
n.Debug(msg, message.DEBUGOUT, "")
return successLabelNode.Handle(newMessage)
}
return nil