diff --git a/.idea/vcs.xml b/.idea/vcs.xml index b6b1c18..94a25f7 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -2,6 +2,5 @@ - \ No newline at end of file diff --git a/pkg/rule_engine/engine_data.go b/pkg/rule_engine/engine_data.go new file mode 100644 index 0000000..e993d53 --- /dev/null +++ b/pkg/rule_engine/engine_data.go @@ -0,0 +1,33 @@ +package rule_engine + +import ( + "pandax/pkg/rule_engine/message" + "pandax/pkg/rule_engine/nodes" +) + +func GetCategory() []map[string]interface{} { + return nodes.GetCategory() +} + +func GetDebugData(ruleId, nodeId string) []message.DebugData { + if data, ok := ruleChainDebugData.Data[ruleId]; ok { + return data.Get(nodeId).Items + } + return nil +} + +func GetDebugDataPage(page, pageSize int, ruleId, nodeId string) (int64, []message.DebugData) { + if page < 1 { + page = 1 + } + offset := pageSize * (page - 1) + if data, ok := ruleChainDebugData.Data[ruleId]; ok { + total := len(data.Get(nodeId).Items) + end := offset + pageSize + if end >= total { + end = total - 1 + } + return int64(total), data.Get(nodeId).Items[offset:end] + } + return 0, nil +} diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index f2c86ba..e93211b 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -3,17 +3,21 @@ package rule_engine import ( "context" "github.com/sirupsen/logrus" + "pandax/pkg/global" "pandax/pkg/rule_engine/manifest" "pandax/pkg/rule_engine/message" "pandax/pkg/rule_engine/nodes" ) +var ruleChainDebugData = message.NewRuleChainDebugData(100) + type ruleChainInstance struct { + ruleId string firstRuleNodeId string nodes map[string]nodes.Node } -func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) { +func NewRuleChainInstance(ruleId string, data []byte) (*ruleChainInstance, []error) { errors := make([]error, 0) manifest, err := manifest.New(data) @@ -22,7 +26,12 @@ func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) { logrus.WithError(err).Errorf("invalidi manifest file") return nil, errors } - return newInstanceWithManifest(manifest) + withManifest, errs := newInstanceWithManifest(manifest) + if len(errs) > 0 { + return nil, errs + } + withManifest.ruleId = ruleId + return withManifest, nil } // newWithManifest create rule chain by user's manifest file @@ -42,8 +51,24 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) // StartRuleChain TODO 是否需要添加context func (c *ruleChainInstance) StartRuleChain(context context.Context, message *message.Message) error { + // 处理debug的通道消息 + go func() { + for { + select { + case debugMsg := <-message.DeBugChan: + // 保存到tdengine时序数据库中 + ruleChainDebugData.Add(c.ruleId, debugMsg.NodeId, debugMsg) + case <-message.EndDeBugChan: + global.Log.Debug("规则链%s,执行结束", message.Id) + return + } + } + + }() if node, found := c.nodes[c.firstRuleNodeId]; found { - return node.Handle(message) + err := node.Handle(message) + message.EndDeBugChan <- struct{}{} + return err } return nil } diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index e292f49..c95e2b3 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -3,6 +3,7 @@ package message import ( "encoding/json" "github.com/google/uuid" + "github.com/sirupsen/logrus" "time" ) @@ -26,30 +27,59 @@ const ( MONITOR = "MONITOR" //监控 ) +const ( + DEBUGIN = "In" + DEBUGOUT = "Out" +) + type Msg map[string]interface{} type Metadata map[string]interface{} type Message struct { - Id string //uuid 消息Id - Ts time.Time //时间戳 - MsgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 - User string //客户 设备发布人 设备所有者 - Msg Msg //数据 数据结构JSON 设备原始数据 msg - Metadata Metadata //消息的元数据 包括设备Id,设备类型,产品ID等 + Id string //uuid 消息Id + Ts time.Time //时间戳 + MsgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 + User string //客户 设备发布人 设备所有者 + Msg Msg //数据 数据结构JSON 设备原始数据 msg + Metadata Metadata //消息的元数据 包括设备Id,设备类型,产品ID等 + DeBugChan chan DebugData + EndDeBugChan chan struct{} } // NewMessage ... func NewMessage(user, messageType string, msg Msg, metadata Metadata) *Message { return &Message{ - Id: uuid.New().String(), - Ts: time.Now(), - User: user, - MsgType: messageType, - Msg: msg, - Metadata: metadata, + Id: uuid.New().String(), + Ts: time.Now(), + User: user, + MsgType: messageType, + Msg: msg, + Metadata: metadata, + DeBugChan: make(chan DebugData, 100), + EndDeBugChan: make(chan struct{}), } } +func (t *Message) Debug(nodeId, nodeName, debugType, error string) { + if debugType == DEBUGIN { + logrus.Infof("%s handle message '%s'", nodeName, t.MsgType) + } + debug := DebugData{ + Ts: time.Now().Format("2006-01-02 15:04:05.000"), + NodeId: nodeId, + MsgId: t.Id, + DebugType: debugType, + MsgType: t.MsgType, + Msg: t.Msg, + Metadata: t.Metadata, + Error: error, + } + if deviceName, ok := t.Metadata.GetValue("deviceName").(string); ok { + debug.DeviceName = deviceName + } + t.DeBugChan <- debug +} + func (t *Message) GetAllMap() map[string]interface{} { data := make(map[string]interface{}) for msgKey, msgValue := range t.Msg { diff --git a/pkg/rule_engine/nodes/action_clear_alarm_node.go b/pkg/rule_engine/nodes/action_clear_alarm_node.go index 45f6bc8..b1b4dfa 100644 --- a/pkg/rule_engine/nodes/action_clear_alarm_node.go +++ b/pkg/rule_engine/nodes/action_clear_alarm_node.go @@ -2,7 +2,6 @@ package nodes import ( "encoding/json" - "github.com/sirupsen/logrus" "pandax/apps/device/services" "pandax/pkg/global" "pandax/pkg/rule_engine/message" @@ -21,7 +20,7 @@ type clearAlarmNode struct { func (f clearAlarmNodeFactory) Name() string { return ClearAlarmNodeName } func (f clearAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION } func (f clearAlarmNodeFactory) Labels() []string { return []string{"Cleared", "Failure"} } -func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f clearAlarmNodeFactory) Create(id string, meta Properties) (Node, error) { node := &clearAlarmNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -29,28 +28,30 @@ func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { } func (n *clearAlarmNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") + cleared := n.GetLinkedNode("Cleared") failure := n.GetLinkedNode("Failure") - - alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0") - if alarm.DeviceId != "" { + var err error + alarm, err := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0") + if err == nil { alarm.State = global.CLEARED marshal, _ := json.Marshal(msg.Msg) alarm.Details = string(marshal) - err := services.DeviceAlarmModelDao.Update(*alarm) - if err != nil { - if failure != nil { - return failure.Handle(msg) - } - } else { + err = services.DeviceAlarmModelDao.Update(*alarm) + if err == nil { if cleared != nil { + n.Debug(msg, message.DEBUGOUT, "") return cleared.Handle(msg) } } - } else { + } + if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failure != nil { return failure.Handle(msg) + } else { + return err } } return nil diff --git a/pkg/rule_engine/nodes/action_create_alarm_node.go b/pkg/rule_engine/nodes/action_create_alarm_node.go index dd9bdd5..cc32e5c 100644 --- a/pkg/rule_engine/nodes/action_create_alarm_node.go +++ b/pkg/rule_engine/nodes/action_create_alarm_node.go @@ -2,7 +2,6 @@ package nodes import ( "encoding/json" - "github.com/sirupsen/logrus" "pandax/apps/device/entity" "pandax/apps/device/services" "pandax/pkg/global" @@ -22,7 +21,7 @@ type createAlarmNodeFactory struct{} func (f createAlarmNodeFactory) Name() string { return "CreateAlarmNode" } func (f createAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION } func (f createAlarmNodeFactory) Labels() []string { return []string{"Created", "Updated", "Failure"} } -func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f createAlarmNodeFactory) Create(id string, meta Properties) (Node, error) { node := &createAlarmNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -30,23 +29,20 @@ func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { } func (n *createAlarmNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") created := n.GetLinkedNode("Created") updated := n.GetLinkedNode("Updated") failure := n.GetLinkedNode("Failure") - - alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0") - if alarm.DeviceId != "" { + var err error + alarm, err := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0") + if err == nil { marshal, _ := json.Marshal(msg.Msg) alarm.Details = string(marshal) - err := services.DeviceAlarmModelDao.Update(*alarm) - if err != nil { - if failure != nil { - return failure.Handle(msg) - } - } else { + err = services.DeviceAlarmModelDao.Update(*alarm) + if err == nil { if updated != nil { + n.Debug(msg, message.DEBUGOUT, "") return updated.Handle(msg) } } @@ -64,16 +60,21 @@ func (n *createAlarmNode) Handle(msg *message.Message) error { alarm.Owner = msg.Metadata.GetValue("owner").(string) marshal, _ := json.Marshal(msg.Msg) alarm.Details = string(marshal) - err := services.DeviceAlarmModelDao.Insert(*alarm) - if err != nil { - if failure != nil { - return failure.Handle(msg) - } - } else { + err = services.DeviceAlarmModelDao.Insert(*alarm) + if err == nil { if created != nil { + n.Debug(msg, message.DEBUGOUT, "") return created.Handle(msg) } } } + if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) + if failure != nil { + return failure.Handle(msg) + } else { + return err + } + } return nil } diff --git a/pkg/rule_engine/nodes/action_delay_node.go b/pkg/rule_engine/nodes/action_delay_node.go index f0e35b0..0d2c809 100644 --- a/pkg/rule_engine/nodes/action_delay_node.go +++ b/pkg/rule_engine/nodes/action_delay_node.go @@ -5,8 +5,6 @@ import ( "pandax/pkg/rule_engine/message" "sync" "time" - - "github.com/sirupsen/logrus" ) const DelayNodeName = "DelayNode" @@ -25,7 +23,7 @@ type delayNodeFactory struct{} func (f delayNodeFactory) Name() string { return DelayNodeName } func (f delayNodeFactory) Category() string { return NODE_CATEGORY_ACTION } func (f delayNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f delayNodeFactory) Create(id string, meta Properties) (Node, error) { node := &delayNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), lock: sync.Mutex{}, @@ -36,8 +34,7 @@ func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) { } func (n *delayNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) - + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") if successLabelNode == nil || failureLabelNode == nil { diff --git a/pkg/rule_engine/nodes/action_generator_node.go b/pkg/rule_engine/nodes/action_generator_node.go deleted file mode 100644 index 84ea52c..0000000 --- a/pkg/rule_engine/nodes/action_generator_node.go +++ /dev/null @@ -1,63 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" - "time" -) - -type messageGeneratorNode struct { - bareNode - Script string `json:"script" yaml:"script"` - PeriodSecond int64 `json:"periodSecond" yaml:"periodSecond"` //周期 - MessageCount int64 `json:"messageCount" yaml:"messageCount"` -} - -type messageGeneratorNodeFactory struct{} - -func (f messageGeneratorNodeFactory) Name() string { return "MessageGeneratorNode" } -func (f messageGeneratorNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f messageGeneratorNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, error) { - node := &messageGeneratorNode{ - bareNode: newBareNode(f.Name(), id, meta, f.Labels()), - } - return decodePath(meta, node) -} - -func (n *messageGeneratorNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) - - successLabelNode := n.GetLinkedNode("Success") - failureLabelNode := n.GetLinkedNode("Failure") - - ticker := time.NewTicker(time.Duration(n.PeriodSecond) * time.Second) - count := 0 - - go func() { - for { - <-ticker.C - count++ - if int64(count) == n.MessageCount { - ticker.Stop() - return - } - scriptEngine := NewScriptEngine(*msg, "Generate", n.Script) - generate, err := scriptEngine.ScriptGenerate() - if err != nil { - if failureLabelNode != nil { - go failureLabelNode.Handle(msg) - } - return - } - msg.Msg = generate["msg"].(message.Msg) - msg.Metadata = generate["metadata"].(message.Metadata) - msg.MsgType = generate["msgType"].(string) - if successLabelNode != nil { - go successLabelNode.Handle(msg) - } - } - }() - - return nil -} diff --git a/pkg/rule_engine/nodes/action_log_node.go b/pkg/rule_engine/nodes/action_log_node.go index 6ca6953..20d7e17 100644 --- a/pkg/rule_engine/nodes/action_log_node.go +++ b/pkg/rule_engine/nodes/action_log_node.go @@ -16,7 +16,7 @@ type logNodeFactory struct{} func (f logNodeFactory) Name() string { return "LogNode" } func (f logNodeFactory) Category() string { return NODE_CATEGORY_ACTION } func (f logNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f logNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f logNodeFactory) Create(id string, meta Properties) (Node, error) { node := &logNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } diff --git a/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go b/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go index bee9a3a..3aa21d4 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go @@ -2,7 +2,6 @@ package nodes import ( "errors" - "github.com/sirupsen/logrus" "pandax/iothub/client/mqttclient" "pandax/iothub/client/tcpclient" "pandax/pkg/global" @@ -20,7 +19,7 @@ type rpcRequestFromDeviceFactory struct{} func (f rpcRequestFromDeviceFactory) Name() string { return "RpcRequestFromDeviceNode" } func (f rpcRequestFromDeviceFactory) Category() string { return NODE_CATEGORY_ACTION } func (f rpcRequestFromDeviceFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f rpcRequestFromDeviceFactory) Create(id string, meta Metadata) (Node, error) { +func (f rpcRequestFromDeviceFactory) Create(id string, meta Properties) (Node, error) { node := &rpcRequestFromDeviceNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -28,7 +27,7 @@ func (f rpcRequestFromDeviceFactory) Create(id string, meta Metadata) (Node, err } func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") @@ -42,6 +41,7 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error { result, err := rpcp.GetRequestResult() if err != nil { if failureLableNode != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) return failureLableNode.Handle(msg) } else { return err @@ -71,6 +71,7 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error { err = tcpclient.Send(deviceId, result) } if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLableNode != nil { return failureLableNode.Handle(msg) } else { @@ -78,6 +79,7 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error { } } if successLableNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLableNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go b/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go index 8901313..88d4601 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go @@ -3,7 +3,6 @@ package nodes import ( "encoding/json" "errors" - "github.com/sirupsen/logrus" "pandax/iothub/client/mqttclient" "pandax/iothub/client/tcpclient" "pandax/pkg/global" @@ -21,7 +20,7 @@ type rpcRequestToDeviceNodeFactory struct{} func (f rpcRequestToDeviceNodeFactory) Name() string { return "RpcRequestToDeviceNode" } func (f rpcRequestToDeviceNodeFactory) Category() string { return NODE_CATEGORY_ACTION } func (f rpcRequestToDeviceNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f rpcRequestToDeviceNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f rpcRequestToDeviceNodeFactory) Create(id string, meta Properties) (Node, error) { node := &rpcRequestToDeviceNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -29,7 +28,7 @@ func (f rpcRequestToDeviceNodeFactory) Create(id string, meta Metadata) (Node, e } func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") if msg.Msg.GetValue("method") == nil || msg.Msg.GetValue("params") == nil { @@ -60,6 +59,7 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { err = tcpclient.Send(deviceId, string(payload)) } if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLableNode != nil { return failureLableNode.Handle(msg) } else { @@ -67,6 +67,7 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { } } if successLableNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLableNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/action_save_attributes_node.go b/pkg/rule_engine/nodes/action_save_attributes_node.go index 41c6f77..20d5b5f 100644 --- a/pkg/rule_engine/nodes/action_save_attributes_node.go +++ b/pkg/rule_engine/nodes/action_save_attributes_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/global" "pandax/pkg/rule_engine/message" ) @@ -15,7 +14,7 @@ type saveAttributesNodeFactory struct{} func (f saveAttributesNodeFactory) Name() string { return "SaveAttributesNode" } func (f saveAttributesNodeFactory) Category() string { return NODE_CATEGORY_ACTION } func (f saveAttributesNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f saveAttributesNodeFactory) Create(id string, meta Properties) (Node, error) { node := &saveAttributesNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -23,7 +22,7 @@ func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error } func (n *saveAttributesNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") /*if msg.MsgType != message.AttributesMes { @@ -37,11 +36,15 @@ func (n *saveAttributesNode) Handle(msg *message.Message) error { deviceName := msg.Metadata["deviceName"].(string) err := global.TdDb.InsertDevice(deviceName+"_attributes", msg.Msg) if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLabelNode != nil { return failureLabelNode.Handle(msg) + } else { + return err } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/action_save_timeseries_node.go b/pkg/rule_engine/nodes/action_save_timeseries_node.go index 9613cb1..35fa96d 100644 --- a/pkg/rule_engine/nodes/action_save_timeseries_node.go +++ b/pkg/rule_engine/nodes/action_save_timeseries_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/global" "pandax/pkg/rule_engine/message" ) @@ -15,7 +14,7 @@ type saveTimeSeriesNodeFactory struct{} func (f saveTimeSeriesNodeFactory) Name() string { return "SaveTimeSeriesNode" } func (f saveTimeSeriesNodeFactory) Category() string { return NODE_CATEGORY_ACTION } func (f saveTimeSeriesNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f saveTimeSeriesNodeFactory) Create(id string, meta Properties) (Node, error) { node := &saveTimeSeriesNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -23,7 +22,8 @@ func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error } func (n *saveTimeSeriesNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") + successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") /* if msg.MsgType != message.TelemetryMes && msg.MsgType != message.RowMes{ @@ -33,15 +33,18 @@ func (n *saveTimeSeriesNode) Handle(msg *message.Message) error { return nil } }*/ - //deviceId := msg.GetMetadata().GetValues()["deviceId"].(string) deviceName := msg.Metadata["deviceName"].(string) err := global.TdDb.InsertDevice(deviceName+"_telemetry", msg.Msg) if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLabelNode != nil { return failureLabelNode.Handle(msg) + } else { + return err } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/external_ding_node.go b/pkg/rule_engine/nodes/external_ding_node.go index cfd80a2..1db7ce7 100644 --- a/pkg/rule_engine/nodes/external_ding_node.go +++ b/pkg/rule_engine/nodes/external_ding_node.go @@ -7,7 +7,6 @@ import ( "encoding/json" "fmt" "github.com/PandaXGO/PandaKit/httpclient" - "github.com/sirupsen/logrus" "net/url" "pandax/pkg/rule_engine/message" "time" @@ -28,7 +27,7 @@ type externalDingNodeFactory struct{} func (f externalDingNodeFactory) Name() string { return "DingNode" } func (f externalDingNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalDingNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalDingNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalDingNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -36,7 +35,7 @@ func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) } func (n *externalDingNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -63,6 +62,7 @@ func (n *externalDingNode) Handle(msg *message.Message) error { postJson := httpclient.NewRequest(url).Header("Content-Type", "application/json").PostJson(string(marshal)) if postJson.StatusCode != 200 { + n.Debug(msg, message.DEBUGOUT, "钉钉机器人hook接口请求失败") if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -70,6 +70,7 @@ func (n *externalDingNode) Handle(msg *message.Message) error { } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/external_kafka_node.go b/pkg/rule_engine/nodes/external_kafka_node.go index a235425..b65c2ef 100644 --- a/pkg/rule_engine/nodes/external_kafka_node.go +++ b/pkg/rule_engine/nodes/external_kafka_node.go @@ -24,7 +24,7 @@ type externalKafkaNodeFactory struct{} func (f externalKafkaNodeFactory) Name() string { return "KafkaNode" } func (f externalKafkaNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalKafkaNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalKafkaNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalKafkaNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -46,7 +46,7 @@ func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error) } func (n *externalKafkaNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") defer n.KafkaCli.Close() successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -71,6 +71,7 @@ func (n *externalKafkaNode) Handle(msg *message.Message) error { } _, _, err := n.KafkaCli.SendMessage(kafkaM) if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -78,6 +79,7 @@ func (n *externalKafkaNode) Handle(msg *message.Message) error { } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } diff --git a/pkg/rule_engine/nodes/external_mqtt_node.go b/pkg/rule_engine/nodes/external_mqtt_node.go index 48527a9..22b27bd 100644 --- a/pkg/rule_engine/nodes/external_mqtt_node.go +++ b/pkg/rule_engine/nodes/external_mqtt_node.go @@ -29,7 +29,7 @@ type externalMqttNodeFactory struct{} func (f externalMqttNodeFactory) Name() string { return "MqttNode" } func (f externalMqttNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalMqttNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalMqttNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalMqttNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -60,7 +60,7 @@ func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) } func (n *externalMqttNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") defer n.MqttCli.Disconnect(1000) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -71,6 +71,7 @@ func (n *externalMqttNode) Handle(msg *message.Message) error { } token := n.MqttCli.Publish(topic, 1, false, sendmqttmsg) if token.Wait() && token.Error() != nil { + n.Debug(msg, message.DEBUGOUT, token.Error().Error()) if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -78,6 +79,7 @@ func (n *externalMqttNode) Handle(msg *message.Message) error { } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/external_nats_node.go b/pkg/rule_engine/nodes/external_nats_node.go index 5298f10..113c3f2 100644 --- a/pkg/rule_engine/nodes/external_nats_node.go +++ b/pkg/rule_engine/nodes/external_nats_node.go @@ -2,7 +2,6 @@ package nodes import ( "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -19,7 +18,7 @@ type externalNatsNodeFactory struct{} func (f externalNatsNodeFactory) Name() string { return "NatsNode" } func (f externalNatsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalNatsNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalNatsNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalNatsNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalNatsNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -36,7 +35,7 @@ func (f externalNatsNodeFactory) Create(id string, meta Metadata) (Node, error) } func (n *externalNatsNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") defer n.client.Close() successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -46,6 +45,7 @@ func (n *externalNatsNode) Handle(msg *message.Message) error { } err = n.client.Publish(n.Subject, []byte(template)) if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -53,6 +53,7 @@ func (n *externalNatsNode) Handle(msg *message.Message) error { } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/external_restapi_node.go b/pkg/rule_engine/nodes/external_restapi_node.go index 9dff45b..32c4de2 100644 --- a/pkg/rule_engine/nodes/external_restapi_node.go +++ b/pkg/rule_engine/nodes/external_restapi_node.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "github.com/PandaXGO/PandaKit/httpclient" - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -20,7 +19,7 @@ type externalRestapiNodeFactory struct{} func (f externalRestapiNodeFactory) Name() string { return "RestapiNode" } func (f externalRestapiNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalRestapiNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalRestapiNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalRestapiNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -28,7 +27,7 @@ func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, erro } func (n *externalRestapiNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") if n.RequestMethod == "GET" { @@ -39,6 +38,7 @@ func (n *externalRestapiNode) Handle(msg *message.Message) error { var response map[string]interface{} err := json.Unmarshal(resp.Body, &response) if err != nil && failureLableNode != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) return failureLableNode.Handle(msg) } else { if successLableNode != nil { @@ -47,6 +47,7 @@ func (n *externalRestapiNode) Handle(msg *message.Message) error { metadata.SetValue(key, value) } msg.Metadata = metadata + n.Debug(msg, message.DEBUGOUT, "") return successLableNode.Handle(msg) } } @@ -60,42 +61,15 @@ func (n *externalRestapiNode) Handle(msg *message.Message) error { resp := req.PostJson(string(binary)) if resp.StatusCode != 200 { if failureLableNode != nil { + n.Debug(msg, message.DEBUGOUT, "接口请求失败") return failureLableNode.Handle(msg) } } else { if successLableNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLableNode.Handle(msg) } } } - /*if n.RequestMethod == "PUT" { - binary, _ := msg.MarshalBinary() - req := httpclient.NewRequest(n.RestEndpointUrlPattern) - for key,value := range n.Headers { - req.Header(key,value) - } - _, err := http.HttpPut(n.RestEndpointUrlPattern, n.Headers, nil, binary) - if err != nil { - if failureLableNode != nil { - return failureLableNode.Handle(msg) - } - } else { - if successLableNode != nil { - return successLableNode.Handle(msg) - } - } - } - if n.RequestMethod == "DELETE" { - _, err := http.HttpDelete(n.RestEndpointUrlPattern) - if err != nil { - if failureLableNode != nil { - return failureLableNode.Handle(msg) - } - } else { - if successLableNode != nil { - return successLableNode.Handle(msg) - } - } - }*/ return nil } diff --git a/pkg/rule_engine/nodes/external_rule_chain_node.go b/pkg/rule_engine/nodes/external_rule_chain_node.go index d099c71..ec68f4e 100644 --- a/pkg/rule_engine/nodes/external_rule_chain_node.go +++ b/pkg/rule_engine/nodes/external_rule_chain_node.go @@ -19,7 +19,7 @@ type externalRuleChainNodeFactory struct{} func (f externalRuleChainNodeFactory) Name() string { return "RuleChainNode" } func (f externalRuleChainNodeFactory) Category() string { return NODE_CATEGORY_FLOWS } func (f externalRuleChainNodeFactory) Labels() []string { return []string{} } -func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalRuleChainNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalRuleChainNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } diff --git a/pkg/rule_engine/nodes/external_send_email_node.go b/pkg/rule_engine/nodes/external_send_email_node.go index 167b954..ab16908 100644 --- a/pkg/rule_engine/nodes/external_send_email_node.go +++ b/pkg/rule_engine/nodes/external_send_email_node.go @@ -3,7 +3,6 @@ package nodes import ( "crypto/tls" "fmt" - "github.com/sirupsen/logrus" "net/smtp" "pandax/pkg/rule_engine/message" "strings" @@ -30,7 +29,7 @@ type externalSendEmailNodeFactory struct{} func (f externalSendEmailNodeFactory) Name() string { return "SendEmailNode" } func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalSendEmailNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalSendEmailNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), @@ -39,7 +38,7 @@ func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, er } func (n *externalSendEmailNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -50,6 +49,7 @@ func (n *externalSendEmailNode) Handle(msg *message.Message) error { } err := n.send(tos, *msg) if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -57,6 +57,7 @@ func (n *externalSendEmailNode) Handle(msg *message.Message) error { } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/external_send_sms_node.go b/pkg/rule_engine/nodes/external_send_sms_node.go index e0a6e5a..b2b82f0 100644 --- a/pkg/rule_engine/nodes/external_send_sms_node.go +++ b/pkg/rule_engine/nodes/external_send_sms_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -20,7 +19,7 @@ type externalSendSmsNodeFactory struct{} func (f externalSendSmsNodeFactory) Name() string { return "SendSmsNode" } func (f externalSendSmsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalSendSmsNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalSendSmsNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalSendSmsNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -28,10 +27,11 @@ func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, erro } func (n *externalSendSmsNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") //failureLabelNode := n.GetLinkedNode("Failure") + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } diff --git a/pkg/rule_engine/nodes/external_wechat_node.go b/pkg/rule_engine/nodes/external_wechat_node.go index 5bf4a3e..cdb8841 100644 --- a/pkg/rule_engine/nodes/external_wechat_node.go +++ b/pkg/rule_engine/nodes/external_wechat_node.go @@ -3,7 +3,6 @@ package nodes import ( "encoding/json" "github.com/PandaXGO/PandaKit/httpclient" - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -21,7 +20,7 @@ type externalWechatNodeFactory struct{} func (f externalWechatNodeFactory) Name() string { return "WechatNode" } func (f externalWechatNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalWechatNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalWechatNodeFactory) Create(id string, meta Properties) (Node, error) { node := &externalWechatNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -29,11 +28,12 @@ func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error } func (n *externalWechatNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") template, err := ParseTemplate(n.Content, msg.GetAllMap()) + sendData := map[string]interface{}{ "msgtype": "text", "text": map[string]interface{}{"content": template}, @@ -46,6 +46,7 @@ func (n *externalWechatNode) Handle(msg *message.Message) error { marshal, _ := json.Marshal(sendData) postJson := httpclient.NewRequest(n.WebHook).Header("Content-Type", "application/json").PostJson(string(marshal)) if postJson.StatusCode != 200 { + n.Debug(msg, message.DEBUGOUT, "请求微信机器人hook接口失败") if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -53,6 +54,7 @@ func (n *externalWechatNode) Handle(msg *message.Message) error { } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/factory.go b/pkg/rule_engine/nodes/factory.go index 042b241..cc3551d 100644 --- a/pkg/rule_engine/nodes/factory.go +++ b/pkg/rule_engine/nodes/factory.go @@ -18,7 +18,7 @@ type Factory interface { Name() string Category() string Labels() []string - Create(id string, meta Metadata) (Node, error) + Create(id string, meta Properties) (Node, error) } var ( @@ -41,7 +41,7 @@ func RegisterFactory(f Factory) { } // NewNode is the only way to create a new node -func NewNode(nodeType string, id string, meta Metadata) (Node, error) { +func NewNode(nodeType string, id string, meta Properties) (Node, error) { if f, found := allNodeFactories[nodeType]; found { return f.Create(id, meta) } diff --git a/pkg/rule_engine/nodes/filter_device_type_switch_node.go b/pkg/rule_engine/nodes/filter_device_type_switch_node.go index b0b821e..aa82b38 100644 --- a/pkg/rule_engine/nodes/filter_device_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_device_type_switch_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -18,7 +17,7 @@ func (f deviceTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FI func (f deviceTypeSwitchNodeFactory) Labels() []string { return []string{message.DEVICE, message.GATEWAY} } -func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f deviceTypeSwitchNodeFactory) Create(id string, meta Properties) (Node, error) { node := &deviceTypeSwitchNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -26,20 +25,23 @@ func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, err } func (n *deviceTypeSwitchNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") deviceLabelNode := n.GetLinkedNode(message.DEVICE) gatewayLabelNode := n.GetLinkedNode(message.GATEWAY) - if msg.Metadata.GetValue("deviceType").(string) == message.DEVICE { + deviceType := msg.Metadata.GetValue("deviceType").(string) + if deviceType == message.DEVICE { if deviceLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return deviceLabelNode.Handle(msg) } - } - if msg.Metadata.GetValue("deviceType").(string) == message.GATEWAY { + } else if deviceType == message.GATEWAY { if gatewayLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return gatewayLabelNode.Handle(msg) } } + n.Debug(msg, message.DEBUGOUT, "没有匹配的设备类型") return nil } diff --git a/pkg/rule_engine/nodes/filter_message_type_node.go b/pkg/rule_engine/nodes/filter_message_type_node.go index 30b2ac2..df4b9bf 100644 --- a/pkg/rule_engine/nodes/filter_message_type_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -16,7 +15,7 @@ func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeNod func (f messageTypeFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } func (f messageTypeFilterNodeFactory) Labels() []string { return []string{"True", "False"} } -func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f messageTypeFilterNodeFactory) Create(id string, meta Properties) (Node, error) { node := &messageTypeFilterNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), MessageTypes: []string{}, @@ -25,19 +24,30 @@ func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, er } func (n *messageTypeFilterNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") trueLabelNode := n.GetLinkedNode("True") falseLabelNode := n.GetLinkedNode("False") - messageType := msg.MsgType - for _, filterType := range n.MessageTypes { - if filterType == messageType && trueLabelNode != nil { + if n.containsType(msg.MsgType) { + if trueLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return trueLabelNode.Handle(msg) } - } - if falseLabelNode != nil { - return falseLabelNode.Handle(msg) + } else { + if falseLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "不包含消息类型") + return falseLabelNode.Handle(msg) + } } return nil } + +func (n *messageTypeFilterNode) containsType(messageType string) bool { + for _, filterType := range n.MessageTypes { + if filterType == messageType { + return true + } + } + return false +} diff --git a/pkg/rule_engine/nodes/filter_message_type_switch_node.go b/pkg/rule_engine/nodes/filter_message_type_switch_node.go index 5fa84c6..f2757f0 100644 --- a/pkg/rule_engine/nodes/filter_message_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_switch_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -26,7 +25,7 @@ func (f messageTypeSwitchNodeFactory) Labels() []string { message.DisConnectMes, } } -func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f messageTypeSwitchNodeFactory) Create(id string, meta Properties) (Node, error) { node := &messageTypeSwitchNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -34,7 +33,7 @@ func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, er } func (n *messageTypeSwitchNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") nodes := n.GetLinkedNodes() messageType := msg.MsgType for label, node := range nodes { @@ -42,5 +41,6 @@ func (n *messageTypeSwitchNode) Handle(msg *message.Message) error { return node.Handle(msg) } } + n.Debug(msg, message.DEBUGOUT, "消息类型不正确") return nil } diff --git a/pkg/rule_engine/nodes/filter_script_node.go b/pkg/rule_engine/nodes/filter_script_node.go index 369b48a..0e79165 100644 --- a/pkg/rule_engine/nodes/filter_script_node.go +++ b/pkg/rule_engine/nodes/filter_script_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -16,8 +15,8 @@ type scriptFilterNodeFactory struct{} func (f scriptFilterNodeFactory) Name() string { return ScriptFilterNodeName } func (f scriptFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } -func (f scriptFilterNodeFactory) Labels() []string { return []string{"True", "False"} } -func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f scriptFilterNodeFactory) Labels() []string { return []string{"True", "False", "Failure"} } +func (f scriptFilterNodeFactory) Create(id string, meta Properties) (Node, error) { node := &scriptFilterNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -25,16 +24,26 @@ func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error) } func (n *scriptFilterNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) - + n.Debug(msg, message.DEBUGIN, "") trueLabelNode := n.GetLinkedNode("True") falseLabelNode := n.GetLinkedNode("False") + failureLabelNode := n.GetLinkedNode("Failure") + scriptEngine := NewScriptEngine(*msg, "Filter", n.Script) - isTrue, error := scriptEngine.ScriptOnFilter() - if isTrue == true && error == nil && trueLabelNode != nil { + isTrue, err := scriptEngine.ScriptOnFilter() + if err != nil { + if failureLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) + return failureLabelNode.Handle(msg) + } + } + + if isTrue == true && trueLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return trueLabelNode.Handle(msg) } else { if falseLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "Script脚本执行失败") return falseLabelNode.Handle(msg) } } diff --git a/pkg/rule_engine/nodes/filter_switch_node.go b/pkg/rule_engine/nodes/filter_switch_node.go index 62f1db5..d97e1a8 100644 --- a/pkg/rule_engine/nodes/filter_switch_node.go +++ b/pkg/rule_engine/nodes/filter_switch_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -15,20 +14,9 @@ type switchFilterNodeFactory struct{} func (f switchFilterNodeFactory) Name() string { return "SwitchNode" } func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } func (f switchFilterNodeFactory) Labels() []string { - return []string{ - "True", "False", - message.RowMes, - message.AttributesMes, - message.TelemetryMes, - message.RpcRequestFromDevice, - message.RpcRequestToDevice, - message.AlarmMes, - message.UpEventMes, - message.ConnectMes, - message.DisConnectMes, - } + return []string{"Failure"} } -func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f switchFilterNodeFactory) Create(id string, meta Properties) (Node, error) { node := &switchFilterNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -36,12 +24,15 @@ func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) } func (n *switchFilterNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") + + failureLabelNode := n.GetLinkedNode("Failure") scriptEngine := NewScriptEngine(*msg, "Switch", n.Script) SwitchResults, err := scriptEngine.ScriptOnSwitch() if err != nil { - return err + n.Debug(msg, message.DEBUGOUT, err.Error()) + return failureLabelNode.Handle(msg) } nodes := n.GetLinkedNodes() for label, node := range nodes { @@ -51,5 +42,6 @@ func (n *switchFilterNode) Handle(msg *message.Message) error { } } } + n.Debug(msg, message.DEBUGOUT, "") return nil } diff --git a/pkg/rule_engine/nodes/input_node.go b/pkg/rule_engine/nodes/input_node.go index 34e1211..74763f2 100644 --- a/pkg/rule_engine/nodes/input_node.go +++ b/pkg/rule_engine/nodes/input_node.go @@ -16,7 +16,7 @@ type inputNodeFactory struct{} func (f inputNodeFactory) Name() string { return "InputNode" } func (f inputNodeFactory) Category() string { return NODE_CATEGORY_OTHERS } func (f inputNodeFactory) Labels() []string { return []string{"True"} } -func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f inputNodeFactory) Create(id string, meta Properties) (Node, error) { node := &inputNode{ bareNode: newBareNode(InputNodeName, id, meta, f.Labels()), } diff --git a/pkg/rule_engine/nodes/node.go b/pkg/rule_engine/nodes/node.go index 28ed96e..92fb4fa 100644 --- a/pkg/rule_engine/nodes/node.go +++ b/pkg/rule_engine/nodes/node.go @@ -10,7 +10,7 @@ import ( type Node interface { Name() string Id() string - Metadata() Metadata + Properties() Properties MustLabels() []string Handle(*message.Message) error @@ -23,11 +23,11 @@ type bareNode struct { name string id string nodes map[string]Node - meta Metadata + meta Properties labels []string } -func newBareNode(name string, id string, meta Metadata, labels []string) bareNode { +func newBareNode(name string, id string, meta Properties, labels []string) bareNode { return bareNode{ name: name, id: id, @@ -52,11 +52,23 @@ func (n *bareNode) GetLinkedNode(label string) Node { func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes } -func (n *bareNode) Metadata() Metadata { return n.meta } +func (n *bareNode) Properties() Properties { return n.meta } func (n *bareNode) Handle(*message.Message) error { return errors.New("not implemented") } -func decodePath(meta Metadata, n Node) (Node, error) { +func (n *bareNode) Debug(msg *message.Message, debugType, error string) { + value, err := n.meta.Value("debugMode") + if err != nil { + return + } + if debugMode, ok := value.(bool); ok { + if debugMode { + msg.Debug(n.id, n.name, debugType, error) + } + } +} + +func decodePath(meta Properties, n Node) (Node, error) { if err := meta.DecodePath(n); err != nil { return n, err } @@ -67,8 +79,8 @@ func GetNodes(m *manifest.Manifest) (map[string]Node, error) { nodes := make(map[string]Node) // Create All nodes for _, n := range m.Nodes { - metadata := NewMetadataWithValues(n.Properties) - node, err := NewNode(n.Type, n.Id, metadata) + propertie := NewPropertiesWithValues(n.Properties) + node, err := NewNode(n.Type, n.Id, propertie) if err != nil { logrus.Errorf("new node '%s' failure", n.Id) continue diff --git a/pkg/rule_engine/nodes/metadata.go b/pkg/rule_engine/nodes/properties.go similarity index 54% rename from pkg/rule_engine/nodes/metadata.go rename to pkg/rule_engine/nodes/properties.go index a723021..3e3b031 100644 --- a/pkg/rule_engine/nodes/metadata.go +++ b/pkg/rule_engine/nodes/properties.go @@ -10,35 +10,35 @@ const ( NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey" ) -// Metadata 前端 参数 Properties -type Metadata interface { +// Properties 前端 参数 Properties +type Properties interface { Keys() []string - With(key string, val interface{}) Metadata + With(key string, val interface{}) Properties Value(key string) (interface{}, error) DecodePath(rawVal interface{}) error } -type nodeMetadata struct { +type nodeProperties struct { keypairs map[string]interface{} } -func NewMetadata() Metadata { - return &nodeMetadata{ +func NewProperties() Properties { + return &nodeProperties{ keypairs: make(map[string]interface{}), } } -func NewMetadataWithString(vals string) Metadata { - return &nodeMetadata{} +func NewPropertiesWithString(vals string) Properties { + return &nodeProperties{} } -func NewMetadataWithValues(vals map[string]interface{}) Metadata { - return &nodeMetadata{ +func NewPropertiesWithValues(vals map[string]interface{}) Properties { + return &nodeProperties{ keypairs: vals, } } -func (c *nodeMetadata) Keys() []string { +func (c *nodeProperties) Keys() []string { keys := []string{} for key, _ := range c.keypairs { keys = append(keys, key) @@ -46,19 +46,19 @@ func (c *nodeMetadata) Keys() []string { return keys } -func (c *nodeMetadata) Value(key string) (interface{}, error) { +func (c *nodeProperties) Value(key string) (interface{}, error) { if val, found := c.keypairs[key]; found { return val, nil } return nil, fmt.Errorf("key '%s' not found", key) } -func (c *nodeMetadata) With(key string, val interface{}) Metadata { +func (c *nodeProperties) With(key string, val interface{}) Properties { c.keypairs[key] = val return c } -func (c *nodeMetadata) DecodePath(rawVal interface{}) error { +func (c *nodeProperties) DecodePath(rawVal interface{}) error { //return utils.Map2Struct(c.keypairs, rawVal) return mapstructure.Decode(c.keypairs, rawVal) } diff --git a/pkg/rule_engine/nodes/transform_delete_key_node.go b/pkg/rule_engine/nodes/transform_delete_key_node.go index 7e45be5..101070f 100644 --- a/pkg/rule_engine/nodes/transform_delete_key_node.go +++ b/pkg/rule_engine/nodes/transform_delete_key_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" "strings" ) @@ -16,7 +15,7 @@ type transformDeleteKeyNodeFactory struct{} func (f transformDeleteKeyNodeFactory) Name() string { return "DeleteKeyNode" } func (f transformDeleteKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } func (f transformDeleteKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f transformDeleteKeyNodeFactory) Create(id string, meta Properties) (Node, error) { node := &transformDeleteKeyNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -24,7 +23,7 @@ func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, e } func (n *transformDeleteKeyNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -47,10 +46,12 @@ func (n *transformDeleteKeyNode) Handle(msg *message.Message) error { } } else { if failureLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "未识别FormType") return failureLabelNode.Handle(msg) } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/transform_rename_key_node.go b/pkg/rule_engine/nodes/transform_rename_key_node.go index 6730d7e..ef57623 100644 --- a/pkg/rule_engine/nodes/transform_rename_key_node.go +++ b/pkg/rule_engine/nodes/transform_rename_key_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -19,7 +18,7 @@ type transformRenameKeyNodeFactory struct{} func (f transformRenameKeyNodeFactory) Name() string { return "RenameKeyNode" } func (f transformRenameKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } func (f transformRenameKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f transformRenameKeyNodeFactory) Create(id string, meta Properties) (Node, error) { node := &transformRenameKeyNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -27,7 +26,7 @@ func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, e } func (n *transformRenameKeyNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -51,10 +50,12 @@ func (n *transformRenameKeyNode) Handle(msg *message.Message) error { } } else { if failureLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "未识别FormType") return failureLabelNode.Handle(msg) } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(msg) } return nil diff --git a/pkg/rule_engine/nodes/transform_script_node.go b/pkg/rule_engine/nodes/transform_script_node.go index ea12a73..28ade3b 100644 --- a/pkg/rule_engine/nodes/transform_script_node.go +++ b/pkg/rule_engine/nodes/transform_script_node.go @@ -1,7 +1,6 @@ package nodes import ( - "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -15,7 +14,7 @@ type transformScriptNodeFactory struct{} func (f transformScriptNodeFactory) Name() string { return "ScriptKeyNode" } func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f transformScriptNodeFactory) Create(id string, meta Properties) (Node, error) { node := &transformScriptNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } @@ -23,7 +22,7 @@ func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, erro } func (n *transformScriptNode) Handle(msg *message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + n.Debug(msg, message.DEBUGIN, "") successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -31,6 +30,7 @@ func (n *transformScriptNode) Handle(msg *message.Message) error { scriptEngine := NewScriptEngine(*msg, "Transform", n.Script) newMessage, err := scriptEngine.ScriptOnMessage() if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -38,6 +38,7 @@ func (n *transformScriptNode) Handle(msg *message.Message) error { } } if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") return successLabelNode.Handle(newMessage) } return nil