diff --git a/pkg/flow_engine/instance.go b/pkg/flow_engine/instance.go new file mode 100644 index 0000000..72de323 --- /dev/null +++ b/pkg/flow_engine/instance.go @@ -0,0 +1,93 @@ +package flow_engine + +import ( + "context" + "fmt" + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/manifest" + "pandax/pkg/rule_engine/message" + "pandax/pkg/rule_engine/nodes" + "strings" +) + +// ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain, +type ruleChainInstance struct { + firstRuleNodeId string + nodes map[string]nodes.Node +} + +func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) { + errors := make([]error, 0) + + manifest, err := manifest.New(data) + if err != nil { + errors = append(errors, err) + logrus.WithError(err).Errorf("invalidi manifest file") + return nil, errors + } + return newInstanceWithManifest(manifest) +} + +// newWithManifest create rule chain by user's manifest file +func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) { + errs := make([]error, 0) + r := &ruleChainInstance{ + firstRuleNodeId: m.FirstRuleNodeId, + nodes: make(map[string]nodes.Node), + } + // Create All nodes + for _, n := range m.Nodes { + metadata := nodes.NewMetadataWithValues(n.Properties) + node, err := nodes.NewNode(n.Type, n.Id, metadata) + if err != nil { + errs = append(errs, err) + continue + } + if _, found := r.nodes[n.Id]; found { + err := fmt.Errorf("node '%s' already exist in rulechain", n.Id) + errs = append(errs, err) + continue + } + r.nodes[n.Id] = node + } + for _, edge := range m.Edges { + originalNode, found := r.nodes[edge.SourceNodeId] + if !found { + err := fmt.Errorf("original node '%s' no exist in", originalNode.Name()) + errs = append(errs, err) + continue + } + targetNode, found := r.nodes[edge.TargetNodeId] + if !found { + err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name()) + errs = append(errs, err) + continue + } + //可以有多个类型 + split := strings.Split(edge.Properties["type"].(string), "/") + for _, ty := range split { + originalNode.AddLinkedNode(ty, targetNode) + } + } + for name, node := range r.nodes { + targetNodes := node.GetLinkedNodes() + mustLabels := node.MustLabels() + for _, label := range mustLabels { + if _, found := targetNodes[label]; !found { + err := fmt.Errorf("the label '%s' in node '%s' no exist'", label, name) + errs = append(errs, err) + continue + } + } + } + + return r, errs +} + +// StartRuleChain +func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error { + if node, found := c.nodes[c.firstRuleNodeId]; found { + go node.Handle(message) + } + return nil +} diff --git a/pkg/flow_engine/instance_test.go b/pkg/flow_engine/instance_test.go new file mode 100644 index 0000000..dfee433 --- /dev/null +++ b/pkg/flow_engine/instance_test.go @@ -0,0 +1,64 @@ +package flow_engine + +import ( + "io/ioutil" + "pandax/pkg/rule_engine/message" + "pandax/pkg/rule_engine/nodes" + "testing" +) + +func TestNewRuleChainInstance(t *testing.T) { + buf, err := ioutil.ReadFile("./manifest/manifest_sample.json") + if err != nil { + t.Error(err) + } + _, errs := NewRuleChainInstance(buf) + if len(errs) > 0 { + t.Error(errs[0]) + } +} + +func TestScriptEngine(t *testing.T) { + metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) + msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata) + scriptEngine := nodes.NewScriptEngine() + const script = ` + function Switch(msg, metadata, msgType) { + function nextRelation(metadata, msg) { + return ['one','nine']; + } + if(metadata.device === 'aa') { + return ['six']; + } + if(msgType === 'Post telemetry') { + return ['two']; + } + return nextRelation(metadata, msg); + } + ` + SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, script) + + if err != nil { + t.Error(err) + } + t.Log(SwitchResults) +} + +func TestScriptOnMessage(t *testing.T) { + metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) + msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata) + scriptEngine := nodes.NewScriptEngine() + const script = ` + function Transform(msg, metadata, msgType) { + msg.bb = "33" + metadata.event = 55 + return {msg: msg, metadata: metadata, msgType: msgType}; + } + ` + ScriptOnMessageResults, err := scriptEngine.ScriptOnMessage(msg, script) + + if err != nil { + t.Error(err) + } + t.Log(ScriptOnMessageResults.GetMetadata()) +} diff --git a/pkg/flow_engine/manifest/manifest.go b/pkg/flow_engine/manifest/manifest.go new file mode 100644 index 0000000..21b9349 --- /dev/null +++ b/pkg/flow_engine/manifest/manifest.go @@ -0,0 +1,62 @@ +package manifest + +import ( + "encoding/json" + "github.com/sirupsen/logrus" +) + +type Node struct { + Type string `json:"type" yaml:"type"` + Id string `json:"id" yaml:"id"` + Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode +} + +type Edge struct { + SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"` + TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"` + Type string `json:"type" yaml:"type"` //success or fail + Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode +} + +type Manifest struct { + FirstRuleNodeId string `json:"firstRuleNodeId" yaml:"firstRuleNodeId"` + Nodes []Node `json:"nodes" yaml:"nodes"` + Edges []Edge `json:"edges" yaml:"edges"` +} + +func New(data []byte) (*Manifest, error) { + firstRuleNodeId := "" + manifest := make(map[string]interface{}) + if err := json.Unmarshal(data, &manifest); err != nil { + logrus.WithError(err).Errorf("invalid node chain manifest file") + return nil, err + } + nodes := make([]Node, 0) + for _, mn := range manifest["nodes"].([]interface{}) { + node := mn.(map[string]interface{}) + if node["type"].(string) == "InputNode" { + firstRuleNodeId = node["id"].(string) + } + nodes = append(nodes, Node{ + Id: node["id"].(string), + Type: node["type"].(string), + Properties: node["properties"].(map[string]interface{}), + }) + } + edges := make([]Edge, 0) + for _, en := range manifest["edges"].([]interface{}) { + edge := en.(map[string]interface{}) + edges = append(edges, Edge{ + Type: edge["type"].(string), + Properties: edge["properties"].(map[string]interface{}), + SourceNodeId: edge["sourceNodeId"].(string), + TargetNodeId: edge["targetNodeId"].(string), + }) + } + m := &Manifest{ + FirstRuleNodeId: firstRuleNodeId, + Nodes: nodes, + Edges: edges, + } + return m, nil +} diff --git a/pkg/flow_engine/manifest/manifest_sample.json b/pkg/flow_engine/manifest/manifest_sample.json new file mode 100644 index 0000000..b80e944 --- /dev/null +++ b/pkg/flow_engine/manifest/manifest_sample.json @@ -0,0 +1,127 @@ +{ + "nodes": [ + { + "id": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", + "type": "InputNode", + "x": 280, + "y": 280, + "properties": { + "icon": "/src/assets/icon_module/svg/start.svg", + "debugMode": false + }, + "zIndex": 1002, + "text": { + "x": 290, + "y": 280, + "value": "输入" + } + }, + { + "id": "45afb241-1977-4cbf-8af3-c12844d5666b", + "type": "DelayNode", + "x": 600, + "y": 160, + "properties": { + "icon": "/src/assets/icon_module/svg/function.svg", + "debugMode": false + }, + "zIndex": 1004, + "text": { + "x": 610, + "y": 160, + "value": "延迟" + } + }, + { + "id": "95047b03-e966-4685-b625-3cea27415706", + "type": "SwitchNode", + "x": 600, + "y": 460, + "properties": { + "icon": "/src/assets/icon_module/svg/switch.svg", + "debugMode": false, + "scripts": "return {\n msg: msg,\n metadata: metadata,\n msgType: msgType\n};" + }, + "zIndex": 1006, + "text": { + "x": 610, + "y": 460, + "value": "分流" + } + } + ], + "edges": [ + { + "id": "fde7f2de-cc0f-467d-a614-4505f058dc2a", + "type": "bezier-link", + "sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", + "targetNodeId": "45afb241-1977-4cbf-8af3-c12844d5666b", + "startPoint": { + "x": 340, + "y": 280 + }, + "endPoint": { + "x": 540, + "y": 160 + }, + "properties": { + "type": "Success" + }, + "zIndex": 1007, + "pointsList": [ + { + "x": 340, + "y": 280 + }, + { + "x": 440, + "y": 280 + }, + { + "x": 440, + "y": 160 + }, + { + "x": 540, + "y": 160 + } + ] + }, + { + "id": "986878cc-a9be-42b5-afc0-6c0a169c4e4d", + "type": "bezier-link", + "sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", + "targetNodeId": "95047b03-e966-4685-b625-3cea27415706", + "startPoint": { + "x": 340, + "y": 280 + }, + "endPoint": { + "x": 540, + "y": 460 + }, + "properties": { + "type": "Failure" + }, + "zIndex": 1008, + "pointsList": [ + { + "x": 340, + "y": 280 + }, + { + "x": 440, + "y": 280 + }, + { + "x": 440, + "y": 460 + }, + { + "x": 540, + "y": 460 + } + ] + } + ] +} \ No newline at end of file diff --git a/pkg/flow_engine/message/message.go b/pkg/flow_engine/message/message.go new file mode 100644 index 0000000..3d08166 --- /dev/null +++ b/pkg/flow_engine/message/message.go @@ -0,0 +1,109 @@ +package message + +import "github.com/sirupsen/logrus" + +// Message ... +type Message interface { + GetOriginator() string + GetType() string + GetCh() chan map[string]bool + GetMetadata() Metadata + SetType(string) + SetOriginator(string) + SetMetadata(Metadata) +} + +// Metadata ... +type Metadata interface { + Keys() []string + GetKeyValue(key string) interface{} + SetKeyValue(key string, val interface{}) + GetValues() map[string]interface{} +} + +// Predefined message types +const ( + MessageTypePostAttributesRequest = "Post attributes" + MessageTypePostTelemetryRequest = "Post telemetry" + MessageTypeActivityEvent = "Activity event" + MessageTypeInactivityEvent = "Inactivity event" + MessageTypeConnectEvent = "Connect event" + MessageTypeDisconnectEvent = "Disconnect event" +) + +// NewMessage ... +func NewMessage() Message { + return &defaultMessage{ + ch: make(chan map[string]bool), + } +} + +type defaultMessage struct { + id string //uuid + ts int64 //时间戳 + msgType string //消息类型 + originator string //数据发布者 + ch chan map[string]bool //通道 + metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等 +} + +// NewMessageWithDetail ... +func NewMessageWithDetail(originator string, messageType string, msg map[string]interface{}, metadata Metadata) Message { + return &defaultMessage{ + originator: originator, + msgType: messageType, + ch: make(chan map[string]bool), + metadata: metadata, + } +} + +func (t *defaultMessage) GetOriginator() string { return t.originator } +func (t *defaultMessage) GetType() string { return t.msgType } +func (t *defaultMessage) GetCh() chan map[string]bool { return t.ch } +func (t *defaultMessage) GetMetadata() Metadata { return t.metadata } +func (t *defaultMessage) SetType(msgType string) { t.msgType = msgType } +func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator } +func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata } + +// NewMetadata ... +func NewMetadata() Metadata { + return &defaultMetadata{ + values: make(map[string]interface{}), + } +} + +type defaultMetadata struct { + values map[string]interface{} +} + +func NewDefaultMetadata(vals map[string]interface{}) Metadata { + return &defaultMetadata{ + values: vals, + } +} + +func (t *defaultMetadata) Keys() []string { + keys := make([]string, 0) + for key := range t.values { + keys = append(keys, key) + } + return keys +} + +func (t *defaultMetadata) GetKeyValue(key string) interface{} { + if _, found := t.values[key]; !found { + logrus.Fatalf("no key '%s' in metadata", key) + } + return t.values[key] +} + +func (t *defaultMetadata) SetKeyValue(key string, val interface{}) { + t.values[key] = val +} + +func (t *defaultMetadata) GetValues() map[string]interface{} { + return t.values +} +func (t *defaultMetadata) SetValues(values map[string]interface{}) { + t.values = values +} diff --git a/pkg/flow_engine/nodes/external_ding_node.go b/pkg/flow_engine/nodes/external_ding_node.go new file mode 100644 index 0000000..0577992 --- /dev/null +++ b/pkg/flow_engine/nodes/external_ding_node.go @@ -0,0 +1,33 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/flow_engine/message" +) + +type externalDingNode struct { + bareNode + WebHook string `json:"webHook" yaml:"webHook"` + MsgType string `json:"msgType" yaml:"msgType"` + Content string `json:"content" yaml:"content"` + IsAtAll bool `json:"isAtAll" yaml:"isAtAll"` + atMobiles []string `json:"atMobiles" yaml:"atMobiles"` +} + +type externalDingNodeFactory struct{} + +func (f externalDingNodeFactory) Name() string { return "ExternalDingNode" } +func (f externalDingNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalDingNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &externalDingNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *externalDingNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + return nil +} diff --git a/pkg/flow_engine/nodes/external_send_email_node.go b/pkg/flow_engine/nodes/external_send_email_node.go new file mode 100644 index 0000000..8e23ee9 --- /dev/null +++ b/pkg/flow_engine/nodes/external_send_email_node.go @@ -0,0 +1,51 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/flow_engine/message" +) + +type externalSendEmailNode struct { + bareNode + From string `json:"from" yaml:"from"` + To string `json:"to" yaml:"to"` + Cc string `json:"cc" yaml:"cc"` + Bcc string `json:"bcc" yaml:"bcc"` + Subject string `json:"subject" yaml:"subject"` + Body string `json:"body" yaml:"body"` +} + +type externalSendEmailNodeFactory struct{} + +func (f externalSendEmailNodeFactory) Name() string { return "ExternalSendEmailNode" } +func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + + node := &externalSendEmailNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *externalSendEmailNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + //failureLabelNode := n.GetLinkedNode("Failure") + + /*dialer := runtime.NewDialer(runtime.EMAIL) + variables := map[string]string{ + "from": n.From, + "to": n.To, + "cc": n.Cc, + "bcc": n.Bcc, + "subject": n.Subject, + "body": n.Body, + } + if err := dialer.DialAndSend(msg.GetMetadata(), variables); err != nil { + return failureLabelNode.Handle(msg) + }*/ + return successLabelNode.Handle(msg) +} diff --git a/pkg/flow_engine/nodes/external_send_sms_node.go b/pkg/flow_engine/nodes/external_send_sms_node.go new file mode 100644 index 0000000..b6836ae --- /dev/null +++ b/pkg/flow_engine/nodes/external_send_sms_node.go @@ -0,0 +1,31 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/flow_engine/message" +) + +type externalSendSmsNode struct { + bareNode +} + +type externalSendSmsNodeFactory struct{} + +func (f externalSendSmsNodeFactory) Name() string { return "ExternalSendSmslNode" } +func (f externalSendSmsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalSendSmsNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &externalSendSmsNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *externalSendSmsNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + //failureLabelNode := n.GetLinkedNode("Failure") + + return successLabelNode.Handle(msg) +} diff --git a/pkg/flow_engine/nodes/external_wechat_node.go b/pkg/flow_engine/nodes/external_wechat_node.go new file mode 100644 index 0000000..2d78c8c --- /dev/null +++ b/pkg/flow_engine/nodes/external_wechat_node.go @@ -0,0 +1,33 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/flow_engine/message" +) + +type externalWechatNode struct { + bareNode + WebHook string `json:"webHook" yaml:"webHook"` + MsgType string `json:"msgType" yaml:"msgType"` + Content string `json:"content" yaml:"content"` + IsAtAll bool `json:"isAtAll" yaml:"isAtAll"` + atMobiles []string `json:"atMobiles" yaml:"atMobiles"` +} + +type externalWechatNodeFactory struct{} + +func (f externalWechatNodeFactory) Name() string { return "ExternalWechatNode" } +func (f externalWechatNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalWechatNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &externalWechatNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *externalWechatNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + return nil +} diff --git a/pkg/flow_engine/nodes/factory.go b/pkg/flow_engine/nodes/factory.go new file mode 100644 index 0000000..3414c1b --- /dev/null +++ b/pkg/flow_engine/nodes/factory.go @@ -0,0 +1,51 @@ +package nodes + +import ( + "fmt" +) + +const ( + NODE_CATEGORY_TRANSFORM = "transform" + NODE_CATEGORY_EXTERNAL = "external" + NODE_CATEGORY_OTHERS = "others" +) + +// Factory is node's factory to create node based on metadata +// factory also manage node's metadta description which can be used by other +// service to present node in web +type Factory interface { + Name() string + Category() string + Labels() []string + Create(id string, meta Metadata) (Node, error) +} + +var ( + // allNodeFactories hold all node's factory + allNodeFactories map[string]Factory = make(map[string]Factory) + + // allNodeCategories hold node's metadata by category + allNodeCategories map[string][]map[string]interface{} = make(map[string][]map[string]interface{}) +) + +// RegisterFactory add a new node factory and classify its category for +// metadata description +func RegisterFactory(f Factory) { + allNodeFactories[f.Name()] = f + + if allNodeCategories[f.Category()] == nil { + allNodeCategories[f.Category()] = []map[string]interface{}{} + } + allNodeCategories[f.Category()] = append(allNodeCategories[f.Category()], map[string]interface{}{"name": f.Name(), "labels": f.Labels()}) +} + +// NewNode is the only way to create a new node +func NewNode(nodeType string, id string, meta Metadata) (Node, error) { + if f, found := allNodeFactories[nodeType]; found { + return f.Create(id, meta) + } + return nil, fmt.Errorf("invalid node type '%s'", nodeType) +} + +// GetCategoryNodes return specified category's all nodes +func GetCategoryNodes() map[string][]map[string]interface{} { return allNodeCategories } diff --git a/pkg/flow_engine/nodes/init.go b/pkg/flow_engine/nodes/init.go new file mode 100644 index 0000000..a18d394 --- /dev/null +++ b/pkg/flow_engine/nodes/init.go @@ -0,0 +1,13 @@ +package nodes + +// init register all node's factory +func init() { + RegisterFactory(inputNodeFactory{}) + RegisterFactory(transformScriptNodeFactory{}) + + RegisterFactory(externalDingNodeFactory{}) + RegisterFactory(externalWechatNodeFactory{}) + RegisterFactory(externalSendEmailNodeFactory{}) + RegisterFactory(externalSendSmsNodeFactory{}) + +} diff --git a/pkg/flow_engine/nodes/input_node.go b/pkg/flow_engine/nodes/input_node.go new file mode 100644 index 0000000..eddd306 --- /dev/null +++ b/pkg/flow_engine/nodes/input_node.go @@ -0,0 +1,34 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/flow_engine/message" +) + +const InputNodeName = "InputNode" + +type inputNode struct { + bareNode +} + +type inputNodeFactory struct{} + +func (f inputNodeFactory) Name() string { return "InputNode" } +func (f inputNodeFactory) Category() string { return NODE_CATEGORY_OTHERS } +func (f inputNodeFactory) Labels() []string { return []string{} } +func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &inputNode{ + bareNode: newBareNode(InputNodeName, id, meta, f.Labels()), + } + return node, nil +} + +func (n *inputNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + nodes := n.GetLinkedNodes() + for _, node := range nodes { + return node.Handle(msg) + } + return nil +} diff --git a/pkg/flow_engine/nodes/metadata.go b/pkg/flow_engine/nodes/metadata.go new file mode 100644 index 0000000..84c38c6 --- /dev/null +++ b/pkg/flow_engine/nodes/metadata.go @@ -0,0 +1,63 @@ +package nodes + +import ( + "fmt" + "github.com/mitchellh/mapstructure" +) + +const ( + NODE_CONFIG_MESSAGE_TYPE_KEY = "messageTypeKey" + NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey" +) + +type Metadata interface { + Keys() []string + With(key string, val interface{}) Metadata + Value(key string) (interface{}, error) + DecodePath(rawVal interface{}) error +} + +type nodeMetadata struct { + keypairs map[string]interface{} +} + +func NewMetadata() Metadata { + return &nodeMetadata{ + keypairs: make(map[string]interface{}), + } +} + +func NewMetadataWithString(vals string) Metadata { + return &nodeMetadata{} +} + +func NewMetadataWithValues(vals map[string]interface{}) Metadata { + return &nodeMetadata{ + keypairs: vals, + } +} + +func (c *nodeMetadata) Keys() []string { + keys := []string{} + for key, _ := range c.keypairs { + keys = append(keys, key) + } + return keys +} + +func (c *nodeMetadata) Value(key string) (interface{}, error) { + if val, found := c.keypairs[key]; found { + return val, nil + } + return nil, fmt.Errorf("key '%s' not found", key) +} + +func (c *nodeMetadata) With(key string, val interface{}) Metadata { + c.keypairs[key] = val + return c +} + +func (c *nodeMetadata) DecodePath(rawVal interface{}) error { + //return utils.Map2Struct(c.keypairs, rawVal) + return mapstructure.Decode(c.keypairs, rawVal) +} diff --git a/pkg/flow_engine/nodes/node.go b/pkg/flow_engine/nodes/node.go new file mode 100644 index 0000000..a6f95f4 --- /dev/null +++ b/pkg/flow_engine/nodes/node.go @@ -0,0 +1,65 @@ +package nodes + +import ( + "errors" + "pandax/pkg/flow_engine/message" + + "github.com/sirupsen/logrus" +) + +type Node interface { + Name() string + Id() string + Metadata() Metadata + MustLabels() []string + Handle(message.Message) error + + AddLinkedNode(label string, node Node) + GetLinkedNode(label string) Node + GetLinkedNodes() map[string]Node +} + +type bareNode struct { + name string + id string + nodes map[string]Node + meta Metadata + labels []string +} + +func newBareNode(name string, id string, meta Metadata, labels []string) bareNode { + return bareNode{ + name: name, + id: id, + nodes: make(map[string]Node), + meta: meta, + labels: labels, + } +} + +func (n *bareNode) Name() string { return n.name } +func (n *bareNode) WithId(id string) { n.id = id } +func (n *bareNode) Id() string { return n.id } +func (n *bareNode) MustLabels() []string { return n.labels } +func (n *bareNode) AddLinkedNode(label string, node Node) { n.nodes[label] = node } + +func (n *bareNode) GetLinkedNode(label string) Node { + if node, found := n.nodes[label]; found { + return node + } + logrus.Errorf("no label '%s' in node '%s'", label, n.name) + return nil +} + +func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes } + +func (n *bareNode) Metadata() Metadata { return n.meta } + +func (n *bareNode) Handle(message.Message) error { return errors.New("not implemented") } + +func decodePath(meta Metadata, n Node) (Node, error) { + if err := meta.DecodePath(n); err != nil { + return n, err + } + return n, nil +} diff --git a/pkg/flow_engine/nodes/script_engine.go b/pkg/flow_engine/nodes/script_engine.go new file mode 100644 index 0000000..b1f6af3 --- /dev/null +++ b/pkg/flow_engine/nodes/script_engine.go @@ -0,0 +1,84 @@ +package nodes + +import ( + "github.com/dop251/goja" + "github.com/sirupsen/logrus" + "pandax/pkg/flow_engine/message" +) + +type ScriptEngine interface { + ScriptOnMessage(msg message.Message, script string) (message.Message, error) + //used by filter_switch_node + ScriptOnSwitch(msg message.Message, script string) ([]string, error) + //used by filter_script_node + ScriptOnFilter(msg message.Message, script string) (bool, error) + ScriptToString(msg message.Message, script string) (string, error) +} + +type baseScriptEngine struct { +} + +func NewScriptEngine() ScriptEngine { + return &baseScriptEngine{} +} + +func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) (message.Message, error) { + vm := goja.New() + _, err := vm.RunString(script) + if err != nil { + logrus.Info("JS代码有问题") + return nil, err + } + var fn func(map[string]interface{}, map[string]interface{}, string) map[string]interface{} + err = vm.ExportTo(vm.Get("Transform"), &fn) + if err != nil { + logrus.Info("Js函数映射到 Go 函数失败!") + return nil, err + } + datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + msg.SetMsg(datas["msg"].(map[string]interface{})) + msg.SetMetadata(message.NewDefaultMetadata(datas["metadata"].(map[string]interface{}))) + msg.SetType(datas["msgType"].(string)) + return msg, nil + + return nil, nil +} + +func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) ([]string, error) { + vm := goja.New() + _, err := vm.RunString(script) + if err != nil { + logrus.Info("JS代码有问题") + return nil, err + } + var fn func(map[string]interface{}, map[string]interface{}, string) []string + err = vm.ExportTo(vm.Get("Switch"), &fn) + if err != nil { + logrus.Info("Js函数映射到 Go 函数失败!") + return nil, err + } + datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + return datas, nil +} + +func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) (bool, error) { + vm := goja.New() + _, err := vm.RunString(script) + if err != nil { + logrus.Info("JS代码有问题") + return false, err + } + var fn func(map[string]interface{}, map[string]interface{}, string) bool + err = vm.ExportTo(vm.Get("Filter"), &fn) + if err != nil { + logrus.Info("Js函数映射到 Go 函数失败!") + return false, err + } + datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + return datas, nil +} + +func (bse *baseScriptEngine) ScriptToString(msg message.Message, script string) (string, error) { + + return "", nil +} diff --git a/pkg/flow_engine/nodes/transform_script_node.go b/pkg/flow_engine/nodes/transform_script_node.go new file mode 100644 index 0000000..875a2d9 --- /dev/null +++ b/pkg/flow_engine/nodes/transform_script_node.go @@ -0,0 +1,38 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/flow_engine/message" +) + +type transformScriptNode struct { + bareNode + Script string `json:"script" yaml:"script"` +} + +type transformScriptNodeFactory struct{} + +func (f transformScriptNodeFactory) Name() string { return "TransformScriptNode" } +func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } +func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &transformScriptNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *transformScriptNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + + scriptEngine := NewScriptEngine() + newMessage, err := scriptEngine.ScriptOnMessage(msg, n.Script) + if err != nil { + return failureLabelNode.Handle(msg) + } + return successLabelNode.Handle(newMessage) +}