工作流

This commit is contained in:
PandaGoAdmin
2023-03-31 11:39:11 +08:00
parent b247985bbe
commit f7f9e67c95
16 changed files with 951 additions and 0 deletions

View File

@@ -0,0 +1,93 @@
package flow_engine
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/manifest"
"pandax/pkg/rule_engine/message"
"pandax/pkg/rule_engine/nodes"
"strings"
)
// ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain,
type ruleChainInstance struct {
firstRuleNodeId string
nodes map[string]nodes.Node
}
func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
errors := make([]error, 0)
manifest, err := manifest.New(data)
if err != nil {
errors = append(errors, err)
logrus.WithError(err).Errorf("invalidi manifest file")
return nil, errors
}
return newInstanceWithManifest(manifest)
}
// newWithManifest create rule chain by user's manifest file
func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) {
errs := make([]error, 0)
r := &ruleChainInstance{
firstRuleNodeId: m.FirstRuleNodeId,
nodes: make(map[string]nodes.Node),
}
// Create All nodes
for _, n := range m.Nodes {
metadata := nodes.NewMetadataWithValues(n.Properties)
node, err := nodes.NewNode(n.Type, n.Id, metadata)
if err != nil {
errs = append(errs, err)
continue
}
if _, found := r.nodes[n.Id]; found {
err := fmt.Errorf("node '%s' already exist in rulechain", n.Id)
errs = append(errs, err)
continue
}
r.nodes[n.Id] = node
}
for _, edge := range m.Edges {
originalNode, found := r.nodes[edge.SourceNodeId]
if !found {
err := fmt.Errorf("original node '%s' no exist in", originalNode.Name())
errs = append(errs, err)
continue
}
targetNode, found := r.nodes[edge.TargetNodeId]
if !found {
err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name())
errs = append(errs, err)
continue
}
//可以有多个类型
split := strings.Split(edge.Properties["type"].(string), "/")
for _, ty := range split {
originalNode.AddLinkedNode(ty, targetNode)
}
}
for name, node := range r.nodes {
targetNodes := node.GetLinkedNodes()
mustLabels := node.MustLabels()
for _, label := range mustLabels {
if _, found := targetNodes[label]; !found {
err := fmt.Errorf("the label '%s' in node '%s' no exist'", label, name)
errs = append(errs, err)
continue
}
}
}
return r, errs
}
// StartRuleChain
func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error {
if node, found := c.nodes[c.firstRuleNodeId]; found {
go node.Handle(message)
}
return nil
}

View File

@@ -0,0 +1,64 @@
package flow_engine
import (
"io/ioutil"
"pandax/pkg/rule_engine/message"
"pandax/pkg/rule_engine/nodes"
"testing"
)
func TestNewRuleChainInstance(t *testing.T) {
buf, err := ioutil.ReadFile("./manifest/manifest_sample.json")
if err != nil {
t.Error(err)
}
_, errs := NewRuleChainInstance(buf)
if len(errs) > 0 {
t.Error(errs[0])
}
}
func TestScriptEngine(t *testing.T) {
metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"})
msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata)
scriptEngine := nodes.NewScriptEngine()
const script = `
function Switch(msg, metadata, msgType) {
function nextRelation(metadata, msg) {
return ['one','nine'];
}
if(metadata.device === 'aa') {
return ['six'];
}
if(msgType === 'Post telemetry') {
return ['two'];
}
return nextRelation(metadata, msg);
}
`
SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, script)
if err != nil {
t.Error(err)
}
t.Log(SwitchResults)
}
func TestScriptOnMessage(t *testing.T) {
metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"})
msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata)
scriptEngine := nodes.NewScriptEngine()
const script = `
function Transform(msg, metadata, msgType) {
msg.bb = "33"
metadata.event = 55
return {msg: msg, metadata: metadata, msgType: msgType};
}
`
ScriptOnMessageResults, err := scriptEngine.ScriptOnMessage(msg, script)
if err != nil {
t.Error(err)
}
t.Log(ScriptOnMessageResults.GetMetadata())
}

View File

