mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 02:48:34 +08:00
规则链
This commit is contained in:
@@ -1,6 +1,11 @@
|
||||
package message
|
||||
|
||||
import "github.com/sirupsen/logrus"
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/google/uuid"
|
||||
"github.com/sirupsen/logrus"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Message ...
|
||||
type Message interface {
|
||||
@@ -12,6 +17,7 @@ type Message interface {
|
||||
SetMsg(map[string]interface{})
|
||||
SetOriginator(string)
|
||||
SetMetadata(Metadata)
|
||||
MarshalBinary() ([]byte, error)
|
||||
}
|
||||
|
||||
// Metadata ...
|
||||
@@ -35,16 +41,18 @@ const (
|
||||
// NewMessage ...
|
||||
func NewMessage() Message {
|
||||
return &defaultMessage{
|
||||
id: uuid.New().String(),
|
||||
ts: time.Now(),
|
||||
msg: map[string]interface{}{},
|
||||
}
|
||||
}
|
||||
|
||||
type defaultMessage struct {
|
||||
id string //uuid
|
||||
ts int64 //时间戳
|
||||
ts time.Time //时间戳
|
||||
msgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件
|
||||
originator string //数据发布者 设备 规则链
|
||||
customerId string //客户Id UUID
|
||||
userId string //客户Id UUID
|
||||
deviceId string //设备Id UUID
|
||||
msg map[string]interface{} //数据 数据结构JSON 设备原始数据 msg
|
||||
metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等
|
||||
@@ -68,6 +76,9 @@ func (t *defaultMessage) SetType(msgType string) { t.msgType = msgTyp
|
||||
func (t *defaultMessage) SetMsg(msg map[string]interface{}) { t.msg = msg }
|
||||
func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator }
|
||||
func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata }
|
||||
func (t *defaultMessage) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(t)
|
||||
}
|
||||
|
||||
// NewMetadata ...
|
||||
func NewMetadata() Metadata {
|
||||
|
||||
@@ -1,14 +1,3 @@
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
// not use p file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package nodes
|
||||
|
||||
import (
|
||||
|
||||
@@ -14,7 +14,7 @@ type messageGeneratorNode struct {
|
||||
|
||||
type messageGeneratorNodeFactory struct{}
|
||||
|
||||
func (f messageGeneratorNodeFactory) Name() string { return "MessageGeneratorNode" }
|
||||
func (f messageGeneratorNodeFactory) Name() string { return "GeneratorNode" }
|
||||
func (f messageGeneratorNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
|
||||
func (f messageGeneratorNodeFactory) Labels() []string { return []string{"Created", "Updated"} }
|
||||
func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -16,7 +16,7 @@ type externalDingNode struct {
|
||||
|
||||
type externalDingNodeFactory struct{}
|
||||
|
||||
func (f externalDingNodeFactory) Name() string { return "ExternalDingNode" }
|
||||
func (f externalDingNodeFactory) Name() string { return "DingNode" }
|
||||
func (f externalDingNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalDingNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -14,7 +14,7 @@ type externalKafkaNode struct {
|
||||
|
||||
type externalKafkaNodeFactory struct{}
|
||||
|
||||
func (f externalKafkaNodeFactory) Name() string { return "ExternalKafkaNode" }
|
||||
func (f externalKafkaNodeFactory) Name() string { return "KafkaNode" }
|
||||
func (f externalKafkaNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalKafkaNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type externalMqNode struct {
|
||||
bareNode
|
||||
}
|
||||
|
||||
type externalMqNodeFactory struct{}
|
||||
|
||||
func (f externalMqNodeFactory) Name() string { return "ExternalMqNode" }
|
||||
func (f externalMqNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalMqNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalMqNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
node := &externalMqNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
|
||||
}
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *externalMqNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -24,7 +24,7 @@ type externalMqttNode struct {
|
||||
|
||||
type externalMqttNodeFactory struct{}
|
||||
|
||||
func (f externalMqttNodeFactory) Name() string { return "ExternalMqttNode" }
|
||||
func (f externalMqttNodeFactory) Name() string { return "MqttNode" }
|
||||
func (f externalMqttNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalMqttNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
48
pkg/rule_engine/nodes/external_nats_node.go
Normal file
48
pkg/rule_engine/nodes/external_nats_node.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
type externalNatsNode struct {
|
||||
bareNode
|
||||
Url string `json:"url"`
|
||||
Subject string `json:"subject"`
|
||||
Body string
|
||||
client *nats.Conn
|
||||
}
|
||||
|
||||
type externalNatsNodeFactory struct{}
|
||||
|
||||
func (f externalNatsNodeFactory) Name() string { return "NatsNode" }
|
||||
func (f externalNatsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalNatsNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalNatsNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
node := &externalNatsNode{
|
||||
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
|
||||
}
|
||||
_, err := decodePath(meta, node)
|
||||
if err != nil {
|
||||
return node, err
|
||||
}
|
||||
connect, err := nats.Connect(node.Url)
|
||||
if err != nil {
|
||||
return node, err
|
||||
}
|
||||
node.client = connect
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (n *externalNatsNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
err := n.client.Publish(n.Subject, []byte(n.Body))
|
||||
if err != nil {
|
||||
n.client.Close()
|
||||
return failureLabelNode.Handle(msg)
|
||||
}
|
||||
return successLabelNode.Handle(msg)
|
||||
}
|
||||
@@ -7,20 +7,14 @@ import (
|
||||
|
||||
type externalRestapiNode struct {
|
||||
bareNode
|
||||
RestEndpointUrlPattern string `json:"restEndpointUrlPattern" yaml:"restEndpointUrlPattern"`
|
||||
RequestMethod string `json:"requestMethod" yaml:"requestMethod"`
|
||||
headers map[string]string `json:"headers" yaml:"headers"`
|
||||
UseSimpleClientHttpFactory bool `json:"useSimpleClientHttpFactory" yaml:"useSimpleClientHttpFactory"`
|
||||
ReadTimeoutMs int `json:"readTimeoutMs" yaml:"readTimeoutMs"`
|
||||
MaxParallelRequestsCount int `json:"maxParallelRequestsCount" yaml:"maxParallelRequestsCount"`
|
||||
UseRedisQueueForMsgPersistence bool `json:"useRedisQueueForMsgPersistence" yaml:"useRedisQueueForMsgPersistence"`
|
||||
trimQueue bool `json:"trimQueue" yaml:"trimQueue"`
|
||||
MaxQueueSize int `json:"maxQueueSize" yaml:"maxQueueSize"`
|
||||
RestEndpointUrlPattern string `json:"restEndpointUrlPattern" yaml:"restEndpointUrlPattern"`
|
||||
RequestMethod string `json:"requestMethod" yaml:"requestMethod"`
|
||||
headers map[string]string `json:"headers" yaml:"headers"`
|
||||
}
|
||||
|
||||
type externalRestapiNodeFactory struct{}
|
||||
|
||||
func (f externalRestapiNodeFactory) Name() string { return "ExternalRestapiNode" }
|
||||
func (f externalRestapiNodeFactory) Name() string { return "RestapiNode" }
|
||||
func (f externalRestapiNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalRestapiNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -12,7 +12,7 @@ type externalRuleChainNode struct {
|
||||
|
||||
type externalRuleChainNodeFactory struct{}
|
||||
|
||||
func (f externalRuleChainNodeFactory) Name() string { return "ExternalRuleChainNode" }
|
||||
func (f externalRuleChainNodeFactory) Name() string { return "RuleChainNode" }
|
||||
func (f externalRuleChainNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalRuleChainNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -17,7 +17,7 @@ type externalSendEmailNode struct {
|
||||
|
||||
type externalSendEmailNodeFactory struct{}
|
||||
|
||||
func (f externalSendEmailNodeFactory) Name() string { return "ExternalSendEmailNode" }
|
||||
func (f externalSendEmailNodeFactory) Name() string { return "SendEmailNode" }
|
||||
func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -11,7 +11,7 @@ type externalSendSmsNode struct {
|
||||
|
||||
type externalSendSmsNodeFactory struct{}
|
||||
|
||||
func (f externalSendSmsNodeFactory) Name() string { return "ExternalSendSmslNode" }
|
||||
func (f externalSendSmsNodeFactory) Name() string { return "SendSmsNode" }
|
||||
func (f externalSendSmsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalSendSmsNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -16,7 +16,7 @@ type externalWechatNode struct {
|
||||
|
||||
type externalWechatNodeFactory struct{}
|
||||
|
||||
func (f externalWechatNodeFactory) Name() string { return "ExternalWechatNode" }
|
||||
func (f externalWechatNodeFactory) Name() string { return "WechatNode" }
|
||||
func (f externalWechatNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
|
||||
func (f externalWechatNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -30,6 +30,7 @@ var (
|
||||
|
||||
// allNodeCategories hold node's metadata by category
|
||||
allNodeCategories map[string][]map[string]interface{} = make(map[string][]map[string]interface{})
|
||||
allCategories []map[string]interface{} = make([]map[string]interface{}, 0)
|
||||
)
|
||||
|
||||
// RegisterFactory add a new node factory and classify its category for
|
||||
@@ -41,6 +42,7 @@ func RegisterFactory(f Factory) {
|
||||
allNodeCategories[f.Category()] = []map[string]interface{}{}
|
||||
}
|
||||
allNodeCategories[f.Category()] = append(allNodeCategories[f.Category()], map[string]interface{}{"name": f.Name(), "labels": f.Labels()})
|
||||
allCategories = append(allCategories, map[string]interface{}{"name": f.Name(), "labels": f.Labels()})
|
||||
}
|
||||
|
||||
// NewNode is the only way to create a new node
|
||||
@@ -53,3 +55,5 @@ func NewNode(nodeType string, id string, meta Metadata) (Node, error) {
|
||||
|
||||
// GetCategoryNodes return specified category's all nodes
|
||||
func GetCategoryNodes() map[string][]map[string]interface{} { return allNodeCategories }
|
||||
|
||||
func GetCategory() []map[string]interface{} { return allCategories }
|
||||
|
||||
@@ -19,7 +19,7 @@ type deviceTypeSwitchNode struct {
|
||||
|
||||
type deviceTypeSwitchNodeFactory struct{}
|
||||
|
||||
func (f deviceTypeSwitchNodeFactory) Name() string { return "DeviceTypeSwitch" }
|
||||
func (f deviceTypeSwitchNodeFactory) Name() string { return "DeviceTypeSwitchNode" }
|
||||
func (f deviceTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
|
||||
func (f deviceTypeSwitchNodeFactory) Labels() []string { return []string{DEVICE, GATEWAY} }
|
||||
func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -24,7 +24,7 @@ func init() {
|
||||
RegisterFactory(externalDingNodeFactory{})
|
||||
RegisterFactory(externalWechatNodeFactory{})
|
||||
RegisterFactory(externalKafkaNodeFactory{})
|
||||
RegisterFactory(externalMqNodeFactory{})
|
||||
RegisterFactory(externalNatsNodeFactory{})
|
||||
RegisterFactory(externalMqttNodeFactory{})
|
||||
RegisterFactory(externalRestapiNodeFactory{})
|
||||
RegisterFactory(externalSendEmailNodeFactory{})
|
||||
|
||||
@@ -12,7 +12,7 @@ type transformDeleteKeyNode struct {
|
||||
}
|
||||
type transformDeleteKeyNodeFactory struct{}
|
||||
|
||||
func (f transformDeleteKeyNodeFactory) Name() string { return "TransformDeleteKeyNode" }
|
||||
func (f transformDeleteKeyNodeFactory) Name() string { return "DeleteKeyNode" }
|
||||
func (f transformDeleteKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
|
||||
func (f transformDeleteKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -16,7 +16,7 @@ type KeyName struct {
|
||||
}
|
||||
type transformRenameKeyNodeFactory struct{}
|
||||
|
||||
func (f transformRenameKeyNodeFactory) Name() string { return "TransformRenameKeyNode" }
|
||||
func (f transformRenameKeyNodeFactory) Name() string { return "RenameKeyNode" }
|
||||
func (f transformRenameKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
|
||||
func (f transformRenameKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
@@ -12,7 +12,7 @@ type transformScriptNode struct {
|
||||
|
||||
type transformScriptNodeFactory struct{}
|
||||
|
||||
func (f transformScriptNodeFactory) Name() string { return "TransformScriptNode" }
|
||||
func (f transformScriptNodeFactory) Name() string { return "ScriptNode" }
|
||||
func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
|
||||
func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
|
||||
func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
|
||||
Reference in New Issue
Block a user