diff --git a/config.yml b/config.yml index eef6749..ad4c67b 100644 --- a/config.yml +++ b/config.yml @@ -29,9 +29,9 @@ redis: port: 6379 mysql: - host: 127.0.0.1:3306 + host: 101.35.247.125:3306 username: root - password: 123456 + password: pandax db-name: pandax config: charset=utf8&loc=Local&parseTime=true diff --git a/pkg/flow_engine/instance.go b/pkg/flow_engine/instance.go deleted file mode 100644 index 72de323..0000000 --- a/pkg/flow_engine/instance.go +++ /dev/null @@ -1,93 +0,0 @@ -package flow_engine - -import ( - "context" - "fmt" - "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/manifest" - "pandax/pkg/rule_engine/message" - "pandax/pkg/rule_engine/nodes" - "strings" -) - -// ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain, -type ruleChainInstance struct { - firstRuleNodeId string - nodes map[string]nodes.Node -} - -func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) { - errors := make([]error, 0) - - manifest, err := manifest.New(data) - if err != nil { - errors = append(errors, err) - logrus.WithError(err).Errorf("invalidi manifest file") - return nil, errors - } - return newInstanceWithManifest(manifest) -} - -// newWithManifest create rule chain by user's manifest file -func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) { - errs := make([]error, 0) - r := &ruleChainInstance{ - firstRuleNodeId: m.FirstRuleNodeId, - nodes: make(map[string]nodes.Node), - } - // Create All nodes - for _, n := range m.Nodes { - metadata := nodes.NewMetadataWithValues(n.Properties) - node, err := nodes.NewNode(n.Type, n.Id, metadata) - if err != nil { - errs = append(errs, err) - continue - } - if _, found := r.nodes[n.Id]; found { - err := fmt.Errorf("node '%s' already exist in rulechain", n.Id) - errs = append(errs, err) - continue - } - r.nodes[n.Id] = node - } - for _, edge := range m.Edges { - originalNode, found := r.nodes[edge.SourceNodeId] - if !found { - err := fmt.Errorf("original node '%s' no exist in", originalNode.Name()) - errs = append(errs, err) - continue - } - targetNode, found := r.nodes[edge.TargetNodeId] - if !found { - err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name()) - errs = append(errs, err) - continue - } - //可以有多个类型 - split := strings.Split(edge.Properties["type"].(string), "/") - for _, ty := range split { - originalNode.AddLinkedNode(ty, targetNode) - } - } - for name, node := range r.nodes { - targetNodes := node.GetLinkedNodes() - mustLabels := node.MustLabels() - for _, label := range mustLabels { - if _, found := targetNodes[label]; !found { - err := fmt.Errorf("the label '%s' in node '%s' no exist'", label, name) - errs = append(errs, err) - continue - } - } - } - - return r, errs -} - -// StartRuleChain -func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error { - if node, found := c.nodes[c.firstRuleNodeId]; found { - go node.Handle(message) - } - return nil -} diff --git a/pkg/flow_engine/instance_test.go b/pkg/flow_engine/instance_test.go deleted file mode 100644 index dfee433..0000000 --- a/pkg/flow_engine/instance_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package flow_engine - -import ( - "io/ioutil" - "pandax/pkg/rule_engine/message" - "pandax/pkg/rule_engine/nodes" - "testing" -) - -func TestNewRuleChainInstance(t *testing.T) { - buf, err := ioutil.ReadFile("./manifest/manifest_sample.json") - if err != nil { - t.Error(err) - } - _, errs := NewRuleChainInstance(buf) - if len(errs) > 0 { - t.Error(errs[0]) - } -} - -func TestScriptEngine(t *testing.T) { - metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) - msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata) - scriptEngine := nodes.NewScriptEngine() - const script = ` - function Switch(msg, metadata, msgType) { - function nextRelation(metadata, msg) { - return ['one','nine']; - } - if(metadata.device === 'aa') { - return ['six']; - } - if(msgType === 'Post telemetry') { - return ['two']; - } - return nextRelation(metadata, msg); - } - ` - SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, script) - - if err != nil { - t.Error(err) - } - t.Log(SwitchResults) -} - -func TestScriptOnMessage(t *testing.T) { - metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) - msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata) - scriptEngine := nodes.NewScriptEngine() - const script = ` - function Transform(msg, metadata, msgType) { - msg.bb = "33" - metadata.event = 55 - return {msg: msg, metadata: metadata, msgType: msgType}; - } - ` - ScriptOnMessageResults, err := scriptEngine.ScriptOnMessage(msg, script) - - if err != nil { - t.Error(err) - } - t.Log(ScriptOnMessageResults.GetMetadata()) -} diff --git a/pkg/flow_engine/manifest/manifest.go b/pkg/flow_engine/manifest/manifest.go deleted file mode 100644 index 21b9349..0000000 --- a/pkg/flow_engine/manifest/manifest.go +++ /dev/null @@ -1,62 +0,0 @@ -package manifest - -import ( - "encoding/json" - "github.com/sirupsen/logrus" -) - -type Node struct { - Type string `json:"type" yaml:"type"` - Id string `json:"id" yaml:"id"` - Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode -} - -type Edge struct { - SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"` - TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"` - Type string `json:"type" yaml:"type"` //success or fail - Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode -} - -type Manifest struct { - FirstRuleNodeId string `json:"firstRuleNodeId" yaml:"firstRuleNodeId"` - Nodes []Node `json:"nodes" yaml:"nodes"` - Edges []Edge `json:"edges" yaml:"edges"` -} - -func New(data []byte) (*Manifest, error) { - firstRuleNodeId := "" - manifest := make(map[string]interface{}) - if err := json.Unmarshal(data, &manifest); err != nil { - logrus.WithError(err).Errorf("invalid node chain manifest file") - return nil, err - } - nodes := make([]Node, 0) - for _, mn := range manifest["nodes"].([]interface{}) { - node := mn.(map[string]interface{}) - if node["type"].(string) == "InputNode" { - firstRuleNodeId = node["id"].(string) - } - nodes = append(nodes, Node{ - Id: node["id"].(string), - Type: node["type"].(string), - Properties: node["properties"].(map[string]interface{}), - }) - } - edges := make([]Edge, 0) - for _, en := range manifest["edges"].([]interface{}) { - edge := en.(map[string]interface{}) - edges = append(edges, Edge{ - Type: edge["type"].(string), - Properties: edge["properties"].(map[string]interface{}), - SourceNodeId: edge["sourceNodeId"].(string), - TargetNodeId: edge["targetNodeId"].(string), - }) - } - m := &Manifest{ - FirstRuleNodeId: firstRuleNodeId, - Nodes: nodes, - Edges: edges, - } - return m, nil -} diff --git a/pkg/flow_engine/manifest/manifest_sample.json b/pkg/flow_engine/manifest/manifest_sample.json deleted file mode 100644 index b80e944..0000000 --- a/pkg/flow_engine/manifest/manifest_sample.json +++ /dev/null @@ -1,127 +0,0 @@ -{ - "nodes": [ - { - "id": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", - "type": "InputNode", - "x": 280, - "y": 280, - "properties": { - "icon": "/src/assets/icon_module/svg/start.svg", - "debugMode": false - }, - "zIndex": 1002, - "text": { - "x": 290, - "y": 280, - "value": "输入" - } - }, - { - "id": "45afb241-1977-4cbf-8af3-c12844d5666b", - "type": "DelayNode", - "x": 600, - "y": 160, - "properties": { - "icon": "/src/assets/icon_module/svg/function.svg", - "debugMode": false - }, - "zIndex": 1004, - "text": { - "x": 610, - "y": 160, - "value": "延迟" - } - }, - { - "id": "95047b03-e966-4685-b625-3cea27415706", - "type": "SwitchNode", - "x": 600, - "y": 460, - "properties": { - "icon": "/src/assets/icon_module/svg/switch.svg", - "debugMode": false, - "scripts": "return {\n msg: msg,\n metadata: metadata,\n msgType: msgType\n};" - }, - "zIndex": 1006, - "text": { - "x": 610, - "y": 460, - "value": "分流" - } - } - ], - "edges": [ - { - "id": "fde7f2de-cc0f-467d-a614-4505f058dc2a", - "type": "bezier-link", - "sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", - "targetNodeId": "45afb241-1977-4cbf-8af3-c12844d5666b", - "startPoint": { - "x": 340, - "y": 280 - }, - "endPoint": { - "x": 540, - "y": 160 - }, - "properties": { - "type": "Success" - }, - "zIndex": 1007, - "pointsList": [ - { - "x": 340, - "y": 280 - }, - { - "x": 440, - "y": 280 - }, - { - "x": 440, - "y": 160 - }, - { - "x": 540, - "y": 160 - } - ] - }, - { - "id": "986878cc-a9be-42b5-afc0-6c0a169c4e4d", - "type": "bezier-link", - "sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", - "targetNodeId": "95047b03-e966-4685-b625-3cea27415706", - "startPoint": { - "x": 340, - "y": 280 - }, - "endPoint": { - "x": 540, - "y": 460 - }, - "properties": { - "type": "Failure" - }, - "zIndex": 1008, - "pointsList": [ - { - "x": 340, - "y": 280 - }, - { - "x": 440, - "y": 280 - }, - { - "x": 440, - "y": 460 - }, - { - "x": 540, - "y": 460 - } - ] - } - ] -} \ No newline at end of file diff --git a/pkg/flow_engine/message/message.go b/pkg/flow_engine/message/message.go deleted file mode 100644 index 3d08166..0000000 --- a/pkg/flow_engine/message/message.go +++ /dev/null @@ -1,109 +0,0 @@ -package message - -import "github.com/sirupsen/logrus" - -// Message ... -type Message interface { - GetOriginator() string - GetType() string - GetCh() chan map[string]bool - GetMetadata() Metadata - SetType(string) - SetOriginator(string) - SetMetadata(Metadata) -} - -// Metadata ... -type Metadata interface { - Keys() []string - GetKeyValue(key string) interface{} - SetKeyValue(key string, val interface{}) - GetValues() map[string]interface{} -} - -// Predefined message types -const ( - MessageTypePostAttributesRequest = "Post attributes" - MessageTypePostTelemetryRequest = "Post telemetry" - MessageTypeActivityEvent = "Activity event" - MessageTypeInactivityEvent = "Inactivity event" - MessageTypeConnectEvent = "Connect event" - MessageTypeDisconnectEvent = "Disconnect event" -) - -// NewMessage ... -func NewMessage() Message { - return &defaultMessage{ - ch: make(chan map[string]bool), - } -} - -type defaultMessage struct { - id string //uuid - ts int64 //时间戳 - msgType string //消息类型 - originator string //数据发布者 - ch chan map[string]bool //通道 - metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等 -} - -// NewMessageWithDetail ... -func NewMessageWithDetail(originator string, messageType string, msg map[string]interface{}, metadata Metadata) Message { - return &defaultMessage{ - originator: originator, - msgType: messageType, - ch: make(chan map[string]bool), - metadata: metadata, - } -} - -func (t *defaultMessage) GetOriginator() string { return t.originator } -func (t *defaultMessage) GetType() string { return t.msgType } -func (t *defaultMessage) GetCh() chan map[string]bool { return t.ch } -func (t *defaultMessage) GetMetadata() Metadata { return t.metadata } -func (t *defaultMessage) SetType(msgType string) { t.msgType = msgType } -func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator } -func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata } - -// NewMetadata ... -func NewMetadata() Metadata { - return &defaultMetadata{ - values: make(map[string]interface{}), - } -} - -type defaultMetadata struct { - values map[string]interface{} -} - -func NewDefaultMetadata(vals map[string]interface{}) Metadata { - return &defaultMetadata{ - values: vals, - } -} - -func (t *defaultMetadata) Keys() []string { - keys := make([]string, 0) - for key := range t.values { - keys = append(keys, key) - } - return keys -} - -func (t *defaultMetadata) GetKeyValue(key string) interface{} { - if _, found := t.values[key]; !found { - logrus.Fatalf("no key '%s' in metadata", key) - } - return t.values[key] -} - -func (t *defaultMetadata) SetKeyValue(key string, val interface{}) { - t.values[key] = val -} - -func (t *defaultMetadata) GetValues() map[string]interface{} { - return t.values -} -func (t *defaultMetadata) SetValues(values map[string]interface{}) { - t.values = values -} diff --git a/pkg/flow_engine/nodes/external_ding_node.go b/pkg/flow_engine/nodes/external_ding_node.go deleted file mode 100644 index 0577992..0000000 --- a/pkg/flow_engine/nodes/external_ding_node.go +++ /dev/null @@ -1,33 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/flow_engine/message" -) - -type externalDingNode struct { - bareNode - WebHook string `json:"webHook" yaml:"webHook"` - MsgType string `json:"msgType" yaml:"msgType"` - Content string `json:"content" yaml:"content"` - IsAtAll bool `json:"isAtAll" yaml:"isAtAll"` - atMobiles []string `json:"atMobiles" yaml:"atMobiles"` -} - -type externalDingNodeFactory struct{} - -func (f externalDingNodeFactory) Name() string { return "ExternalDingNode" } -func (f externalDingNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } -func (f externalDingNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) { - node := &externalDingNode{ - bareNode: newBareNode(f.Name(), id, meta, f.Labels()), - } - return decodePath(meta, node) -} - -func (n *externalDingNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - return nil -} diff --git a/pkg/flow_engine/nodes/external_send_email_node.go b/pkg/flow_engine/nodes/external_send_email_node.go deleted file mode 100644 index 8e23ee9..0000000 --- a/pkg/flow_engine/nodes/external_send_email_node.go +++ /dev/null @@ -1,51 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/flow_engine/message" -) - -type externalSendEmailNode struct { - bareNode - From string `json:"from" yaml:"from"` - To string `json:"to" yaml:"to"` - Cc string `json:"cc" yaml:"cc"` - Bcc string `json:"bcc" yaml:"bcc"` - Subject string `json:"subject" yaml:"subject"` - Body string `json:"body" yaml:"body"` -} - -type externalSendEmailNodeFactory struct{} - -func (f externalSendEmailNodeFactory) Name() string { return "ExternalSendEmailNode" } -func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } -func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} - - node := &externalSendEmailNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - } - return decodePath(meta, node) -} - -func (n *externalSendEmailNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - successLabelNode := n.GetLinkedNode("Success") - //failureLabelNode := n.GetLinkedNode("Failure") - - /*dialer := runtime.NewDialer(runtime.EMAIL) - variables := map[string]string{ - "from": n.From, - "to": n.To, - "cc": n.Cc, - "bcc": n.Bcc, - "subject": n.Subject, - "body": n.Body, - } - if err := dialer.DialAndSend(msg.GetMetadata(), variables); err != nil { - return failureLabelNode.Handle(msg) - }*/ - return successLabelNode.Handle(msg) -} diff --git a/pkg/flow_engine/nodes/external_send_sms_node.go b/pkg/flow_engine/nodes/external_send_sms_node.go deleted file mode 100644 index b6836ae..0000000 --- a/pkg/flow_engine/nodes/external_send_sms_node.go +++ /dev/null @@ -1,31 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/flow_engine/message" -) - -type externalSendSmsNode struct { - bareNode -} - -type externalSendSmsNodeFactory struct{} - -func (f externalSendSmsNodeFactory) Name() string { return "ExternalSendSmslNode" } -func (f externalSendSmsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } -func (f externalSendSmsNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, error) { - node := &externalSendSmsNode{ - bareNode: newBareNode(f.Name(), id, meta, f.Labels()), - } - return decodePath(meta, node) -} - -func (n *externalSendSmsNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - successLabelNode := n.GetLinkedNode("Success") - //failureLabelNode := n.GetLinkedNode("Failure") - - return successLabelNode.Handle(msg) -} diff --git a/pkg/flow_engine/nodes/external_wechat_node.go b/pkg/flow_engine/nodes/external_wechat_node.go deleted file mode 100644 index 2d78c8c..0000000 --- a/pkg/flow_engine/nodes/external_wechat_node.go +++ /dev/null @@ -1,33 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/flow_engine/message" -) - -type externalWechatNode struct { - bareNode - WebHook string `json:"webHook" yaml:"webHook"` - MsgType string `json:"msgType" yaml:"msgType"` - Content string `json:"content" yaml:"content"` - IsAtAll bool `json:"isAtAll" yaml:"isAtAll"` - atMobiles []string `json:"atMobiles" yaml:"atMobiles"` -} - -type externalWechatNodeFactory struct{} - -func (f externalWechatNodeFactory) Name() string { return "ExternalWechatNode" } -func (f externalWechatNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } -func (f externalWechatNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error) { - node := &externalWechatNode{ - bareNode: newBareNode(f.Name(), id, meta, f.Labels()), - } - return decodePath(meta, node) -} - -func (n *externalWechatNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - return nil -} diff --git a/pkg/flow_engine/nodes/factory.go b/pkg/flow_engine/nodes/factory.go deleted file mode 100644 index 3414c1b..0000000 --- a/pkg/flow_engine/nodes/factory.go +++ /dev/null @@ -1,51 +0,0 @@ -package nodes - -import ( - "fmt" -) - -const ( - NODE_CATEGORY_TRANSFORM = "transform" - NODE_CATEGORY_EXTERNAL = "external" - NODE_CATEGORY_OTHERS = "others" -) - -// Factory is node's factory to create node based on metadata -// factory also manage node's metadta description which can be used by other -// service to present node in web -type Factory interface { - Name() string - Category() string - Labels() []string - Create(id string, meta Metadata) (Node, error) -} - -var ( - // allNodeFactories hold all node's factory - allNodeFactories map[string]Factory = make(map[string]Factory) - - // allNodeCategories hold node's metadata by category - allNodeCategories map[string][]map[string]interface{} = make(map[string][]map[string]interface{}) -) - -// RegisterFactory add a new node factory and classify its category for -// metadata description -func RegisterFactory(f Factory) { - allNodeFactories[f.Name()] = f - - if allNodeCategories[f.Category()] == nil { - allNodeCategories[f.Category()] = []map[string]interface{}{} - } - allNodeCategories[f.Category()] = append(allNodeCategories[f.Category()], map[string]interface{}{"name": f.Name(), "labels": f.Labels()}) -} - -// NewNode is the only way to create a new node -func NewNode(nodeType string, id string, meta Metadata) (Node, error) { - if f, found := allNodeFactories[nodeType]; found { - return f.Create(id, meta) - } - return nil, fmt.Errorf("invalid node type '%s'", nodeType) -} - -// GetCategoryNodes return specified category's all nodes -func GetCategoryNodes() map[string][]map[string]interface{} { return allNodeCategories } diff --git a/pkg/flow_engine/nodes/init.go b/pkg/flow_engine/nodes/init.go deleted file mode 100644 index a18d394..0000000 --- a/pkg/flow_engine/nodes/init.go +++ /dev/null @@ -1,13 +0,0 @@ -package nodes - -// init register all node's factory -func init() { - RegisterFactory(inputNodeFactory{}) - RegisterFactory(transformScriptNodeFactory{}) - - RegisterFactory(externalDingNodeFactory{}) - RegisterFactory(externalWechatNodeFactory{}) - RegisterFactory(externalSendEmailNodeFactory{}) - RegisterFactory(externalSendSmsNodeFactory{}) - -} diff --git a/pkg/flow_engine/nodes/input_node.go b/pkg/flow_engine/nodes/input_node.go deleted file mode 100644 index eddd306..0000000 --- a/pkg/flow_engine/nodes/input_node.go +++ /dev/null @@ -1,34 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/flow_engine/message" -) - -const InputNodeName = "InputNode" - -type inputNode struct { - bareNode -} - -type inputNodeFactory struct{} - -func (f inputNodeFactory) Name() string { return "InputNode" } -func (f inputNodeFactory) Category() string { return NODE_CATEGORY_OTHERS } -func (f inputNodeFactory) Labels() []string { return []string{} } -func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) { - node := &inputNode{ - bareNode: newBareNode(InputNodeName, id, meta, f.Labels()), - } - return node, nil -} - -func (n *inputNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - nodes := n.GetLinkedNodes() - for _, node := range nodes { - return node.Handle(msg) - } - return nil -} diff --git a/pkg/flow_engine/nodes/metadata.go b/pkg/flow_engine/nodes/metadata.go deleted file mode 100644 index 84c38c6..0000000 --- a/pkg/flow_engine/nodes/metadata.go +++ /dev/null @@ -1,63 +0,0 @@ -package nodes - -import ( - "fmt" - "github.com/mitchellh/mapstructure" -) - -const ( - NODE_CONFIG_MESSAGE_TYPE_KEY = "messageTypeKey" - NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey" -) - -type Metadata interface { - Keys() []string - With(key string, val interface{}) Metadata - Value(key string) (interface{}, error) - DecodePath(rawVal interface{}) error -} - -type nodeMetadata struct { - keypairs map[string]interface{} -} - -func NewMetadata() Metadata { - return &nodeMetadata{ - keypairs: make(map[string]interface{}), - } -} - -func NewMetadataWithString(vals string) Metadata { - return &nodeMetadata{} -} - -func NewMetadataWithValues(vals map[string]interface{}) Metadata { - return &nodeMetadata{ - keypairs: vals, - } -} - -func (c *nodeMetadata) Keys() []string { - keys := []string{} - for key, _ := range c.keypairs { - keys = append(keys, key) - } - return keys -} - -func (c *nodeMetadata) Value(key string) (interface{}, error) { - if val, found := c.keypairs[key]; found { - return val, nil - } - return nil, fmt.Errorf("key '%s' not found", key) -} - -func (c *nodeMetadata) With(key string, val interface{}) Metadata { - c.keypairs[key] = val - return c -} - -func (c *nodeMetadata) DecodePath(rawVal interface{}) error { - //return utils.Map2Struct(c.keypairs, rawVal) - return mapstructure.Decode(c.keypairs, rawVal) -} diff --git a/pkg/flow_engine/nodes/node.go b/pkg/flow_engine/nodes/node.go deleted file mode 100644 index a6f95f4..0000000 --- a/pkg/flow_engine/nodes/node.go +++ /dev/null @@ -1,65 +0,0 @@ -package nodes - -import ( - "errors" - "pandax/pkg/flow_engine/message" - - "github.com/sirupsen/logrus" -) - -type Node interface { - Name() string - Id() string - Metadata() Metadata - MustLabels() []string - Handle(message.Message) error - - AddLinkedNode(label string, node Node) - GetLinkedNode(label string) Node - GetLinkedNodes() map[string]Node -} - -type bareNode struct { - name string - id string - nodes map[string]Node - meta Metadata - labels []string -} - -func newBareNode(name string, id string, meta Metadata, labels []string) bareNode { - return bareNode{ - name: name, - id: id, - nodes: make(map[string]Node), - meta: meta, - labels: labels, - } -} - -func (n *bareNode) Name() string { return n.name } -func (n *bareNode) WithId(id string) { n.id = id } -func (n *bareNode) Id() string { return n.id } -func (n *bareNode) MustLabels() []string { return n.labels } -func (n *bareNode) AddLinkedNode(label string, node Node) { n.nodes[label] = node } - -func (n *bareNode) GetLinkedNode(label string) Node { - if node, found := n.nodes[label]; found { - return node - } - logrus.Errorf("no label '%s' in node '%s'", label, n.name) - return nil -} - -func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes } - -func (n *bareNode) Metadata() Metadata { return n.meta } - -func (n *bareNode) Handle(message.Message) error { return errors.New("not implemented") } - -func decodePath(meta Metadata, n Node) (Node, error) { - if err := meta.DecodePath(n); err != nil { - return n, err - } - return n, nil -} diff --git a/pkg/flow_engine/nodes/script_engine.go b/pkg/flow_engine/nodes/script_engine.go deleted file mode 100644 index b1f6af3..0000000 --- a/pkg/flow_engine/nodes/script_engine.go +++ /dev/null @@ -1,84 +0,0 @@ -package nodes - -import ( - "github.com/dop251/goja" - "github.com/sirupsen/logrus" - "pandax/pkg/flow_engine/message" -) - -type ScriptEngine interface { - ScriptOnMessage(msg message.Message, script string) (message.Message, error) - //used by filter_switch_node - ScriptOnSwitch(msg message.Message, script string) ([]string, error) - //used by filter_script_node - ScriptOnFilter(msg message.Message, script string) (bool, error) - ScriptToString(msg message.Message, script string) (string, error) -} - -type baseScriptEngine struct { -} - -func NewScriptEngine() ScriptEngine { - return &baseScriptEngine{} -} - -func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) (message.Message, error) { - vm := goja.New() - _, err := vm.RunString(script) - if err != nil { - logrus.Info("JS代码有问题") - return nil, err - } - var fn func(map[string]interface{}, map[string]interface{}, string) map[string]interface{} - err = vm.ExportTo(vm.Get("Transform"), &fn) - if err != nil { - logrus.Info("Js函数映射到 Go 函数失败!") - return nil, err - } - datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) - msg.SetMsg(datas["msg"].(map[string]interface{})) - msg.SetMetadata(message.NewDefaultMetadata(datas["metadata"].(map[string]interface{}))) - msg.SetType(datas["msgType"].(string)) - return msg, nil - - return nil, nil -} - -func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) ([]string, error) { - vm := goja.New() - _, err := vm.RunString(script) - if err != nil { - logrus.Info("JS代码有问题") - return nil, err - } - var fn func(map[string]interface{}, map[string]interface{}, string) []string - err = vm.ExportTo(vm.Get("Switch"), &fn) - if err != nil { - logrus.Info("Js函数映射到 Go 函数失败!") - return nil, err - } - datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) - return datas, nil -} - -func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) (bool, error) { - vm := goja.New() - _, err := vm.RunString(script) - if err != nil { - logrus.Info("JS代码有问题") - return false, err - } - var fn func(map[string]interface{}, map[string]interface{}, string) bool - err = vm.ExportTo(vm.Get("Filter"), &fn) - if err != nil { - logrus.Info("Js函数映射到 Go 函数失败!") - return false, err - } - datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) - return datas, nil -} - -func (bse *baseScriptEngine) ScriptToString(msg message.Message, script string) (string, error) { - - return "", nil -} diff --git a/pkg/flow_engine/nodes/transform_script_node.go b/pkg/flow_engine/nodes/transform_script_node.go deleted file mode 100644 index 875a2d9..0000000 --- a/pkg/flow_engine/nodes/transform_script_node.go +++ /dev/null @@ -1,38 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/flow_engine/message" -) - -type transformScriptNode struct { - bareNode - Script string `json:"script" yaml:"script"` -} - -type transformScriptNodeFactory struct{} - -func (f transformScriptNodeFactory) Name() string { return "TransformScriptNode" } -func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } -func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} } -func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} - node := &transformScriptNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - } - return decodePath(meta, node) -} - -func (n *transformScriptNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - successLabelNode := n.GetLinkedNode("Success") - failureLabelNode := n.GetLinkedNode("Failure") - - scriptEngine := NewScriptEngine() - newMessage, err := scriptEngine.ScriptOnMessage(msg, n.Script) - if err != nil { - return failureLabelNode.Handle(msg) - } - return successLabelNode.Handle(newMessage) -} diff --git a/pkg/initialize/table.go b/pkg/initialize/table.go index 7f434bb..ebbb4c2 100644 --- a/pkg/initialize/table.go +++ b/pkg/initialize/table.go @@ -46,8 +46,6 @@ func InitTable() { flowEntity.FlowWorkOrder{}, flowEntity.FlowWorkOrderTemplate{}, flowEntity.FlowWorkStage{}, - flowEntity.FlowWorkTask{}, - flowEntity.FlowWorkTaskHistory{}, ), "初始化表失败") }