@@ -0,0 +1,62 @@
package manifest
import (
"encoding/json"
"github.com/sirupsen/logrus"
)
type Node struct {
Type string `json:"type" yaml:"type"`
Id string `json:"id" yaml:"id"`
Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode
}
type Edge struct {
SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"`
TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"`
Type string `json:"type" yaml:"type"` //success or fail
Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode
}
type Manifest struct {
FirstRuleNodeId string `json:"firstRuleNodeId" yaml:"firstRuleNodeId"`
Nodes []Node `json:"nodes" yaml:"nodes"`
Edges []Edge `json:"edges" yaml:"edges"`
}
func New(data []byte) (*Manifest, error) {
firstRuleNodeId := ""
manifest := make(map[string]interface{})
if err := json.Unmarshal(data, &manifest); err != nil {
logrus.WithError(err).Errorf("invalid node chain manifest file")
return nil, err
}
nodes := make([]Node, 0)
for _, mn := range manifest["nodes"].([]interface{}) {
node := mn.(map[string]interface{})
if node["type"].(string) == "InputNode" {
firstRuleNodeId = node["id"].(string)
}
nodes = append(nodes, Node{
Id: node["id"].(string),
Type: node["type"].(string),
Properties: node["properties"].(map[string]interface{}),
})
}
edges := make([]Edge, 0)
for _, en := range manifest["edges"].([]interface{}) {
edge := en.(map[string]interface{})
edges = append(edges, Edge{
Type: edge["type"].(string),
Properties: edge["properties"].(map[string]interface{}),
SourceNodeId: edge["sourceNodeId"].(string),
TargetNodeId: edge["targetNodeId"].(string),
})
}
m := &Manifest{
FirstRuleNodeId: firstRuleNodeId,
Nodes: nodes,
Edges: edges,
}
return m, nil
}

View File

@@ -0,0 +1,127 @@
{
"nodes": [
{
"id": "3b6b8df4-445f-4e70-9674-f7aa486b3d81",
"type": "InputNode",
"x": 280,
"y": 280,
"properties": {
"icon": "/src/assets/icon_module/svg/start.svg",
"debugMode": false
},
"zIndex": 1002,
"text": {
"x": 290,
"y": 280,
"value": "输入"
}
},
{
"id": "45afb241-1977-4cbf-8af3-c12844d5666b",
"type": "DelayNode",
"x": 600,
"y": 160,
"properties": {
"icon": "/src/assets/icon_module/svg/function.svg",
"debugMode": false
},
"zIndex": 1004,
"text": {
"x": 610,
"y": 160,
"value": "延迟"
}
},
{
"id": "95047b03-e966-4685-b625-3cea27415706",
"type": "SwitchNode",
"x": 600,
"y": 460,
"properties": {
"icon": "/src/assets/icon_module/svg/switch.svg",
"debugMode": false,
"scripts": "return {\n msg: msg,\n metadata: metadata,\n msgType: msgType\n};"
},
"zIndex": 1006,
"text": {
"x": 610,
"y": 460,
"value": "分流"
}
}
],
"edges": [
{
"id": "fde7f2de-cc0f-467d-a614-4505f058dc2a",
"type": "bezier-link",
"sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81",
"targetNodeId": "45afb241-1977-4cbf-8af3-c12844d5666b",
"startPoint": {
"x": 340,
"y": 280
},
"endPoint": {
"x": 540,
"y": 160
},
"properties": {
"type": "Success"
},
"zIndex": 1007,
"pointsList": [
{
"x": 340,
"y": 280
},
{
"x": 440,
"y": 280
},
{
"x": 440,
"y": 160
},
{
"x": 540,
"y": 160
}
]
},
{
"id": "986878cc-a9be-42b5-afc0-6c0a169c4e4d",
"type": "bezier-link",
"sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81",
"targetNodeId": "95047b03-e966-4685-b625-3cea27415706",
"startPoint": {
"x": 340,
"y": 280
},
"endPoint": {
"x": 540,
"y": 460
},
"properties": {
"type": "Failure"
},
"zIndex": 1008,
"pointsList": [
{
"x": 340,
"y": 280
},
{
"x": 440,
"y": 280
},
{
"x": 440,
"y": 460
},
{
"x": 540,
"y": 460
}
]
}
]
}

View File

