mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-29 09:21:26 +08:00
规则引擎
This commit is contained in:
@@ -15,8 +15,8 @@ type ruleChainInstance struct {
|
||||
nodes map[string]nodes.Node
|
||||
}
|
||||
|
||||
func newRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
|
||||
errors := []error{}
|
||||
func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
|
||||
errors := make([]error, 0)
|
||||
|
||||
manifest, err := manifest.New(data)
|
||||
if err != nil {
|
||||
@@ -29,7 +29,7 @@ func newRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
|
||||
|
||||
// newWithManifest create rule chain by user's manifest file
|
||||
func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) {
|
||||
errs := []error{}
|
||||
errs := make([]error, 0)
|
||||
r := &ruleChainInstance{
|
||||
firstRuleNodeId: m.FirstRuleNodeId,
|
||||
nodes: make(map[string]nodes.Node),
|
||||
@@ -49,7 +49,6 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error)
|
||||
}
|
||||
r.nodes[n.Id] = node
|
||||
}
|
||||
|
||||
for _, edge := range m.Edges {
|
||||
originalNode, found := r.nodes[edge.SourceNodeId]
|
||||
if !found {
|
||||
@@ -63,7 +62,7 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error)
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
originalNode.AddLinkedNode(edge.Type, targetNode)
|
||||
originalNode.AddLinkedNode(edge.Properties["type"].(string), targetNode)
|
||||
}
|
||||
for name, node := range r.nodes {
|
||||
targetNodes := node.GetLinkedNodes()
|
||||
|
||||
17
pkg/rule_engine/instance_test.go
Normal file
17
pkg/rule_engine/instance_test.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package rule_engine
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestNewRuleChainInstance(t *testing.T) {
|
||||
buf, err := ioutil.ReadFile("./manifest/manifest_sample.json")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
_, errs := NewRuleChainInstance(buf)
|
||||
if len(errs) > 0 {
|
||||
t.Error(errs[0])
|
||||
}
|
||||
}
|
||||
@@ -12,9 +12,10 @@ type Node struct {
|
||||
}
|
||||
|
||||
type Edge struct {
|
||||
SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"`
|
||||
TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"`
|
||||
Type string `json:"type" yaml:"type"` //success or fail
|
||||
SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"`
|
||||
TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"`
|
||||
Type string `json:"type" yaml:"type"` //success or fail
|
||||
Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode
|
||||
}
|
||||
|
||||
type Manifest struct {
|
||||
@@ -26,12 +27,13 @@ type Manifest struct {
|
||||
func New(data []byte) (*Manifest, error) {
|
||||
firstRuleNodeId := ""
|
||||
manifest := make(map[string]interface{})
|
||||
if err := json.Unmarshal(data, manifest); err != nil {
|
||||
if err := json.Unmarshal(data, &manifest); err != nil {
|
||||
logrus.WithError(err).Errorf("invalid node chain manifest file")
|
||||
return nil, err
|
||||
}
|
||||
nodes := make([]Node, 0)
|
||||
for _, node := range manifest["nodes"].([]map[string]interface{}) {
|
||||
for _, mn := range manifest["nodes"].([]interface{}) {
|
||||
node := mn.(map[string]interface{})
|
||||
if node["type"].(string) == "InputNode" {
|
||||
firstRuleNodeId = node["id"].(string)
|
||||
}
|
||||
@@ -42,9 +44,11 @@ func New(data []byte) (*Manifest, error) {
|
||||
})
|
||||
}
|
||||
edges := make([]Edge, 0)
|
||||
for _, edge := range manifest["edges"].([]map[string]interface{}) {
|
||||
for _, en := range manifest["edges"].([]interface{}) {
|
||||
edge := en.(map[string]interface{})
|
||||
edges = append(edges, Edge{
|
||||
Type: edge["type"].(string),
|
||||
Properties: edge["properties"].(map[string]interface{}),
|
||||
SourceNodeId: edge["sourceNodeId"].(string),
|
||||
TargetNodeId: edge["targetNodeId"].(string),
|
||||
})
|
||||
|
||||
@@ -7,8 +7,7 @@
|
||||
"y": 280,
|
||||
"properties": {
|
||||
"icon": "/src/assets/icon_module/svg/start.svg",
|
||||
"debugMode": false,
|
||||
"status": false
|
||||
"debugMode": false
|
||||
},
|
||||
"zIndex": 1002,
|
||||
"text": {
|
||||
@@ -24,8 +23,7 @@
|
||||
"y": 160,
|
||||
"properties": {
|
||||
"icon": "/src/assets/icon_module/svg/function.svg",
|
||||
"debugMode": false,
|
||||
"status": false
|
||||
"debugMode": false
|
||||
},
|
||||
"zIndex": 1004,
|
||||
"text": {
|
||||
@@ -42,7 +40,7 @@
|
||||
"properties": {
|
||||
"icon": "/src/assets/icon_module/svg/switch.svg",
|
||||
"debugMode": false,
|
||||
"status": false
|
||||
"scripts": "return {\n msg: msg,\n metadata: metadata,\n msgType: msgType\n};"
|
||||
},
|
||||
"zIndex": 1006,
|
||||
"text": {
|
||||
@@ -66,7 +64,9 @@
|
||||
"x": 540,
|
||||
"y": 160
|
||||
},
|
||||
"properties": {},
|
||||
"properties": {
|
||||
"type": "Success"
|
||||
},
|
||||
"zIndex": 1007,
|
||||
"pointsList": [
|
||||
{
|
||||
@@ -100,7 +100,9 @@
|
||||
"x": 540,
|
||||
"y": 460
|
||||
},
|
||||
"properties": {},
|
||||
"properties": {
|
||||
"type": "Failure"
|
||||
},
|
||||
"zIndex": 1008,
|
||||
"pointsList": [
|
||||
{
|
||||
|
||||
@@ -1,17 +1,28 @@
|
||||
package message
|
||||
|
||||
import "github.com/sirupsen/logrus"
|
||||
|
||||
// Message ...
|
||||
type Message interface {
|
||||
GetOriginator() string
|
||||
GetType() string
|
||||
GetPayload() []byte
|
||||
GetData() []byte
|
||||
GetMetadata() Metadata
|
||||
SetType(string)
|
||||
SetPayload([]byte)
|
||||
SetData([]byte)
|
||||
SetOriginator(string)
|
||||
SetMetadata(Metadata)
|
||||
MarshalBinary() ([]byte, error)
|
||||
UnmarshalBinary(b []byte) error
|
||||
}
|
||||
|
||||
// Metadata ...
|
||||
type Metadata interface {
|
||||
Keys() []string
|
||||
GetKeyValue(key string) interface{}
|
||||
SetKeyValue(key string, val interface{})
|
||||
}
|
||||
|
||||
// Predefined message types
|
||||
const (
|
||||
MessageTypePostAttributesRequest = "Post attributes"
|
||||
@@ -25,31 +36,75 @@ const (
|
||||
// NewMessage ...
|
||||
func NewMessage() Message {
|
||||
return &defaultMessage{
|
||||
payload: []byte{},
|
||||
data: []byte{},
|
||||
}
|
||||
}
|
||||
|
||||
type defaultMessage struct {
|
||||
originator string //数据发布者
|
||||
messageType string //数据类型,数据来源
|
||||
payload []byte //二进制数据
|
||||
id string //uuid
|
||||
ts int64 //时间戳
|
||||
msgType string //消息类型,数据来源
|
||||
originator string //数据发布者
|
||||
customerId string //客户Id UUID
|
||||
entityId string //实体Id UUID
|
||||
data []byte //数据
|
||||
dataType string //数据类型 JSON
|
||||
metadata Metadata //数据的元数据
|
||||
}
|
||||
|
||||
// NewMessageWithDetail ...
|
||||
func NewMessageWithDetail(originator string, messageType string, payload []byte) Message {
|
||||
func NewMessageWithDetail(originator string, messageType string, msg []byte) Message {
|
||||
return &defaultMessage{
|
||||
originator: originator,
|
||||
messageType: messageType,
|
||||
payload: payload,
|
||||
originator: originator,
|
||||
msgType: messageType,
|
||||
data: msg,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *defaultMessage) GetOriginator() string { return t.originator }
|
||||
func (t *defaultMessage) GetType() string { return t.messageType }
|
||||
func (t *defaultMessage) GetPayload() []byte { return t.payload }
|
||||
func (t *defaultMessage) SetType(messageType string) { t.messageType = messageType }
|
||||
func (t *defaultMessage) SetPayload(payload []byte) { t.payload = payload }
|
||||
func (t *defaultMessage) GetType() string { return t.msgType }
|
||||
func (t *defaultMessage) GetData() []byte { return t.data }
|
||||
func (t *defaultMessage) GetMetadata() Metadata { return t.metadata }
|
||||
func (t *defaultMessage) SetType(msgType string) { t.msgType = msgType }
|
||||
func (t *defaultMessage) SetData(data []byte) { t.data = data }
|
||||
func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator }
|
||||
func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata }
|
||||
|
||||
func (t *defaultMessage) MarshalBinary() ([]byte, error) { return nil, nil }
|
||||
func (t *defaultMessage) UnmarshalBinary(b []byte) error { return nil }
|
||||
|
||||
// NewMetadata ...
|
||||
func NewMetadata() Metadata {
|
||||
return &defaultMetadata{
|
||||
values: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
type defaultMetadata struct {
|
||||
values map[string]interface{}
|
||||
}
|
||||
|
||||
func newDefaultMetadata(vals map[string]interface{}) Metadata {
|
||||
return &defaultMetadata{
|
||||
values: vals,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *defaultMetadata) Keys() []string {
|
||||
keys := make([]string, 0)
|
||||
for key := range t.values {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (t *defaultMetadata) GetKeyValue(key string) interface{} {
|
||||
if _, found := t.values[key]; !found {
|
||||
logrus.Fatalf("no key '%s' in metadata", key)
|
||||
}
|
||||
return t.values[key]
|
||||
}
|
||||
|
||||
func (t *defaultMetadata) SetKeyValue(key string, val interface{}) {
|
||||
t.values[key] = val
|
||||
}
|
||||
|
||||
13
pkg/rule_engine/nodes/action_assign_to_customer_node.go
Normal file
13
pkg/rule_engine/nodes/action_assign_to_customer_node.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package nodes
|
||||
|
||||
type assignToCustomerNode struct {
|
||||
bareNode
|
||||
}
|
||||
|
||||
type assignToCustomerFactory struct{}
|
||||
|
||||
func (f assignToCustomerFactory) Name() string { return "AssignCustomerFactoryNode" }
|
||||
func (f assignToCustomerFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f assignToCustomerFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
37
pkg/rule_engine/nodes/action_clear_alarm_node.go
Normal file
37
pkg/rule_engine/nodes/action_clear_alarm_node.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"pandax/pkg/rule_engine/message"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const ClearAlarmNodeName = "ClearAlarmNode"
|
||||
|
||||
type clearAlarmNodeFactory struct{}
|
||||
|
||||
type clearAlarmNode struct {
|
||||
bareNode
|
||||
DetailBuilderScript string `json:"detailBuilderScript" yaml:"detailBuilderScript"`
|
||||
AlarmType string `json:"alarmType" yaml:"alarmType"`
|
||||
AlarmSeverity string `json:"alarmSeverity" yaml:"alarmSeverity"`
|
||||
Propagate string `json:"propagate" yaml:"propagate"`
|
||||
AlarmStartTime *time.Time `json:"alarmStartTime" yaml:"alarmStartTime"`
|
||||
AlarmEndTime *time.Time `json:"alarmEndTime" yaml:"alarmEndTime"`
|
||||
}
|
||||
|
||||
func (f clearAlarmNodeFactory) Name() string { return ClearAlarmNodeName }
|
||||
func (f clearAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Created", "Updated", "Failure"}
|
||||
node := &clearAlarmNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *clearAlarmNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
return nil
|
||||
}
|
||||
43
pkg/rule_engine/nodes/action_create_alarm_node.go
Normal file
43
pkg/rule_engine/nodes/action_create_alarm_node.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type createAlarmNode struct {
|
||||
bareNode
|
||||
DetailBuilderScript string `json:"detailBuilderScript" yaml:"detailBuilderScript"`
|
||||
AlarmType string `json:"alarmType" yaml:"alarmType"`
|
||||
AlarmSeverity string `json:"alarmSeverity" yaml:"alarmSeverity"`
|
||||
Propagate string `json:"propagate" yaml:"propagate"`
|
||||
AlarmStartTime string `json:"alarmStartTime" yaml:"alarmStartTime"`
|
||||
AlarmEndTime string `json:"alarmEndTime" yaml:"alarmEndTime"`
|
||||
}
|
||||
|
||||
type createAlarmNodeFactory struct{}
|
||||
|
||||
func (f createAlarmNodeFactory) Name() string { return "CreateAlarmNode" }
|
||||
func (f createAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Created", "Updated", "Failure"}
|
||||
node := &createAlarmNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *createAlarmNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
node1 := n.GetLinkedNode("Created")
|
||||
node2 := n.GetLinkedNode("Updated")
|
||||
node3 := n.GetLinkedNode("Failure")
|
||||
if node1 == nil || node2 == nil || node3 == nil {
|
||||
return fmt.Errorf("no valid label linked node in %s", n.Name())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
34
pkg/rule_engine/nodes/action_create_relation_node.go
Normal file
34
pkg/rule_engine/nodes/action_create_relation_node.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type createRelationNode struct {
|
||||
bareNode
|
||||
Direction string
|
||||
RelationType string
|
||||
EntityType string
|
||||
EntityNamePattern string
|
||||
EntityTypePattern string
|
||||
EntityCacheExpiration int64
|
||||
CreateEntityIfNotExists bool
|
||||
ChangeOriginatorToRelatedEntity bool
|
||||
RemoveCurrentRelations bool
|
||||
}
|
||||
|
||||
type createRelationNodeFactory struct{}
|
||||
|
||||
func (f createRelationNodeFactory) Name() string { return "CreateRelationNode" }
|
||||
func (f createRelationNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f createRelationNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
node := &createRelationNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *createRelationNode) Handle(msg message.Message) error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,3 +1,14 @@
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
// not use p file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package nodes
|
||||
|
||||
import (
|
||||
|
||||
9
pkg/rule_engine/nodes/action_delete_relation_node.go
Normal file
9
pkg/rule_engine/nodes/action_delete_relation_node.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package nodes
|
||||
|
||||
type deleteRelationNodeFactory struct{}
|
||||
|
||||
func (f deleteRelationNodeFactory) Name() string { return "DeleteRelationNode" }
|
||||
func (f deleteRelationNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f deleteRelationNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
38
pkg/rule_engine/nodes/action_generator_node.go
Normal file
38
pkg/rule_engine/nodes/action_generator_node.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type messageGeneratorNode struct {
|
||||
bareNode
|
||||
DetailBuilderScript string `json:"detail_builder_script" yaml:"detail_builder_script"`
|
||||
FrequenceInSecond int32 `json:"frequency" yaml:"frequency"`
|
||||
}
|
||||
|
||||
type messageGeneratorNodeFactory struct{}
|
||||
|
||||
func (f messageGeneratorNodeFactory) Name() string { return "MessageGeneratorNode" }
|
||||
func (f messageGeneratorNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
|
||||
func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Created", "Updated"}
|
||||
node := &messageGeneratorNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *messageGeneratorNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
createdLabelNode := n.GetLinkedNode("Created")
|
||||
updatedLabelNode := n.GetLinkedNode("Updated")
|
||||
if createdLabelNode == nil || updatedLabelNode == nil {
|
||||
return fmt.Errorf("no valid label linked node in %s", n.Name())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
41
pkg/rule_engine/nodes/action_log_node.go
Normal file
41
pkg/rule_engine/nodes/action_log_node.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type logNode struct {
|
||||
bareNode
|
||||
Script string
|
||||
}
|
||||
|
||||
type logNodeFactory struct{}
|
||||
|
||||
func (f logNodeFactory) Name() string { return "LogNode" }
|
||||
func (f logNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f logNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
node := &logNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *logNode) Handle(msg message.Message) error {
|
||||
successLableNode := n.GetLinkedNode("Success")
|
||||
failureLableNode := n.GetLinkedNode("Failure")
|
||||
|
||||
scriptEngine := NewScriptEngine()
|
||||
logMessage, err := scriptEngine.ScriptToString(msg, n.Script)
|
||||
|
||||
if successLableNode == nil || failureLableNode == nil {
|
||||
return fmt.Errorf("no valid label linked node in %s", n.Name())
|
||||
}
|
||||
if err != nil {
|
||||
return failureLableNode.Handle(msg)
|
||||
}
|
||||
log.Println(logMessage)
|
||||
return successLableNode.Handle(msg)
|
||||
}
|
||||
9
pkg/rule_engine/nodes/action_rpc_call_reply_node.go
Normal file
9
pkg/rule_engine/nodes/action_rpc_call_reply_node.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package nodes
|
||||
|
||||
type rpcCallReplyNodeFactory struct{}
|
||||
|
||||
func (f rpcCallReplyNodeFactory) Name() string { return "RPCCallReplyNode" }
|
||||
func (f rpcCallReplyNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f rpcCallReplyNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
14
pkg/rule_engine/nodes/action_rpc_call_request_node.go
Normal file
14
pkg/rule_engine/nodes/action_rpc_call_request_node.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package nodes
|
||||
|
||||
type rPCCallRequestNode struct {
|
||||
bareNode
|
||||
TimeoutInSeconds int
|
||||
}
|
||||
|
||||
type rpcCallRequestNodeFactory struct{}
|
||||
|
||||
func (f rpcCallRequestNodeFactory) Name() string { return "RPCCallRequestNode" }
|
||||
func (f rpcCallRequestNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f rpcCallRequestNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
35
pkg/rule_engine/nodes/action_save_attributes_node.go
Normal file
35
pkg/rule_engine/nodes/action_save_attributes_node.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type SaveAttributesNode struct {
|
||||
bareNode
|
||||
}
|
||||
|
||||
type saveAttributesNodeFactory struct{}
|
||||
|
||||
func (f saveAttributesNodeFactory) Name() string { return "SaveAttributesNode" }
|
||||
func (f saveAttributesNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
node := &SaveAttributesNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *SaveAttributesNode) Handle(msg message.Message) error {
|
||||
successLableNode := n.GetLinkedNode("Success")
|
||||
failureLableNode := n.GetLinkedNode("Failure")
|
||||
if successLableNode == nil || failureLableNode == nil {
|
||||
return fmt.Errorf("no valid label linked node in %s", n.Name())
|
||||
}
|
||||
if msg.GetType() != "POST_ATTRIBUTES_REQUEST" {
|
||||
return failureLableNode.Handle(msg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
9
pkg/rule_engine/nodes/action_save_timeseries_node.go
Normal file
9
pkg/rule_engine/nodes/action_save_timeseries_node.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package nodes
|
||||
|
||||
type saveTimeSeriesNodeFactory struct{}
|
||||
|
||||
func (f saveTimeSeriesNodeFactory) Name() string { return "SaveTimeSeriesNode" }
|
||||
func (f saveTimeSeriesNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package nodes
|
||||
|
||||
type unassignFromCustomerNodeFactory struct{}
|
||||
|
||||
func (f unassignFromCustomerNodeFactory) Name() string { return "UnassignFromCustomerNode" }
|
||||
func (f unassignFromCustomerNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f unassignFromCustomerNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
23
pkg/rule_engine/nodes/enrichment_customer_attr_node.go
Normal file
23
pkg/rule_engine/nodes/enrichment_customer_attr_node.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package nodes
|
||||
|
||||
import "pandax/pkg/rule_engine/message"
|
||||
|
||||
type enrichmentCustomerNode struct {
|
||||
bareNode
|
||||
}
|
||||
|
||||
type enrichmentCustomerAttrNodeFactory struct{}
|
||||
|
||||
func (f enrichmentCustomerAttrNodeFactory) Name() string { return "EnrichmentCustomerNode" }
|
||||
func (f enrichmentCustomerAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
|
||||
func (f enrichmentCustomerAttrNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
node := &enrichmentCustomerNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *enrichmentCustomerNode) Handle(msg message.Message) error {
|
||||
return nil
|
||||
}
|
||||
9
pkg/rule_engine/nodes/enrichment_device_attr_node.go
Normal file
9
pkg/rule_engine/nodes/enrichment_device_attr_node.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package nodes
|
||||
|
||||
type enrichmentDeviceAttrNodeFactory struct{}
|
||||
|
||||
func (f enrichmentDeviceAttrNodeFactory) Name() string { return "EnrichmentDeviceAttrbute" }
|
||||
func (f enrichmentDeviceAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
|
||||
func (f enrichmentDeviceAttrNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
9
pkg/rule_engine/nodes/enrichment_originator_attr_node.go
Normal file
9
pkg/rule_engine/nodes/enrichment_originator_attr_node.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package nodes
|
||||
|
||||
type enrichmentOriginatorAttrNodeFactory struct{}
|
||||
|
||||
func (f enrichmentOriginatorAttrNodeFactory) Name() string { return "EnrichmentOriginatorAttribute" }
|
||||
func (f enrichmentOriginatorAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
|
||||
func (f enrichmentOriginatorAttrNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package nodes
|
||||
|
||||
type enrichmentOriginatorFieldsNodeFactory struct{}
|
||||
|
||||
func (f enrichmentOriginatorFieldsNodeFactory) Name() string { return "EnrichmentOriginatorFieldsNode" }
|
||||
func (f enrichmentOriginatorFieldsNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
|
||||
func (f enrichmentOriginatorFieldsNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package nodes
|
||||
|
||||
type enrichmentOriginatorTelemetryNodeFactory struct{}
|
||||
|
||||
func (f enrichmentOriginatorTelemetryNodeFactory) Name() string {
|
||||
return "EnrichmentOriginatorTelemetryNode"
|
||||
}
|
||||
func (f enrichmentOriginatorTelemetryNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
|
||||
func (f enrichmentOriginatorTelemetryNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return nil, nil
|
||||
}
|
||||
17
pkg/rule_engine/nodes/enrichment_tenant_attr.go
Normal file
17
pkg/rule_engine/nodes/enrichment_tenant_attr.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package nodes
|
||||
|
||||
type enrichmentTenantNode struct {
|
||||
bareNode
|
||||
}
|
||||
|
||||
type enrichmentTenantNodeFactory struct{}
|
||||
|
||||
func (f enrichmentTenantNodeFactory) Name() string { return "EnrichmentTenantNode" }
|
||||
func (f enrichmentTenantNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
|
||||
func (f enrichmentTenantNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
node := &enrichmentTenantNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
64
pkg/rule_engine/nodes/external_mqtt_node.go
Normal file
64
pkg/rule_engine/nodes/external_mqtt_node.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type externalMqttNode struct {
|
||||
bareNode
|
||||
TopicPattern string
|
||||
Host string
|
||||
Port string
|
||||
ConnectTimeoutSec int
|
||||
ClientId string
|
||||
CleanSession bool
|
||||
Ssl bool
|
||||
MqttCli mqtt.Client
|
||||
}
|
||||
|
||||
type externalMqttNodeFactory struct{}
|
||||
|
||||
func (f externalMqttNodeFactory) Name() string { return "ExternalMqttNode" }
|
||||
func (f externalMqttNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
|
||||
node := &externalMqttNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
broker := fmt.Sprintf("tcp://%s:%s", node.Host, node.Port)
|
||||
opts := mqtt.NewClientOptions().AddBroker(broker)
|
||||
opts.SetClientID(node.ClientId)
|
||||
opts.SetCleanSession(node.CleanSession)
|
||||
opts.SetConnectTimeout(time.Duration(node.ConnectTimeoutSec) * time.Second)
|
||||
node.MqttCli = mqtt.NewClient(opts)
|
||||
|
||||
if token := node.MqttCli.Connect(); token.Wait() && token.Error() != nil {
|
||||
logrus.WithError(token.Error())
|
||||
return nil, token.Error()
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *externalMqttNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
topic := n.TopicPattern //need fix add msg.metadata in it
|
||||
sendmqttmsg, err := msg.MarshalBinary()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
token := n.MqttCli.Publish(topic, 1, false, sendmqttmsg)
|
||||
if token.Wait() && token.Error() != nil {
|
||||
fmt.Println(token.Error())
|
||||
return failureLabelNode.Handle(msg)
|
||||
}
|
||||
return successLabelNode.Handle(msg)
|
||||
}
|
||||
26
pkg/rule_engine/nodes/external_restapi_node.go
Normal file
26
pkg/rule_engine/nodes/external_restapi_node.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package nodes
|
||||
|
||||
type externalRestapiNode struct {
|
||||
bareNode
|
||||
RestEndpointUrlPattern string
|
||||
RequestMethod string
|
||||
headers map[string]string
|
||||
UseSimpleClientHttpFactory bool
|
||||
ReadTimeoutMs int
|
||||
MaxParallelRequestsCount int
|
||||
UseRedisQueueForMsgPersistence bool
|
||||
trimQueue bool
|
||||
MaxQueueSize int
|
||||
}
|
||||
|
||||
type externalRestapiNodeFactory struct{}
|
||||
|
||||
func (f externalRestapiNodeFactory) Name() string { return "ExternalRestapiNode" }
|
||||
func (f externalRestapiNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"True", "False"}
|
||||
node := &externalRestapiNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
@@ -11,6 +11,7 @@ const (
|
||||
NODE_CATEGORY_TRANSFORM = "transform"
|
||||
NODE_CATEGORY_EXTERNAL = "external"
|
||||
NODE_CATEGORY_OTHERS = "others"
|
||||
NODE_CATEGORY_FLOWS = "flows"
|
||||
)
|
||||
|
||||
// Factory is node's factory to create node based on metadata
|
||||
|
||||
34
pkg/rule_engine/nodes/filter_check_existence_fields_node.go
Normal file
34
pkg/rule_engine/nodes/filter_check_existence_fields_node.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
//检查关联关系
|
||||
//该消息来自与哪个实体或到那个实体
|
||||
type checkExistenceFieldsNode struct {
|
||||
bareNode
|
||||
}
|
||||
|
||||
type checkExistenceFieldsNodeFactory struct{}
|
||||
|
||||
func (f checkExistenceFieldsNodeFactory) Name() string { return "CheckExistenceFieldsNode" }
|
||||
func (f checkExistenceFieldsNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f checkExistenceFieldsNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"True", "False"}
|
||||
node := &checkExistenceFieldsNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *checkExistenceFieldsNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
//trueLabelNode := n.GetLinkedNode("True")
|
||||
//falseLabelNode := n.GetLinkedNode("False")
|
||||
|
||||
return nil
|
||||
}
|
||||
67
pkg/rule_engine/nodes/filter_check_relation_node.go
Normal file
67
pkg/rule_engine/nodes/filter_check_relation_node.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
const (
|
||||
RelationTypeContains = "Contains"
|
||||
RelationTypeNotContains = "NotContains"
|
||||
)
|
||||
|
||||
//检查关联关系
|
||||
//该消息来自与哪个实体或到那个实体
|
||||
type checkRelationFilterNode struct {
|
||||
bareNode
|
||||
Direction string `json:"direction" yaml:"direction"`
|
||||
RelationType string `json:"relationType" yaml:"relationType"`
|
||||
InstanceType string `json:"instanceType" yaml:"instanceType"`
|
||||
Values []string `json:"values" yaml:"values"`
|
||||
}
|
||||
|
||||
type checkRelationFilterNodeFactory struct{}
|
||||
|
||||
func (f checkRelationFilterNodeFactory) Name() string { return "CheckRelationFilterNode" }
|
||||
func (f checkRelationFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f checkRelationFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"True", "False"}
|
||||
node := &checkRelationFilterNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
Values: []string{},
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *checkRelationFilterNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
trueLabelNode := n.GetLinkedNode("True")
|
||||
falseLabelNode := n.GetLinkedNode("False")
|
||||
|
||||
//direction := msg.GetDirection()
|
||||
attr := msg.GetMetadata().GetKeyValue(n.InstanceType)
|
||||
switch n.RelationType {
|
||||
case RelationTypeContains:
|
||||
for _, val := range n.Values {
|
||||
// specified attribute exist in names
|
||||
if attr == val {
|
||||
return trueLabelNode.Handle(msg)
|
||||
}
|
||||
}
|
||||
// not found
|
||||
return falseLabelNode.Handle(msg)
|
||||
|
||||
case RelationTypeNotContains:
|
||||
for _, val := range n.Values {
|
||||
// specified attribute exist in names
|
||||
if attr == val {
|
||||
return falseLabelNode.Handle(msg)
|
||||
}
|
||||
}
|
||||
// not found
|
||||
return trueLabelNode.Handle(msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
47
pkg/rule_engine/nodes/filter_message_type_node.go
Normal file
47
pkg/rule_engine/nodes/filter_message_type_node.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type messageTypeFilterNode struct {
|
||||
bareNode
|
||||
MessageTypes []string `json:"messageTypes" yaml:"messageTypes"`
|
||||
}
|
||||
|
||||
type messageTypeFilterNodeFactory struct{}
|
||||
|
||||
func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeFilterNode" }
|
||||
func (f messageTypeFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"True", "False"}
|
||||
node := &messageTypeFilterNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
MessageTypes: []string{},
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *messageTypeFilterNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
trueLabelNode := n.GetLinkedNode("True")
|
||||
falseLabelNode := n.GetLinkedNode("False")
|
||||
if trueLabelNode == nil || falseLabelNode == nil {
|
||||
return fmt.Errorf("no true or false label linked node in %s", n.Name())
|
||||
}
|
||||
messageType := msg.GetType()
|
||||
|
||||
// TODO: how to resolve user customized message type dynamically
|
||||
//userMessageType := msg.GetMetadata().GetKeyValue(n.Metadata().MessageTypeKey)
|
||||
userMessageType := "TODO"
|
||||
for _, filterType := range n.MessageTypes {
|
||||
if filterType == messageType || filterType == userMessageType {
|
||||
return trueLabelNode.Handle(msg)
|
||||
}
|
||||
}
|
||||
return falseLabelNode.Handle(msg)
|
||||
}
|
||||
46
pkg/rule_engine/nodes/filter_message_type_switch_node.go
Normal file
46
pkg/rule_engine/nodes/filter_message_type_switch_node.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type messageTypeSwitchNode struct {
|
||||
bareNode
|
||||
}
|
||||
type messageTypeSwitchNodeFactory struct{}
|
||||
|
||||
func (f messageTypeSwitchNodeFactory) Name() string { return "MessageTypeSwitchNode" }
|
||||
func (f messageTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"True", "False"}
|
||||
node := &messageTypeSwitchNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *messageTypeSwitchNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
nodes := n.GetLinkedNodes()
|
||||
messageType := msg.GetType()
|
||||
messageTypeKey, _ := n.Metadata().Value(NODE_CONFIG_MESSAGE_TYPE_KEY)
|
||||
userMessageType := msg.GetMetadata().GetKeyValue(messageTypeKey.(string))
|
||||
|
||||
for label, node := range nodes {
|
||||
if messageType == label || userMessageType == label {
|
||||
return node.Handle(msg)
|
||||
}
|
||||
}
|
||||
// if other label exist
|
||||
if node := n.GetLinkedNode("Other"); node != nil {
|
||||
return node.Handle(msg)
|
||||
}
|
||||
|
||||
// not found
|
||||
return fmt.Errorf("%s no label to handle message", n.Name())
|
||||
}
|
||||
43
pkg/rule_engine/nodes/filter_originator_type_node.go
Normal file
43
pkg/rule_engine/nodes/filter_originator_type_node.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type originatorTypeFilterNode struct {
|
||||
bareNode
|
||||
Filters []string `json:"filters" yaml:"filters"`
|
||||
}
|
||||
|
||||
type originatorFilterNodeFactory struct{}
|
||||
|
||||
func (f originatorFilterNodeFactory) Name() string { return "OriginatorFilterNode" }
|
||||
func (f originatorFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f originatorFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"True", "False"}
|
||||
node := &originatorTypeFilterNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
Filters: []string{},
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *originatorTypeFilterNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
trueLabelNode := n.GetLinkedNode("True")
|
||||
falseLabelNode := n.GetLinkedNode("False")
|
||||
|
||||
//links := n.GetLinks()
|
||||
originatorType := msg.GetOriginator()
|
||||
|
||||
for _, filter := range n.Filters {
|
||||
if originatorType == filter {
|
||||
return trueLabelNode.Handle(msg)
|
||||
}
|
||||
}
|
||||
// not found
|
||||
return falseLabelNode.Handle(msg)
|
||||
}
|
||||
39
pkg/rule_engine/nodes/filter_originator_type_switch_node.go
Normal file
39
pkg/rule_engine/nodes/filter_originator_type_switch_node.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type originatorTypeSwitchNode struct {
|
||||
bareNode
|
||||
}
|
||||
|
||||
type originatorTypeSwitchNodeFactory struct{}
|
||||
|
||||
func (f originatorTypeSwitchNodeFactory) Name() string { return "OriginatorTypeSwitchNode" }
|
||||
func (f originatorTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f originatorTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{}
|
||||
node := &originatorTypeSwitchNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *originatorTypeSwitchNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
nodes := n.GetLinkedNodes()
|
||||
originatorType := msg.GetOriginator()
|
||||
|
||||
for label, node := range nodes {
|
||||
if originatorType == label {
|
||||
return node.Handle(msg)
|
||||
}
|
||||
}
|
||||
// not found
|
||||
return fmt.Errorf("%s no label to handle message", n.Name())
|
||||
}
|
||||
39
pkg/rule_engine/nodes/filter_script_node.go
Normal file
39
pkg/rule_engine/nodes/filter_script_node.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
const ScriptFilterNodeName = "ScriptFilterNode"
|
||||
|
||||
type scriptFilterNode struct {
|
||||
bareNode
|
||||
Scripts string `json:"scripts" yaml:"scripts"`
|
||||
}
|
||||
|
||||
type scriptFilterNodeFactory struct{}
|
||||
|
||||
func (f scriptFilterNodeFactory) Name() string { return "ScriptFilterNode" }
|
||||
func (f scriptFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"True", "False"}
|
||||
node := &scriptFilterNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *scriptFilterNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
trueLabelNode := n.GetLinkedNode("True")
|
||||
falseLabelNode := n.GetLinkedNode("False")
|
||||
scriptEngine := NewScriptEngine()
|
||||
isTrue, error := scriptEngine.ScriptOnFilter(msg, n.Scripts)
|
||||
if isTrue == true && error == nil {
|
||||
return trueLabelNode.Handle(msg)
|
||||
}
|
||||
return falseLabelNode.Handle(msg)
|
||||
}
|
||||
@@ -1,3 +1,14 @@
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
// not use p file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package nodes
|
||||
|
||||
import (
|
||||
@@ -26,7 +37,7 @@ func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error)
|
||||
func (n *switchFilterNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
/*scriptEngine := NewScriptEngine()
|
||||
scriptEngine := NewScriptEngine()
|
||||
SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts)
|
||||
if err != nil {
|
||||
return nil
|
||||
@@ -38,6 +49,6 @@ func (n *switchFilterNode) Handle(msg message.Message) error {
|
||||
return node.Handle(msg)
|
||||
}
|
||||
}
|
||||
}*/
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3,4 +3,6 @@ package nodes
|
||||
// init register all node's factory
|
||||
func init() {
|
||||
RegisterFactory(inputNodeFactory{})
|
||||
RegisterFactory(switchFilterNodeFactory{})
|
||||
RegisterFactory(delayNodeFactory{})
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package nodes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/XM-GO/PandaKit/utils"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -58,5 +58,6 @@ func (c *nodeMetadata) With(key string, val interface{}) Metadata {
|
||||
}
|
||||
|
||||
func (c *nodeMetadata) DecodePath(rawVal interface{}) error {
|
||||
return utils.Map2Struct(c.keypairs, rawVal)
|
||||
//return utils.Map2Struct(c.keypairs, rawVal)
|
||||
return mapstructure.Decode(c.keypairs, rawVal)
|
||||
}
|
||||
|
||||
39
pkg/rule_engine/nodes/script_engine.go
Normal file
39
pkg/rule_engine/nodes/script_engine.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package nodes
|
||||
|
||||
import "pandax/pkg/rule_engine/message"
|
||||
|
||||
type ScriptEngine interface {
|
||||
ScriptOnMessage(msg message.Message, script string) (message.Message, error)
|
||||
//used by filter_switch_node
|
||||
ScriptOnSwitch(msg message.Message, script string) ([]string, error)
|
||||
//used by filter_script_node
|
||||
ScriptOnFilter(msg message.Message, script string) (bool, error)
|
||||
ScriptToString(msg message.Message, script string) (string, error)
|
||||
}
|
||||
|
||||
type baseScriptEngine struct {
|
||||
}
|
||||
|
||||
func NewScriptEngine() ScriptEngine {
|
||||
return &baseScriptEngine{}
|
||||
}
|
||||
|
||||
func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) (message.Message, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) ([]string, error) {
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) (bool, error) {
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (bse *baseScriptEngine) ScriptToString(msg message.Message, script string) (string, error) {
|
||||
|
||||
return "", nil
|
||||
}
|
||||
43
pkg/rule_engine/nodes/transform_change_originator_node.go
Normal file
43
pkg/rule_engine/nodes/transform_change_originator_node.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type transformChangeOriginatorNode struct {
|
||||
bareNode
|
||||
OriginatorSource string `json:"originatorSource" yaml:"originatorSource"`
|
||||
Direction string `json:"direction" yaml:"direction"`
|
||||
MaxRelationLevel int `json:"maxRelationLevel" yaml:"maxRelationLevel"`
|
||||
//RelationFilters []runtime.RelationFilter `json:"relationFilters" yaml:"relationFilters"`
|
||||
}
|
||||
|
||||
type transformChangeOriginatorNodeFactory struct{}
|
||||
|
||||
func (f transformChangeOriginatorNodeFactory) Name() string { return "TransformChangeOriginatorNode" }
|
||||
func (f transformChangeOriginatorNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
|
||||
|
||||
func (f transformChangeOriginatorNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
node := &transformChangeOriginatorNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
//RelationFilters: []runtime.RelationFilter{},
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *transformChangeOriginatorNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
//successLabelNode := n.GetLinkedNode("Sucess")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
//relationQuery := runtime.NewRelationQuery()
|
||||
|
||||
//entities := relationQuery.QueryEntities(n.Direction, n.MaxRelationLevel, n.RelationFilters)
|
||||
/*if len(entities) > 0 && entities[0] == msg.GetOriginator() {
|
||||
msg.SetOriginator(entities[0])
|
||||
return successLabelNode.Handle(msg)
|
||||
}*/
|
||||
return failureLabelNode.Handle(msg)
|
||||
}
|
||||
38
pkg/rule_engine/nodes/transform_script_node.go
Normal file
38
pkg/rule_engine/nodes/transform_script_node.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type transformScriptNode struct {
|
||||
bareNode
|
||||
Script string `json:"script" yaml:"script"`
|
||||
}
|
||||
|
||||
type transformScriptNodeFactory struct{}
|
||||
|
||||
func (f transformScriptNodeFactory) Name() string { return "TransformScriptNode" }
|
||||
func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
|
||||
|
||||
func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
node := &transformScriptNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *transformScriptNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
|
||||
scriptEngine := NewScriptEngine()
|
||||
newMessage, err := scriptEngine.ScriptOnMessage(msg, n.Script)
|
||||
if err != nil {
|
||||
return failureLabelNode.Handle(msg)
|
||||
}
|
||||
return successLabelNode.Handle(newMessage)
|
||||
}
|
||||
51
pkg/rule_engine/nodes/transform_to_email_node.go
Normal file
51
pkg/rule_engine/nodes/transform_to_email_node.go
Normal file
@@ -0,0 +1,51 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type transformToEmailNode struct {
|
||||
bareNode
|
||||
From string `json:"from" yaml:"from"`
|
||||
To string `json:"to" yaml:"to"`
|
||||
Cc string `json:"cc" yaml:"cc"`
|
||||
Bcc string `json:"bcc" yaml:"bcc"`
|
||||
Subject string `json:"subject" yaml:"subject"`
|
||||
Body string `json:"body" yaml:"body"`
|
||||
}
|
||||
|
||||
type transformToEmailNodeFactory struct{}
|
||||
|
||||
func (f transformToEmailNodeFactory) Name() string { return "TransformToEmailNode" }
|
||||
func (f transformToEmailNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
|
||||
|
||||
func (f transformToEmailNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
|
||||
node := &transformToEmailNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *transformToEmailNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
//failureLabelNode := n.GetLinkedNode("Failure")
|
||||
|
||||
/*dialer := runtime.NewDialer(runtime.EMAIL)
|
||||
variables := map[string]string{
|
||||
"from": n.From,
|
||||
"to": n.To,
|
||||
"cc": n.Cc,
|
||||
"bcc": n.Bcc,
|
||||
"subject": n.Subject,
|
||||
"body": n.Body,
|
||||
}
|
||||
if err := dialer.DialAndSend(msg.GetMetadata(), variables); err != nil {
|
||||
return failureLabelNode.Handle(msg)
|
||||
}*/
|
||||
return successLabelNode.Handle(msg)
|
||||
}
|
||||
Reference in New Issue
Block a user