mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 02:48:34 +08:00
规则引擎
This commit is contained in:
@@ -2,6 +2,8 @@ package rule_engine
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
"pandax/pkg/rule_engine/nodes"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -15,3 +17,26 @@ func TestNewRuleChainInstance(t *testing.T) {
|
||||
t.Error(errs[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestScriptEngine(t *testing.T) {
|
||||
metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"})
|
||||
msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, []byte{}, metadata)
|
||||
scriptEngine := nodes.NewScriptEngine()
|
||||
const script = `
|
||||
function Switch(msg, metadata, msgType) {
|
||||
function nextRelation(metadata, msg) {
|
||||
return ['one','nine'];
|
||||
}
|
||||
if(msgType === 'Post telemetry') {
|
||||
return ['two'];
|
||||
}
|
||||
return nextRelation(metadata, msg);
|
||||
}
|
||||
`
|
||||
SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, script)
|
||||
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
t.Log(SwitchResults)
|
||||
}
|
||||
|
||||
@@ -43,21 +43,22 @@ func NewMessage() Message {
|
||||
type defaultMessage struct {
|
||||
id string //uuid
|
||||
ts int64 //时间戳
|
||||
msgType string //消息类型,数据来源
|
||||
msgType string //消息类型, attributes(参数),telemetry(遥测),连接事件
|
||||
originator string //数据发布者
|
||||
customerId string //客户Id UUID
|
||||
entityId string //实体Id UUID
|
||||
data []byte //数据
|
||||
dataType string //数据类型 JSON
|
||||
metadata Metadata //数据的元数据
|
||||
entityType string //实体类型 设备、资产,用户、规则链
|
||||
data []byte //数据 数据结构JSON 设备原始数据
|
||||
metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等
|
||||
}
|
||||
|
||||
// NewMessageWithDetail ...
|
||||
func NewMessageWithDetail(originator string, messageType string, msg []byte) Message {
|
||||
func NewMessageWithDetail(originator string, messageType string, msg []byte, metadata Metadata) Message {
|
||||
return &defaultMessage{
|
||||
originator: originator,
|
||||
msgType: messageType,
|
||||
data: msg,
|
||||
metadata: metadata,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +74,6 @@ func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadat
|
||||
func (t *defaultMessage) MarshalBinary() ([]byte, error) { return nil, nil }
|
||||
func (t *defaultMessage) UnmarshalBinary(b []byte) error { return nil }
|
||||
|
||||
// NewMetadata ...
|
||||
func NewMetadata() Metadata {
|
||||
return &defaultMetadata{
|
||||
values: make(map[string]interface{}),
|
||||
@@ -84,7 +84,7 @@ type defaultMetadata struct {
|
||||
values map[string]interface{}
|
||||
}
|
||||
|
||||
func newDefaultMetadata(vals map[string]interface{}) Metadata {
|
||||
func NewDefaultMetadata(vals map[string]interface{}) Metadata {
|
||||
return &defaultMetadata{
|
||||
values: vals,
|
||||
}
|
||||
|
||||
@@ -35,11 +35,10 @@ type delayNodeFactory struct{}
|
||||
|
||||
func (f delayNodeFactory) Name() string { return DelayNodeName }
|
||||
func (f delayNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
|
||||
func (f delayNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"Success", "Failure"}
|
||||
node := &delayNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
|
||||
lock: sync.Mutex{},
|
||||
}
|
||||
_, err := decodePath(meta, node)
|
||||
|
||||
@@ -20,6 +20,7 @@ const (
|
||||
type Factory interface {
|
||||
Name() string
|
||||
Category() string
|
||||
Labels() []string
|
||||
Create(id string, meta Metadata) (Node, error)
|
||||
}
|
||||
|
||||
@@ -28,7 +29,7 @@ var (
|
||||
allNodeFactories map[string]Factory = make(map[string]Factory)
|
||||
|
||||
// allNodeCategories hold node's metadata by category
|
||||
allNodeCategories map[string][]string = make(map[string][]string)
|
||||
allNodeCategories map[string][]map[string]interface{} = make(map[string][]map[string]interface{})
|
||||
)
|
||||
|
||||
// RegisterFactory add a new node factory and classify its category for
|
||||
@@ -37,9 +38,9 @@ func RegisterFactory(f Factory) {
|
||||
allNodeFactories[f.Name()] = f
|
||||
|
||||
if allNodeCategories[f.Category()] == nil {
|
||||
allNodeCategories[f.Category()] = []string{}
|
||||
allNodeCategories[f.Category()] = []map[string]interface{}{}
|
||||
}
|
||||
allNodeCategories[f.Category()] = append(allNodeCategories[f.Category()], f.Name())
|
||||
allNodeCategories[f.Category()] = append(allNodeCategories[f.Category()], map[string]interface{}{"name": f.Name(), "labels": f.Labels()})
|
||||
}
|
||||
|
||||
// NewNode is the only way to create a new node
|
||||
@@ -51,4 +52,4 @@ func NewNode(nodeType string, id string, meta Metadata) (Node, error) {
|
||||
}
|
||||
|
||||
// GetCategoryNodes return specified category's all nodes
|
||||
func GetCategoryNodes() map[string][]string { return allNodeCategories }
|
||||
func GetCategoryNodes() map[string][]map[string]interface{} { return allNodeCategories }
|
||||
|
||||
@@ -14,11 +14,10 @@ type messageTypeSwitchNodeFactory struct{}
|
||||
|
||||
func (f messageTypeSwitchNodeFactory) Name() string { return "MessageTypeSwitchNode" }
|
||||
func (f messageTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f messageTypeSwitchNodeFactory) Labels() []string { return []string{"True", "False"} }
|
||||
func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{"True", "False"}
|
||||
node := &messageTypeSwitchNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
@@ -25,11 +25,15 @@ type switchFilterNodeFactory struct{}
|
||||
|
||||
func (f switchFilterNodeFactory) Name() string { return "SwitchNode" }
|
||||
func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
|
||||
func (f switchFilterNodeFactory) Labels() []string {
|
||||
return []string{
|
||||
"True", "False", message.MessageTypePostTelemetryRequest,
|
||||
message.MessageTypeConnectEvent,
|
||||
}
|
||||
}
|
||||
func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{}
|
||||
node := &switchFilterNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, labels),
|
||||
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
@@ -40,7 +44,7 @@ func (n *switchFilterNode) Handle(msg message.Message) error {
|
||||
scriptEngine := NewScriptEngine()
|
||||
SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts)
|
||||
if err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
nodes := n.GetLinkedNodes()
|
||||
for label, node := range nodes {
|
||||
|
||||
@@ -15,11 +15,10 @@ type inputNodeFactory struct{}
|
||||
|
||||
func (f inputNodeFactory) Name() string { return "InputNode" }
|
||||
func (f inputNodeFactory) Category() string { return NODE_CATEGORY_OTHERS }
|
||||
|
||||
func (f inputNodeFactory) Labels() []string { return []string{} }
|
||||
func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
labels := []string{}
|
||||
node := &inputNode{
|
||||
bareNode: newBareNode(InputNodeName, id, meta, labels),
|
||||
bareNode: newBareNode(InputNodeName, id, meta, f.Labels()),
|
||||
}
|
||||
return node, nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
package nodes
|
||||
|
||||
import "pandax/pkg/rule_engine/message"
|
||||
import (
|
||||
"github.com/dop251/goja"
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type ScriptEngine interface {
|
||||
ScriptOnMessage(msg message.Message, script string) (message.Message, error)
|
||||
@@ -24,8 +28,20 @@ func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string)
|
||||
}
|
||||
|
||||
func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) ([]string, error) {
|
||||
|
||||
return nil, nil
|
||||
vm := goja.New()
|
||||
_, err := vm.RunString(script)
|
||||
if err != nil {
|
||||
logrus.Info("JS代码有问题")
|
||||
return nil, err
|
||||
}
|
||||
var fn func(message.Message, message.Metadata, string) []string
|
||||
err = vm.ExportTo(vm.Get("Switch"), &fn)
|
||||
if err != nil {
|
||||
logrus.Info("Js函数映射到 Go 函数失败!")
|
||||
return nil, err
|
||||
}
|
||||
datas := fn(msg, msg.GetMetadata(), msg.GetType())
|
||||
return datas, nil
|
||||
}
|
||||
|
||||
func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) (bool, error) {
|
||||
|
||||
Reference in New Issue
Block a user