@@ -0,0 +1,109 @@
package message
import "github.com/sirupsen/logrus"
// Message ...
type Message interface {
GetOriginator() string
GetType() string
GetCh() chan map[string]bool
GetMetadata() Metadata
SetType(string)
SetOriginator(string)
SetMetadata(Metadata)
}
// Metadata ...
type Metadata interface {
Keys() []string
GetKeyValue(key string) interface{}
SetKeyValue(key string, val interface{})
GetValues() map[string]interface{}
}
// Predefined message types
const (
MessageTypePostAttributesRequest = "Post attributes"
MessageTypePostTelemetryRequest = "Post telemetry"
MessageTypeActivityEvent = "Activity event"
MessageTypeInactivityEvent = "Inactivity event"
MessageTypeConnectEvent = "Connect event"
MessageTypeDisconnectEvent = "Disconnect event"
)
// NewMessage ...
func NewMessage() Message {
return &defaultMessage{
ch: make(chan map[string]bool),
}
}
type defaultMessage struct {
id string //uuid
ts int64 //时间戳
msgType string //消息类型
originator string //数据发布者
ch chan map[string]bool //通道
metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等
}
// NewMessageWithDetail ...
func NewMessageWithDetail(originator string, messageType string, msg map[string]interface{}, metadata Metadata) Message {
return &defaultMessage{
originator: originator,
msgType: messageType,
ch: make(chan map[string]bool),
metadata: metadata,
}
}
func (t *defaultMessage) GetOriginator() string { return t.originator }
func (t *defaultMessage) GetType() string { return t.msgType }
func (t *defaultMessage) GetCh() chan map[string]bool { return t.ch }
func (t *defaultMessage) GetMetadata() Metadata { return t.metadata }
func (t *defaultMessage) SetType(msgType string) { t.msgType = msgType }
func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator }
func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata }
// NewMetadata ...
func NewMetadata() Metadata {
return &defaultMetadata{
values: make(map[string]interface{}),
}
}
type defaultMetadata struct {
values map[string]interface{}
}
func NewDefaultMetadata(vals map[string]interface{}) Metadata {
return &defaultMetadata{
values: vals,
}
}
func (t *defaultMetadata) Keys() []string {
keys := make([]string, 0)
for key := range t.values {
keys = append(keys, key)
}
return keys
}
func (t *defaultMetadata) GetKeyValue(key string) interface{} {
if _, found := t.values[key]; !found {
logrus.Fatalf("no key '%s' in metadata", key)
}
return t.values[key]
}
func (t *defaultMetadata) SetKeyValue(key string, val interface{}) {
t.values[key] = val
}
func (t *defaultMetadata) GetValues() map[string]interface{} {
return t.values
}
func (t *defaultMetadata) SetValues(values map[string]interface{}) {
t.values = values
}

View File

@@ -0,0 +1,33 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/flow_engine/message"
)
type externalDingNode struct {
bareNode
WebHook string `json:"webHook" yaml:"webHook"`
MsgType string `json:"msgType" yaml:"msgType"`
Content string `json:"content" yaml:"content"`
IsAtAll bool `json:"isAtAll" yaml:"isAtAll"`
atMobiles []string `json:"atMobiles" yaml:"atMobiles"`
}
type externalDingNodeFactory struct{}
func (f externalDingNodeFactory) Name() string { return "ExternalDingNode" }
func (f externalDingNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalDingNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &externalDingNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalDingNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
return nil
}

View File

@@ -0,0 +1,51 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/flow_engine/message"
)
type externalSendEmailNode struct {
bareNode
From string `json:"from" yaml:"from"`
To string `json:"to" yaml:"to"`
Cc string `json:"cc" yaml:"cc"`
Bcc string `json:"bcc" yaml:"bcc"`
Subject string `json:"subject" yaml:"subject"`
Body string `json:"body" yaml:"body"`
}
type externalSendEmailNodeFactory struct{}
func (f externalSendEmailNodeFactory) Name() string { return "ExternalSendEmailNode" }
func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &externalSendEmailNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *externalSendEmailNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
successLabelNode := n.GetLinkedNode("Success")
//failureLabelNode := n.GetLinkedNode("Failure")
/*dialer := runtime.NewDialer(runtime.EMAIL)
variables := map[string]string{
"from": n.From,
"to": n.To,
"cc": n.Cc,
"bcc": n.Bcc,
"subject": n.Subject,
"body": n.Body,
}
if err := dialer.DialAndSend(msg.GetMetadata(), variables); err != nil {
return failureLabelNode.Handle(msg)
}*/
return successLabelNode.Handle(msg)
}

View File

@@ -0,0 +1,31 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/flow_engine/message"
)
type externalSendSmsNode struct {
bareNode
}
type externalSendSmsNodeFactory struct{}
func (f externalSendSmsNodeFactory) Name() string { return "ExternalSendSmslNode" }
func (f externalSendSmsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalSendSmsNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &externalSendSmsNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalSendSmsNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
successLabelNode := n.GetLinkedNode("Success")
//failureLabelNode := n.GetLinkedNode("Failure")
return successLabelNode.Handle(msg)
}

View File

