规则链

This commit is contained in:
XM-GO
2023-04-17 17:15:36 +08:00
parent 8b6f18ef6b
commit 9a7bab6cf3
10 changed files with 101 additions and 89 deletions

View File

@@ -2,12 +2,10 @@ package rule_engine
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/manifest"
"pandax/pkg/rule_engine/message"
"pandax/pkg/rule_engine/nodes"
"strings"
)
// ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain,
@@ -31,56 +29,15 @@ func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
// newWithManifest create rule chain by user's manifest file
func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) {
errs := make([]error, 0)
nodes, err := nodes.GetNodes(m)
if err != nil {
errs = append(errs, err)
return nil, errs
}
r := &ruleChainInstance{
firstRuleNodeId: m.FirstRuleNodeId,
nodes: make(map[string]nodes.Node),
nodes: nodes,
}
// Create All nodes
for _, n := range m.Nodes {
metadata := nodes.NewMetadataWithValues(n.Properties)
node, err := nodes.NewNode(n.Type, n.Id, metadata)
if err != nil {
errs = append(errs, err)
continue
}
if _, found := r.nodes[n.Id]; found {
err := fmt.Errorf("node '%s' already exist in rulechain", n.Id)
errs = append(errs, err)
continue
}
r.nodes[n.Id] = node
}
for _, edge := range m.Edges {
originalNode, found := r.nodes[edge.SourceNodeId]
if !found {
err := fmt.Errorf("original node '%s' no exist in", originalNode.Name())
errs = append(errs, err)
continue
}
targetNode, found := r.nodes[edge.TargetNodeId]
if !found {
err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name())
errs = append(errs, err)
continue
}
//可以有多个类型
split := strings.Split(edge.Properties["type"].(string), "/")
for _, ty := range split {
originalNode.AddLinkedNode(ty, targetNode)
}
}
for name, node := range r.nodes {
targetNodes := node.GetLinkedNodes()
mustLabels := node.MustLabels()
for _, label := range mustLabels {
if _, found := targetNodes[label]; !found {
err := fmt.Errorf("the label '%s' in node '%s' no exist'", label, name)
errs = append(errs, err)
continue
}
}
}
return r, errs
}

View File

