diff --git a/apps/visual/api/visual_screen.go b/apps/visual/api/visual_screen.go index 89c0abf..11740cc 100644 --- a/apps/visual/api/visual_screen.go +++ b/apps/visual/api/visual_screen.go @@ -10,6 +10,7 @@ import ( "github.com/XM-GO/PandaKit/restfulx" "github.com/emicklei/go-restful/v3" "github.com/kakuilan/kgo" + "pandax/pkg/global" "strings" "pandax/apps/visual/entity" @@ -100,4 +101,5 @@ func (p *VisualScreenApi) ScreenTwin(request *restful.Request, response *restful pxSocket.OnMessage(newWebsocket, string(message)) } }() + global.Log.Info("Websocket连接成功") } diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index 81ca10b..614854b 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -2,12 +2,10 @@ package rule_engine import ( "context" - "fmt" "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/manifest" "pandax/pkg/rule_engine/message" "pandax/pkg/rule_engine/nodes" - "strings" ) // ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain, @@ -31,56 +29,15 @@ func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) { // newWithManifest create rule chain by user's manifest file func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) { errs := make([]error, 0) + nodes, err := nodes.GetNodes(m) + if err != nil { + errs = append(errs, err) + return nil, errs + } r := &ruleChainInstance{ firstRuleNodeId: m.FirstRuleNodeId, - nodes: make(map[string]nodes.Node), + nodes: nodes, } - // Create All nodes - for _, n := range m.Nodes { - metadata := nodes.NewMetadataWithValues(n.Properties) - node, err := nodes.NewNode(n.Type, n.Id, metadata) - if err != nil { - errs = append(errs, err) - continue - } - if _, found := r.nodes[n.Id]; found { - err := fmt.Errorf("node '%s' already exist in rulechain", n.Id) - errs = append(errs, err) - continue - } - r.nodes[n.Id] = node - } - for _, edge := range m.Edges { - originalNode, found := r.nodes[edge.SourceNodeId] - if !found { - err := fmt.Errorf("original node '%s' no exist in", originalNode.Name()) - errs = append(errs, err) - continue - } - targetNode, found := r.nodes[edge.TargetNodeId] - if !found { - err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name()) - errs = append(errs, err) - continue - } - //可以有多个类型 - split := strings.Split(edge.Properties["type"].(string), "/") - for _, ty := range split { - originalNode.AddLinkedNode(ty, targetNode) - } - } - for name, node := range r.nodes { - targetNodes := node.GetLinkedNodes() - mustLabels := node.MustLabels() - for _, label := range mustLabels { - if _, found := targetNodes[label]; !found { - err := fmt.Errorf("the label '%s' in node '%s' no exist'", label, name) - errs = append(errs, err) - continue - } - } - } - return r, errs } diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index ca9c40f..cab114e 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -31,12 +31,12 @@ type Metadata interface { // Predefined message types const ( - MessageTypePostAttributesRequest = "Post attributes" - MessageTypePostTelemetryRequest = "Post telemetry" - MessageTypeActivityEvent = "Activity event" - MessageTypeInactivityEvent = "Inactivity event" - MessageTypeConnectEvent = "Connect event" - MessageTypeDisconnectEvent = "Disconnect event" + EventConnectType = "connect" + EventDisConnectType = "disconnect" + EventUpEventType = "event" + EventAlarmType = "alarm" + EventTelemetryType = "telemetry" + EventAttributesType = "attributes" ) // NewMessage ... diff --git a/pkg/rule_engine/nodes/filter_device_type_switch_node.go b/pkg/rule_engine/nodes/filter_device_type_switch_node.go index 3f9df71..82d7638 100644 --- a/pkg/rule_engine/nodes/filter_device_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_device_type_switch_node.go @@ -11,8 +11,8 @@ const ( GATEWAY = "GATEWAY" ) -// 检查关联关系 -// 该消息来自与哪个实体或到那个实体 +//检查关联关系 +//该消息来自与哪个实体或到那个实体 type deviceTypeSwitchNode struct { bareNode } diff --git a/pkg/rule_engine/nodes/filter_message_type_node.go b/pkg/rule_engine/nodes/filter_message_type_node.go index e06263b..31bfa5c 100644 --- a/pkg/rule_engine/nodes/filter_message_type_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_node.go @@ -12,7 +12,7 @@ type messageTypeFilterNode struct { type messageTypeFilterNodeFactory struct{} -func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeFilterNode" } +func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeNode" } func (f messageTypeFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } func (f messageTypeFilterNodeFactory) Labels() []string { return []string{"True", "False"} } @@ -32,9 +32,12 @@ func (n *messageTypeFilterNode) Handle(msg message.Message) error { messageType := msg.GetType() for _, filterType := range n.MessageTypes { - if filterType == messageType { + if filterType == messageType && trueLabelNode != nil { return trueLabelNode.Handle(msg) } } - return falseLabelNode.Handle(msg) + if falseLabelNode != nil { + return falseLabelNode.Handle(msg) + } + return nil } diff --git a/pkg/rule_engine/nodes/filter_message_type_switch_node.go b/pkg/rule_engine/nodes/filter_message_type_switch_node.go index a5b3a6d..3cbc9b3 100644 --- a/pkg/rule_engine/nodes/filter_message_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_switch_node.go @@ -1,10 +1,9 @@ package nodes import ( - "fmt" - "pandax/pkg/rule_engine/message" - "github.com/sirupsen/logrus" + "log" + "pandax/pkg/rule_engine/message" ) type messageTypeSwitchNode struct { @@ -16,10 +15,12 @@ func (f messageTypeSwitchNodeFactory) Name() string { return "MessageTypeSwi func (f messageTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER } func (f messageTypeSwitchNodeFactory) Labels() []string { return []string{ - message.MessageTypePostAttributesRequest, - message.MessageTypePostTelemetryRequest, - message.MessageTypeConnectEvent, - message.MessageTypeDisconnectEvent, + message.EventAttributesType, + message.EventAlarmType, + message.EventTelemetryType, + message.EventUpEventType, + message.EventConnectType, + message.EventDisConnectType, "Other", } } @@ -35,7 +36,7 @@ func (n *messageTypeSwitchNode) Handle(msg message.Message) error { nodes := n.GetLinkedNodes() messageType := msg.GetType() - + log.Println("开始执行messageTypeSwitchNode") for label, node := range nodes { if messageType == label { return node.Handle(msg) @@ -46,5 +47,5 @@ func (n *messageTypeSwitchNode) Handle(msg message.Message) error { return node.Handle(msg) } // not found - return fmt.Errorf("%s no label to handle message", n.Name()) + return nil } diff --git a/pkg/rule_engine/nodes/filter_script_node.go b/pkg/rule_engine/nodes/filter_script_node.go index 6beeaa7..c9a1e68 100644 --- a/pkg/rule_engine/nodes/filter_script_node.go +++ b/pkg/rule_engine/nodes/filter_script_node.go @@ -2,6 +2,7 @@ package nodes import ( "github.com/sirupsen/logrus" + "log" "pandax/pkg/rule_engine/message" ) @@ -9,7 +10,7 @@ const ScriptFilterNodeName = "ScriptFilterNode" type scriptFilterNode struct { bareNode - Scripts string `json:"scripts" yaml:"scripts"` + Script string `json:"script" yaml:"script"` } type scriptFilterNodeFactory struct{} @@ -30,9 +31,14 @@ func (n *scriptFilterNode) Handle(msg message.Message) error { trueLabelNode := n.GetLinkedNode("True") falseLabelNode := n.GetLinkedNode("False") scriptEngine := NewScriptEngine() - isTrue, error := scriptEngine.ScriptOnFilter(msg, n.Scripts) - if isTrue == true && error == nil { + isTrue, error := scriptEngine.ScriptOnFilter(msg, n.Script) + log.Println(isTrue) + if isTrue == true && error == nil && trueLabelNode != nil { return trueLabelNode.Handle(msg) + } else { + if falseLabelNode != nil { + return falseLabelNode.Handle(msg) + } } - return falseLabelNode.Handle(msg) + return nil } diff --git a/pkg/rule_engine/nodes/filter_switch_node.go b/pkg/rule_engine/nodes/filter_switch_node.go index a3ceee7..0edb697 100644 --- a/pkg/rule_engine/nodes/filter_switch_node.go +++ b/pkg/rule_engine/nodes/filter_switch_node.go @@ -1,24 +1,14 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); you may -// not use p file except in compliance with the License. You may obtain -// a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -// License for the specific language governing permissions and limitations -// under the License. package nodes import ( "github.com/sirupsen/logrus" + "log" "pandax/pkg/rule_engine/message" ) type switchFilterNode struct { bareNode - Scripts string `json:"scripts" yaml:"scripts"` + Script string `json:"script" yaml:"script"` } type switchFilterNodeFactory struct{} @@ -27,8 +17,13 @@ func (f switchFilterNodeFactory) Name() string { return "SwitchNode" } func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } func (f switchFilterNodeFactory) Labels() []string { return []string{ - "Failure", "True", "False", message.MessageTypePostTelemetryRequest, - message.MessageTypeConnectEvent, + "Failure", "True", "False", + message.EventAttributesType, + message.EventAlarmType, + message.EventTelemetryType, + message.EventUpEventType, + message.EventConnectType, + message.EventDisConnectType, } } func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { @@ -42,7 +37,8 @@ func (n *switchFilterNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) scriptEngine := NewScriptEngine() - SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts) + SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Script) + log.Println("开始执行switchFilterNode", SwitchResults) if err != nil { return nil } @@ -50,7 +46,7 @@ func (n *switchFilterNode) Handle(msg message.Message) error { for label, node := range nodes { for _, switchresult := range SwitchResults { if label == switchresult { - return node.Handle(msg) + node.Handle(msg) } } } diff --git a/pkg/rule_engine/nodes/node.go b/pkg/rule_engine/nodes/node.go index 7a875d2..5652ef5 100644 --- a/pkg/rule_engine/nodes/node.go +++ b/pkg/rule_engine/nodes/node.go @@ -2,7 +2,9 @@ package nodes import ( "errors" + "pandax/pkg/rule_engine/manifest" "pandax/pkg/rule_engine/message" + "strings" "github.com/sirupsen/logrus" ) @@ -63,3 +65,46 @@ func decodePath(meta Metadata, n Node) (Node, error) { } return n, nil } + +func GetNodes(m *manifest.Manifest) (map[string]Node, error) { + nodes := make(map[string]Node) + // Create All nodes + for _, n := range m.Nodes { + metadata := NewMetadataWithValues(n.Properties) + node, err := NewNode(n.Type, n.Id, metadata) + if err != nil { + continue + } + if _, found := nodes[n.Id]; found { + logrus.Errorf("node '%s' already exist in rulechain", n.Id) + continue + } + nodes[n.Id] = node + } + for _, edge := range m.Edges { + originalNode, found := nodes[edge.SourceNodeId] + if !found { + logrus.Errorf("original node '%s' no exist in", originalNode.Name()) + continue + } + targetNode, found := nodes[edge.TargetNodeId] + if !found { + logrus.Errorf("target node '%s' no exist in rulechain", targetNode.Name()) + continue + } + //可以有多个类型 + //可以有多个类型 + types := make([]string, 0) + if _, ok := edge.Properties["type"]; !ok { + types = append(types, "True") + } else { + types = strings.Split(edge.Properties["type"].(string), "/") + } + for _, ty := range types { + originalNode.AddLinkedNode(ty, targetNode) + } + + } + + return nodes, nil +} diff --git a/pkg/websocket/socket_server_pool.go b/pkg/websocket/socket_server_pool.go index 8ababe3..11f0aae 100644 --- a/pkg/websocket/socket_server_pool.go +++ b/pkg/websocket/socket_server_pool.go @@ -2,6 +2,7 @@ package websocket import ( "github.com/gorilla/websocket" + "log" "pandax/pkg/global" ) @@ -32,6 +33,7 @@ func RemoveWebSocket(screenId string) bool { func SendMessage(message, screenId string) { ws := GetWebSocketByScreenId(screenId) if ws != nil { + log.Println("发送消息", message) ws.Conn.WriteMessage(websocket.TextMessage, []byte(message)) } }