@@ -0,0 +1,33 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/flow_engine/message"
)
type externalWechatNode struct {
bareNode
WebHook string `json:"webHook" yaml:"webHook"`
MsgType string `json:"msgType" yaml:"msgType"`
Content string `json:"content" yaml:"content"`
IsAtAll bool `json:"isAtAll" yaml:"isAtAll"`
atMobiles []string `json:"atMobiles" yaml:"atMobiles"`
}
type externalWechatNodeFactory struct{}
func (f externalWechatNodeFactory) Name() string { return "ExternalWechatNode" }
func (f externalWechatNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalWechatNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &externalWechatNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalWechatNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
return nil
}

View File

@@ -0,0 +1,51 @@
package nodes
import (
"fmt"
)
const (
NODE_CATEGORY_TRANSFORM = "transform"
NODE_CATEGORY_EXTERNAL = "external"
NODE_CATEGORY_OTHERS = "others"
)
// Factory is node's factory to create node based on metadata
// factory also manage node's metadta description which can be used by other
// service to present node in web
type Factory interface {
Name() string
Category() string
Labels() []string
Create(id string, meta Metadata) (Node, error)
}
var (
// allNodeFactories hold all node's factory
allNodeFactories map[string]Factory = make(map[string]Factory)
// allNodeCategories hold node's metadata by category
allNodeCategories map[string][]map[string]interface{} = make(map[string][]map[string]interface{})
)
// RegisterFactory add a new node factory and classify its category for
// metadata description
func RegisterFactory(f Factory) {
allNodeFactories[f.Name()] = f
if allNodeCategories[f.Category()] == nil {
allNodeCategories[f.Category()] = []map[string]interface{}{}
}
allNodeCategories[f.Category()] = append(allNodeCategories[f.Category()], map[string]interface{}{"name": f.Name(), "labels": f.Labels()})
}
// NewNode is the only way to create a new node
func NewNode(nodeType string, id string, meta Metadata) (Node, error) {
if f, found := allNodeFactories[nodeType]; found {
return f.Create(id, meta)
}
return nil, fmt.Errorf("invalid node type '%s'", nodeType)
}
// GetCategoryNodes return specified category's all nodes
func GetCategoryNodes() map[string][]map[string]interface{} { return allNodeCategories }

View File

@@ -0,0 +1,13 @@
package nodes
// init register all node's factory
func init() {
RegisterFactory(inputNodeFactory{})
RegisterFactory(transformScriptNodeFactory{})
RegisterFactory(externalDingNodeFactory{})
RegisterFactory(externalWechatNodeFactory{})
RegisterFactory(externalSendEmailNodeFactory{})
RegisterFactory(externalSendSmsNodeFactory{})
}

View File

@@ -0,0 +1,34 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/flow_engine/message"
)
const InputNodeName = "InputNode"
type inputNode struct {
bareNode
}
type inputNodeFactory struct{}
func (f inputNodeFactory) Name() string { return "InputNode" }
func (f inputNodeFactory) Category() string { return NODE_CATEGORY_OTHERS }
func (f inputNodeFactory) Labels() []string { return []string{} }
func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &inputNode{
bareNode: newBareNode(InputNodeName, id, meta, f.Labels()),
}
return node, nil
}
func (n *inputNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
nodes := n.GetLinkedNodes()
for _, node := range nodes {
return node.Handle(msg)
}
return nil
}

View File

@@ -0,0 +1,63 @@
package nodes
import (
"fmt"
"github.com/mitchellh/mapstructure"
)
const (
NODE_CONFIG_MESSAGE_TYPE_KEY = "messageTypeKey"
NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey"
)
type Metadata interface {
Keys() []string
With(key string, val interface{}) Metadata
Value(key string) (interface{}, error)
DecodePath(rawVal interface{}) error
}
type nodeMetadata struct {
keypairs map[string]interface{}
}
func NewMetadata() Metadata {
return &nodeMetadata{
keypairs: make(map[string]interface{}),
}
}
func NewMetadataWithString(vals string) Metadata {
return &nodeMetadata{}
}
func NewMetadataWithValues(vals map[string]interface{}) Metadata {
return &nodeMetadata{
keypairs: vals,
}
}
func (c *nodeMetadata) Keys() []string {
keys := []string{}
for key, _ := range c.keypairs {
keys = append(keys, key)
}
return keys
}
func (c *nodeMetadata) Value(key string) (interface{}, error) {
if val, found := c.keypairs[key]; found {
return val, nil
}
return nil, fmt.Errorf("key '%s' not found", key)
}
func (c *nodeMetadata) With(key string, val interface{}) Metadata {
c.keypairs[key] = val
return c
}
func (c *nodeMetadata) DecodePath(rawVal interface{}) error {
//return utils.Map2Struct(c.keypairs, rawVal)
return mapstructure.Decode(c.keypairs, rawVal)
}