@@ -31,12 +31,12 @@ type Metadata interface {
// Predefined message types
const (
MessageTypePostAttributesRequest = "Post attributes"
MessageTypePostTelemetryRequest = "Post telemetry"
MessageTypeActivityEvent = "Activity event"
MessageTypeInactivityEvent = "Inactivity event"
MessageTypeConnectEvent = "Connect event"
MessageTypeDisconnectEvent = "Disconnect event"
EventConnectType = "connect"
EventDisConnectType = "disconnect"
EventUpEventType = "event"
EventAlarmType = "alarm"
EventTelemetryType = "telemetry"
EventAttributesType = "attributes"
)
// NewMessage ...

View File

@@ -11,8 +11,8 @@ const (
GATEWAY = "GATEWAY"
)
// 检查关联关系
// 该消息来自与哪个实体或到那个实体
//检查关联关系
//该消息来自与哪个实体或到那个实体
type deviceTypeSwitchNode struct {
bareNode
}

View File

@@ -12,7 +12,7 @@ type messageTypeFilterNode struct {
type messageTypeFilterNodeFactory struct{}
func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeFilterNode" }
func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeNode" }
func (f messageTypeFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f messageTypeFilterNodeFactory) Labels() []string { return []string{"True", "False"} }
@@ -32,9 +32,12 @@ func (n *messageTypeFilterNode) Handle(msg message.Message) error {
messageType := msg.GetType()
for _, filterType := range n.MessageTypes {
if filterType == messageType {
if filterType == messageType && trueLabelNode != nil {
return trueLabelNode.Handle(msg)
}
}
return falseLabelNode.Handle(msg)
if falseLabelNode != nil {
return falseLabelNode.Handle(msg)
}
return nil
}

View File

@@ -1,10 +1,9 @@
package nodes
import (
"fmt"
"pandax/pkg/rule_engine/message"
"github.com/sirupsen/logrus"
"log"
"pandax/pkg/rule_engine/message"
)
type messageTypeSwitchNode struct {
@@ -16,10 +15,12 @@ func (f messageTypeSwitchNodeFactory) Name() string { return "MessageTypeSwi
func (f messageTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f messageTypeSwitchNodeFactory) Labels() []string {
return []string{
message.MessageTypePostAttributesRequest,
message.MessageTypePostTelemetryRequest,
message.MessageTypeConnectEvent,
message.MessageTypeDisconnectEvent,
message.EventAttributesType,
message.EventAlarmType,
message.EventTelemetryType,
message.EventUpEventType,
message.EventConnectType,
message.EventDisConnectType,
"Other",
}
}
@@ -35,7 +36,7 @@ func (n *messageTypeSwitchNode) Handle(msg message.Message) error {
nodes := n.GetLinkedNodes()
messageType := msg.GetType()
log.Println("开始执行messageTypeSwitchNode")
for label, node := range nodes {
if messageType == label {
return node.Handle(msg)
@@ -46,5 +47,5 @@ func (n *messageTypeSwitchNode) Handle(msg message.Message) error {
return node.Handle(msg)
}
// not found
return fmt.Errorf("%s no label to handle message", n.Name())
return nil
}

View File

@@ -2,6 +2,7 @@ package nodes
import (
"github.com/sirupsen/logrus"
"log"
"pandax/pkg/rule_engine/message"
)
@@ -9,7 +10,7 @@ const ScriptFilterNodeName = "ScriptFilterNode"
type scriptFilterNode struct {
bareNode
Scripts string `json:"scripts" yaml:"scripts"`
Script string `json:"script" yaml:"script"`
}
type scriptFilterNodeFactory struct{}
@@ -30,9 +31,14 @@ func (n *scriptFilterNode) Handle(msg message.Message) error {
trueLabelNode := n.GetLinkedNode("True")
falseLabelNode := n.GetLinkedNode("False")
scriptEngine := NewScriptEngine()
isTrue, error := scriptEngine.ScriptOnFilter(msg, n.Scripts)
if isTrue == true && error == nil {
isTrue, error := scriptEngine.ScriptOnFilter(msg, n.Script)
log.Println(isTrue)
if isTrue == true && error == nil && trueLabelNode != nil {
return trueLabelNode.Handle(msg)
} else {
if falseLabelNode != nil {
return falseLabelNode.Handle(msg)
}
}
return falseLabelNode.Handle(msg)
return nil
}

View File

@@ -1,24 +1,14 @@
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use p file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package nodes
import (
"github.com/sirupsen/logrus"
"log"
"pandax/pkg/rule_engine/message"
)
type switchFilterNode struct {
bareNode
Scripts string `json:"scripts" yaml:"scripts"`
Script string `json:"script" yaml:"script"`
}
type switchFilterNodeFactory struct{}
@@ -27,8 +17,13 @@ func (f switchFilterNodeFactory) Name() string { return "SwitchNode" }
func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f switchFilterNodeFactory) Labels() []string {
return []string{
"Failure", "True", "False", message.MessageTypePostTelemetryRequest,
message.MessageTypeConnectEvent,
"Failure", "True", "False",
message.EventAttributesType,
message.EventAlarmType,
message.EventTelemetryType,
message.EventUpEventType,
message.EventConnectType,
message.EventDisConnectType,
}
}
func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
@@ -42,7 +37,8 @@ func (n *switchFilterNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
scriptEngine := NewScriptEngine()
SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts)
SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Script)
log.Println("开始执行switchFilterNode", SwitchResults)
if err != nil {
return nil
}
@@ -50,7 +46,7 @@ func (n *switchFilterNode) Handle(msg message.Message) error {
for label, node := range nodes {
for _, switchresult := range SwitchResults {
if label == switchresult {
return node.Handle(msg)
node.Handle(msg)
}
}
}

View File

@@ -2,7 +2,9 @@ package nodes
import (
"errors"
"pandax/pkg/rule_engine/manifest"
"pandax/pkg/rule_engine/message"
"strings"
"github.com/sirupsen/logrus"
)
@@ -63,3 +65,46 @@ func decodePath(meta Metadata, n Node) (Node, error) {
}
return n, nil
}
func GetNodes(m *manifest.Manifest) (map[string]Node, error) {
nodes := make(map[string]Node)
// Create All nodes
for _, n := range m.Nodes {
metadata := NewMetadataWithValues(n.Properties)
node, err := NewNode(n.Type, n.Id, metadata)
if err != nil {
continue
}
if _, found := nodes[n.Id]; found {
logrus.Errorf("node '%s' already exist in rulechain", n.Id)
continue
}
nodes[n.Id] = node
}
for _, edge := range m.Edges {
originalNode, found := nodes[edge.SourceNodeId]
if !found {
logrus.Errorf("original node '%s' no exist in", originalNode.Name())
continue
}
targetNode, found := nodes[edge.TargetNodeId]
if !found {
logrus.Errorf("target node '%s' no exist in rulechain", targetNode.Name())
continue
}
//可以有多个类型
//可以有多个类型
types := make([]string, 0)
if _, ok := edge.Properties["type"]; !ok {
types = append(types, "True")
} else {
types = strings.Split(edge.Properties["type"].(string), "/")
}
for _, ty := range types {
originalNode.AddLinkedNode(ty, targetNode)
}
}
return nodes, nil
}