View File

@@ -0,0 +1,65 @@
package nodes
import (
"errors"
"pandax/pkg/flow_engine/message"
"github.com/sirupsen/logrus"
)
type Node interface {
Name() string
Id() string
Metadata() Metadata
MustLabels() []string
Handle(message.Message) error
AddLinkedNode(label string, node Node)
GetLinkedNode(label string) Node
GetLinkedNodes() map[string]Node
}
type bareNode struct {
name string
id string
nodes map[string]Node
meta Metadata
labels []string
}
func newBareNode(name string, id string, meta Metadata, labels []string) bareNode {
return bareNode{
name: name,
id: id,
nodes: make(map[string]Node),
meta: meta,
labels: labels,
}
}
func (n *bareNode) Name() string { return n.name }
func (n *bareNode) WithId(id string) { n.id = id }
func (n *bareNode) Id() string { return n.id }
func (n *bareNode) MustLabels() []string { return n.labels }
func (n *bareNode) AddLinkedNode(label string, node Node) { n.nodes[label] = node }
func (n *bareNode) GetLinkedNode(label string) Node {
if node, found := n.nodes[label]; found {
return node
}
logrus.Errorf("no label '%s' in node '%s'", label, n.name)
return nil
}
func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes }
func (n *bareNode) Metadata() Metadata { return n.meta }
func (n *bareNode) Handle(message.Message) error { return errors.New("not implemented") }
func decodePath(meta Metadata, n Node) (Node, error) {
if err := meta.DecodePath(n); err != nil {
return n, err
}
return n, nil
}

View File

@@ -0,0 +1,84 @@
package nodes
import (
"github.com/dop251/goja"
"github.com/sirupsen/logrus"
"pandax/pkg/flow_engine/message"
)
type ScriptEngine interface {
ScriptOnMessage(msg message.Message, script string) (message.Message, error)
//used by filter_switch_node
ScriptOnSwitch(msg message.Message, script string) ([]string, error)
//used by filter_script_node
ScriptOnFilter(msg message.Message, script string) (bool, error)
ScriptToString(msg message.Message, script string) (string, error)
}
type baseScriptEngine struct {
}
func NewScriptEngine() ScriptEngine {
return &baseScriptEngine{}
}
func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) (message.Message, error) {
vm := goja.New()
_, err := vm.RunString(script)
if err != nil {
logrus.Info("JS代码有问题")
return nil, err
}
var fn func(map[string]interface{}, map[string]interface{}, string) map[string]interface{}
err = vm.ExportTo(vm.Get("Transform"), &fn)
if err != nil {
logrus.Info("Js函数映射到 Go 函数失败!")
return nil, err
}
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
msg.SetMsg(datas["msg"].(map[string]interface{}))
msg.SetMetadata(message.NewDefaultMetadata(datas["metadata"].(map[string]interface{})))
msg.SetType(datas["msgType"].(string))
return msg, nil
return nil, nil
}
func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) ([]string, error) {
vm := goja.New()
_, err := vm.RunString(script)
if err != nil {
logrus.Info("JS代码有问题")
return nil, err
}
var fn func(map[string]interface{}, map[string]interface{}, string) []string
err = vm.ExportTo(vm.Get("Switch"), &fn)
if err != nil {
logrus.Info("Js函数映射到 Go 函数失败!")
return nil, err
}
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
return datas, nil
}
func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) (bool, error) {
vm := goja.New()
_, err := vm.RunString(script)
if err != nil {
logrus.Info("JS代码有问题")
return false, err
}
var fn func(map[string]interface{}, map[string]interface{}, string) bool
err = vm.ExportTo(vm.Get("Filter"), &fn)
if err != nil {
logrus.Info("Js函数映射到 Go 函数失败!")
return false, err
}
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
return datas, nil
}
func (bse *baseScriptEngine) ScriptToString(msg message.Message, script string) (string, error) {
return "", nil
}

View File

@@ -0,0 +1,38 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/flow_engine/message"
)
type transformScriptNode struct {
bareNode
Script string `json:"script" yaml:"script"`
}
type transformScriptNodeFactory struct{}
func (f transformScriptNodeFactory) Name() string { return "TransformScriptNode" }
func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &transformScriptNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *transformScriptNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
scriptEngine := NewScriptEngine()
newMessage, err := scriptEngine.ScriptOnMessage(msg, n.Script)
if err != nil {
return failureLabelNode.Handle(msg)
}
return successLabelNode.Handle(newMessage)
}