This commit is contained in:
PandaGoAdmin
2023-03-28 17:23:11 +08:00
parent 6682c8d9fe
commit b9f0bd744c
46 changed files with 546 additions and 502 deletions

View File

@@ -1,13 +0,0 @@
package nodes
type assignToCustomerNode struct {
bareNode
}
type assignToCustomerFactory struct{}
func (f assignToCustomerFactory) Name() string { return "AssignCustomerFactoryNode" }
func (f assignToCustomerFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f assignToCustomerFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,7 +1,7 @@
package nodes
import (
"pandax/pkg/rule_engine/message"
"dz-iot-server/rule_engine/message"
"time"
"github.com/sirupsen/logrus"
@@ -23,10 +23,10 @@ type clearAlarmNode struct {
func (f clearAlarmNodeFactory) Name() string { return ClearAlarmNodeName }
func (f clearAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f clearAlarmNodeFactory) Labels() []string { return []string{"Created", "Updated", "Failure"} }
func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Created", "Updated", "Failure"}
node := &clearAlarmNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}

View File

@@ -1,8 +1,8 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"fmt"
"pandax/pkg/rule_engine/message"
"github.com/sirupsen/logrus"
)
@@ -21,10 +21,10 @@ type createAlarmNodeFactory struct{}
func (f createAlarmNodeFactory) Name() string { return "CreateAlarmNode" }
func (f createAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f createAlarmNodeFactory) Labels() []string { return []string{"Created", "Updated", "Failure"} }
func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Created", "Updated", "Failure"}
node := &createAlarmNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}

View File

@@ -1,34 +0,0 @@
package nodes
import (
"pandax/pkg/rule_engine/message"
)
type createRelationNode struct {
bareNode
Direction string
RelationType string
EntityType string
EntityNamePattern string
EntityTypePattern string
EntityCacheExpiration int64
CreateEntityIfNotExists bool
ChangeOriginatorToRelatedEntity bool
RemoveCurrentRelations bool
}
type createRelationNodeFactory struct{}
func (f createRelationNodeFactory) Name() string { return "CreateRelationNode" }
func (f createRelationNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f createRelationNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &createRelationNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *createRelationNode) Handle(msg message.Message) error {
return nil
}

View File

@@ -12,8 +12,8 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"fmt"
"pandax/pkg/rule_engine/message"
"sync"
"time"

View File

@@ -1,9 +0,0 @@
package nodes
type deleteRelationNodeFactory struct{}
func (f deleteRelationNodeFactory) Name() string { return "DeleteRelationNode" }
func (f deleteRelationNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f deleteRelationNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,9 +1,9 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"fmt"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type messageGeneratorNode struct {
@@ -16,11 +16,10 @@ type messageGeneratorNodeFactory struct{}
func (f messageGeneratorNodeFactory) Name() string { return "MessageGeneratorNode" }
func (f messageGeneratorNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f messageGeneratorNodeFactory) Labels() []string { return []string{"Created", "Updated"} }
func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Created", "Updated"}
node := &messageGeneratorNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}

View File

@@ -1,24 +1,24 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"fmt"
"log"
"pandax/pkg/rule_engine/message"
)
type logNode struct {
bareNode
Script string
Script string `json:"script"`
}
type logNodeFactory struct{}
func (f logNodeFactory) Name() string { return "LogNode" }
func (f logNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f logNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f logNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &logNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}

View File

@@ -1,9 +0,0 @@
package nodes
type rpcCallReplyNodeFactory struct{}
func (f rpcCallReplyNodeFactory) Name() string { return "RPCCallReplyNode" }
func (f rpcCallReplyNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f rpcCallReplyNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,14 +0,0 @@
package nodes
type rPCCallRequestNode struct {
bareNode
TimeoutInSeconds int
}
type rpcCallRequestNodeFactory struct{}
func (f rpcCallRequestNodeFactory) Name() string { return "RPCCallRequestNode" }
func (f rpcCallRequestNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f rpcCallRequestNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,8 +1,8 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"fmt"
"pandax/pkg/rule_engine/message"
)
type SaveAttributesNode struct {
@@ -13,10 +13,10 @@ type saveAttributesNodeFactory struct{}
func (f saveAttributesNodeFactory) Name() string { return "SaveAttributesNode" }
func (f saveAttributesNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f saveAttributesNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &SaveAttributesNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}

View File

@@ -1,9 +1,25 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
)
type SaveTimeSeriesNode struct {
bareNode
}
type saveTimeSeriesNodeFactory struct{}
func (f saveTimeSeriesNodeFactory) Name() string { return "SaveTimeSeriesNode" }
func (f saveTimeSeriesNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f saveTimeSeriesNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}
func (n *SaveTimeSeriesNode) Handle(msg message.Message) error {
successLableNode := n.GetLinkedNode("Success")
//failureLableNode := n.GetLinkedNode("Failure")
return successLableNode.Handle(msg)
}

View File

@@ -1,9 +0,0 @@
package nodes
type unassignFromCustomerNodeFactory struct{}
func (f unassignFromCustomerNodeFactory) Name() string { return "UnassignFromCustomerNode" }
func (f unassignFromCustomerNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f unassignFromCustomerNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,23 +0,0 @@
package nodes
import "pandax/pkg/rule_engine/message"
type enrichmentCustomerNode struct {
bareNode
}
type enrichmentCustomerAttrNodeFactory struct{}
func (f enrichmentCustomerAttrNodeFactory) Name() string { return "EnrichmentCustomerNode" }
func (f enrichmentCustomerAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
func (f enrichmentCustomerAttrNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &enrichmentCustomerNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *enrichmentCustomerNode) Handle(msg message.Message) error {
return nil
}

View File

@@ -1,9 +0,0 @@
package nodes
type enrichmentDeviceAttrNodeFactory struct{}
func (f enrichmentDeviceAttrNodeFactory) Name() string { return "EnrichmentDeviceAttrbute" }
func (f enrichmentDeviceAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
func (f enrichmentDeviceAttrNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,9 +0,0 @@
package nodes
type enrichmentOriginatorAttrNodeFactory struct{}
func (f enrichmentOriginatorAttrNodeFactory) Name() string { return "EnrichmentOriginatorAttribute" }
func (f enrichmentOriginatorAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
func (f enrichmentOriginatorAttrNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,9 +0,0 @@
package nodes
type enrichmentOriginatorFieldsNodeFactory struct{}
func (f enrichmentOriginatorFieldsNodeFactory) Name() string { return "EnrichmentOriginatorFieldsNode" }
func (f enrichmentOriginatorFieldsNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
func (f enrichmentOriginatorFieldsNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,11 +0,0 @@
package nodes
type enrichmentOriginatorTelemetryNodeFactory struct{}
func (f enrichmentOriginatorTelemetryNodeFactory) Name() string {
return "EnrichmentOriginatorTelemetryNode"
}
func (f enrichmentOriginatorTelemetryNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
func (f enrichmentOriginatorTelemetryNodeFactory) Create(id string, meta Metadata) (Node, error) {
return nil, nil
}

View File

@@ -1,17 +0,0 @@
package nodes
type enrichmentTenantNode struct {
bareNode
}
type enrichmentTenantNodeFactory struct{}
func (f enrichmentTenantNodeFactory) Name() string { return "EnrichmentTenantNode" }
func (f enrichmentTenantNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT }
func (f enrichmentTenantNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &enrichmentTenantNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}

View File

@@ -0,0 +1,33 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type externalDingNode struct {
bareNode
WebHook string `json:"webHook" yaml:"webHook"`
MsgType string `json:"msgType" yaml:"msgType"`
Content string `json:"content" yaml:"content"`
IsAtAll bool `json:"isAtAll" yaml:"isAtAll"`
atMobiles []string `json:"atMobiles" yaml:"atMobiles"`
}
type externalDingNodeFactory struct{}
func (f externalDingNodeFactory) Name() string { return "ExternalDingNode" }
func (f externalDingNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalDingNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &externalDingNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalDingNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
return nil
}

View File

@@ -0,0 +1,31 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type externalKafkaNode struct {
bareNode
Server string `json:"server" yaml:"server"`
Topic string `json:"topic" yaml:"topic"`
KafkaCli string
}
type externalKafkaNodeFactory struct{}
func (f externalKafkaNodeFactory) Name() string { return "ExternalKafkaNode" }
func (f externalKafkaNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalKafkaNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &externalKafkaNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalKafkaNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
return nil
}

View File

@@ -0,0 +1,28 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type externalMqNode struct {
bareNode
}
type externalMqNodeFactory struct{}
func (f externalMqNodeFactory) Name() string { return "ExternalMqNode" }
func (f externalMqNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalMqNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalMqNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &externalMqNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalMqNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
return nil
}

View File

@@ -1,8 +1,9 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"encoding/json"
"fmt"
"pandax/pkg/rule_engine/message"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
@@ -11,13 +12,13 @@ import (
type externalMqttNode struct {
bareNode
TopicPattern string
Host string
Port string
ConnectTimeoutSec int
ClientId string
CleanSession bool
Ssl bool
TopicPattern string `json:"topicPattern"`
Host string `json:"host"`
Port string `json:"port"`
ConnectTimeoutSec int `json:"connectTimeoutSec"`
ClientId string `json:"clientId"`
CleanSession bool `json:"cleanSession"`
Ssl bool `json:"ssl"`
MqttCli mqtt.Client
}
@@ -25,11 +26,11 @@ type externalMqttNodeFactory struct{}
func (f externalMqttNodeFactory) Name() string { return "ExternalMqttNode" }
func (f externalMqttNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalMqttNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &externalMqttNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
broker := fmt.Sprintf("tcp://%s:%s", node.Host, node.Port)
opts := mqtt.NewClientOptions().AddBroker(broker)
@@ -51,7 +52,7 @@ func (n *externalMqttNode) Handle(msg message.Message) error {
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
topic := n.TopicPattern //need fix add msg.metadata in it
sendmqttmsg, err := msg.MarshalBinary()
sendmqttmsg, err := json.Marshal(msg.GetMsg())
if err != nil {
return nil
}

View File

@@ -1,26 +1,37 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type externalRestapiNode struct {
bareNode
RestEndpointUrlPattern string
RequestMethod string
headers map[string]string
UseSimpleClientHttpFactory bool
ReadTimeoutMs int
MaxParallelRequestsCount int
UseRedisQueueForMsgPersistence bool
trimQueue bool
MaxQueueSize int
RestEndpointUrlPattern string `json:"restEndpointUrlPattern" yaml:"restEndpointUrlPattern"`
RequestMethod string `json:"requestMethod" yaml:"requestMethod"`
headers map[string]string `json:"headers" yaml:"headers"`
UseSimpleClientHttpFactory bool `json:"useSimpleClientHttpFactory" yaml:"useSimpleClientHttpFactory"`
ReadTimeoutMs int `json:"readTimeoutMs" yaml:"readTimeoutMs"`
MaxParallelRequestsCount int `json:"maxParallelRequestsCount" yaml:"maxParallelRequestsCount"`
UseRedisQueueForMsgPersistence bool `json:"useRedisQueueForMsgPersistence" yaml:"useRedisQueueForMsgPersistence"`
trimQueue bool `json:"trimQueue" yaml:"trimQueue"`
MaxQueueSize int `json:"maxQueueSize" yaml:"maxQueueSize"`
}
type externalRestapiNodeFactory struct{}
func (f externalRestapiNodeFactory) Name() string { return "ExternalRestapiNode" }
func (f externalRestapiNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalRestapiNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"True", "False"}
node := &externalRestapiNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalRestapiNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
return nil
}

View File

@@ -0,0 +1,32 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type externalRuleChainNode struct {
bareNode
RuleId string `json:"ruleId" yaml:"ruleId"`
}
type externalRuleChainNodeFactory struct{}
func (f externalRuleChainNodeFactory) Name() string { return "ExternalRuleChainNode" }
func (f externalRuleChainNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalRuleChainNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &externalRuleChainNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *externalRuleChainNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
return nil
}

View File

@@ -1,11 +1,11 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type transformToEmailNode struct {
type externalSendEmailNode struct {
bareNode
From string `json:"from" yaml:"from"`
To string `json:"to" yaml:"to"`
@@ -15,21 +15,21 @@ type transformToEmailNode struct {
Body string `json:"body" yaml:"body"`
}
type transformToEmailNodeFactory struct{}
type externalSendEmailNodeFactory struct{}
func (f transformToEmailNodeFactory) Name() string { return "TransformToEmailNode" }
func (f transformToEmailNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformToEmailNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (f externalSendEmailNodeFactory) Name() string { return "ExternalSendEmailNode" }
func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &transformToEmailNode{
node := &externalSendEmailNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *transformToEmailNode) Handle(msg message.Message) error {
func (n *externalSendEmailNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
successLabelNode := n.GetLinkedNode("Success")

View File

@@ -0,0 +1,31 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type externalSendSmsNode struct {
bareNode
}
type externalSendSmsNodeFactory struct{}
func (f externalSendSmsNodeFactory) Name() string { return "ExternalSendSmslNode" }
func (f externalSendSmsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalSendSmsNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &externalSendSmsNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalSendSmsNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
successLabelNode := n.GetLinkedNode("Success")
//failureLabelNode := n.GetLinkedNode("Failure")
return successLabelNode.Handle(msg)
}

View File

@@ -0,0 +1,33 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type externalWechatNode struct {
bareNode
WebHook string `json:"webHook" yaml:"webHook"`
MsgType string `json:"msgType" yaml:"msgType"`
Content string `json:"content" yaml:"content"`
IsAtAll bool `json:"isAtAll" yaml:"isAtAll"`
atMobiles []string `json:"atMobiles" yaml:"atMobiles"`
}
type externalWechatNodeFactory struct{}
func (f externalWechatNodeFactory) Name() string { return "ExternalWechatNode" }
func (f externalWechatNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL }
func (f externalWechatNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &externalWechatNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *externalWechatNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
return nil
}

View File

@@ -1,34 +0,0 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
//检查关联关系
//该消息来自与哪个实体或到那个实体
type checkExistenceFieldsNode struct {
bareNode
}
type checkExistenceFieldsNodeFactory struct{}
func (f checkExistenceFieldsNodeFactory) Name() string { return "CheckExistenceFieldsNode" }
func (f checkExistenceFieldsNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f checkExistenceFieldsNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"True", "False"}
node := &checkExistenceFieldsNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *checkExistenceFieldsNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
//trueLabelNode := n.GetLinkedNode("True")
//falseLabelNode := n.GetLinkedNode("False")
return nil
}

View File

@@ -1,67 +0,0 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
const (
RelationTypeContains = "Contains"
RelationTypeNotContains = "NotContains"
)
//检查关联关系
//该消息来自与哪个实体或到那个实体
type checkRelationFilterNode struct {
bareNode
Direction string `json:"direction" yaml:"direction"`
RelationType string `json:"relationType" yaml:"relationType"`
InstanceType string `json:"instanceType" yaml:"instanceType"`
Values []string `json:"values" yaml:"values"`
}
type checkRelationFilterNodeFactory struct{}
func (f checkRelationFilterNodeFactory) Name() string { return "CheckRelationFilterNode" }
func (f checkRelationFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f checkRelationFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"True", "False"}
node := &checkRelationFilterNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
Values: []string{},
}
return decodePath(meta, node)
}
func (n *checkRelationFilterNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
trueLabelNode := n.GetLinkedNode("True")
falseLabelNode := n.GetLinkedNode("False")
//direction := msg.GetDirection()
attr := msg.GetMetadata().GetKeyValue(n.InstanceType)
switch n.RelationType {
case RelationTypeContains:
for _, val := range n.Values {
// specified attribute exist in names
if attr == val {
return trueLabelNode.Handle(msg)
}
}
// not found
return falseLabelNode.Handle(msg)
case RelationTypeNotContains:
for _, val := range n.Values {
// specified attribute exist in names
if attr == val {
return falseLabelNode.Handle(msg)
}
}
// not found
return trueLabelNode.Handle(msg)
}
return nil
}

View File

@@ -0,0 +1,46 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"fmt"
"github.com/sirupsen/logrus"
)
const (
DEVICE = "DEVICE"
GATEWAY = "GATEWAY"
)
//检查关联关系
//该消息来自与哪个实体或到那个实体
type deviceTypeSwitchNode struct {
bareNode
}
type deviceTypeSwitchNodeFactory struct{}
func (f deviceTypeSwitchNodeFactory) Name() string { return "DeviceTypeSwitch" }
func (f deviceTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f deviceTypeSwitchNodeFactory) Labels() []string { return []string{DEVICE, GATEWAY} }
func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &deviceTypeSwitchNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *deviceTypeSwitchNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
deviceLabelNode := n.GetLinkedNode(DEVICE)
gatewayLabelNode := n.GetLinkedNode(GATEWAY)
if deviceLabelNode == nil && gatewayLabelNode == nil {
return fmt.Errorf("no device and gateway label linked node in %s", n.Name())
}
if msg.GetMetadata().GetKeyValue("deviceType") == DEVICE {
return deviceLabelNode.Handle(msg)
}
return gatewayLabelNode.Handle(msg)
}

View File

@@ -1,9 +1,8 @@
package nodes
import (
"fmt"
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type messageTypeFilterNode struct {
@@ -15,11 +14,11 @@ type messageTypeFilterNodeFactory struct{}
func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeFilterNode" }
func (f messageTypeFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f messageTypeFilterNodeFactory) Labels() []string { return []string{"True", "False"} }
func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"True", "False"}
node := &messageTypeFilterNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
MessageTypes: []string{},
}
return decodePath(meta, node)
@@ -30,16 +29,10 @@ func (n *messageTypeFilterNode) Handle(msg message.Message) error {
trueLabelNode := n.GetLinkedNode("True")
falseLabelNode := n.GetLinkedNode("False")
if trueLabelNode == nil || falseLabelNode == nil {
return fmt.Errorf("no true or false label linked node in %s", n.Name())
}
messageType := msg.GetType()
// TODO: how to resolve user customized message type dynamically
//userMessageType := msg.GetMetadata().GetKeyValue(n.Metadata().MessageTypeKey)
userMessageType := "TODO"
for _, filterType := range n.MessageTypes {
if filterType == messageType || filterType == userMessageType {
if filterType == messageType {
return trueLabelNode.Handle(msg)
}
}

View File

@@ -1,8 +1,8 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"fmt"
"pandax/pkg/rule_engine/message"
"github.com/sirupsen/logrus"
)
@@ -14,7 +14,15 @@ type messageTypeSwitchNodeFactory struct{}
func (f messageTypeSwitchNodeFactory) Name() string { return "MessageTypeSwitchNode" }
func (f messageTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f messageTypeSwitchNodeFactory) Labels() []string { return []string{"True", "False"} }
func (f messageTypeSwitchNodeFactory) Labels() []string {
return []string{
message.MessageTypePostAttributesRequest,
message.MessageTypePostTelemetryRequest,
message.MessageTypeConnectEvent,
message.MessageTypeDisconnectEvent,
"Other",
}
}
func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &messageTypeSwitchNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
@@ -27,19 +35,16 @@ func (n *messageTypeSwitchNode) Handle(msg message.Message) error {
nodes := n.GetLinkedNodes()
messageType := msg.GetType()
messageTypeKey, _ := n.Metadata().Value(NODE_CONFIG_MESSAGE_TYPE_KEY)
userMessageType := msg.GetMetadata().GetKeyValue(messageTypeKey.(string))
for label, node := range nodes {
if messageType == label || userMessageType == label {
if messageType == label {
return node.Handle(msg)
}
}
// if other label exist
// 自定义类型 或 未识别类型
if node := n.GetLinkedNode("Other"); node != nil {
return node.Handle(msg)
}
// not found
return fmt.Errorf("%s no label to handle message", n.Name())
}

View File

@@ -1,43 +0,0 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type originatorTypeFilterNode struct {
bareNode
Filters []string `json:"filters" yaml:"filters"`
}
type originatorFilterNodeFactory struct{}
func (f originatorFilterNodeFactory) Name() string { return "OriginatorFilterNode" }
func (f originatorFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f originatorFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"True", "False"}
node := &originatorTypeFilterNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
Filters: []string{},
}
return decodePath(meta, node)
}
func (n *originatorTypeFilterNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
trueLabelNode := n.GetLinkedNode("True")
falseLabelNode := n.GetLinkedNode("False")
//links := n.GetLinks()
originatorType := msg.GetOriginator()
for _, filter := range n.Filters {
if originatorType == filter {
return trueLabelNode.Handle(msg)
}
}
// not found
return falseLabelNode.Handle(msg)
}

View File

@@ -1,39 +0,0 @@
package nodes
import (
"fmt"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type originatorTypeSwitchNode struct {
bareNode
}
type originatorTypeSwitchNodeFactory struct{}
func (f originatorTypeSwitchNodeFactory) Name() string { return "OriginatorTypeSwitchNode" }
func (f originatorTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f originatorTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{}
node := &originatorTypeSwitchNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *originatorTypeSwitchNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
nodes := n.GetLinkedNodes()
originatorType := msg.GetOriginator()
for label, node := range nodes {
if originatorType == label {
return node.Handle(msg)
}
}
// not found
return fmt.Errorf("%s no label to handle message", n.Name())
}

View File

@@ -1,8 +1,8 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
const ScriptFilterNodeName = "ScriptFilterNode"
@@ -14,13 +14,12 @@ type scriptFilterNode struct {
type scriptFilterNodeFactory struct{}
func (f scriptFilterNodeFactory) Name() string { return "ScriptFilterNode" }
func (f scriptFilterNodeFactory) Name() string { return ScriptFilterNodeName }
func (f scriptFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f scriptFilterNodeFactory) Labels() []string { return []string{"True", "False"} }
func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"True", "False"}
node := &scriptFilterNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}

View File

@@ -12,8 +12,8 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type switchFilterNode struct {
@@ -27,7 +27,7 @@ func (f switchFilterNodeFactory) Name() string { return "SwitchNode" }
func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f switchFilterNodeFactory) Labels() []string {
return []string{
"True", "False", message.MessageTypePostTelemetryRequest,
"Failure", "True", "False", message.MessageTypePostTelemetryRequest,
message.MessageTypeConnectEvent,
}
}
@@ -44,7 +44,7 @@ func (n *switchFilterNode) Handle(msg message.Message) error {
scriptEngine := NewScriptEngine()
SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts)
if err != nil {
return err
return nil
}
nodes := n.GetLinkedNodes()
for label, node := range nodes {

View File

@@ -4,5 +4,31 @@ package nodes
func init() {
RegisterFactory(inputNodeFactory{})
RegisterFactory(switchFilterNodeFactory{})
RegisterFactory(scriptFilterNodeFactory{})
RegisterFactory(messageTypeFilterNodeFactory{})
RegisterFactory(messageTypeSwitchNodeFactory{})
RegisterFactory(deviceTypeSwitchNodeFactory{})
RegisterFactory(transformDeleteKeyNodeFactory{})
RegisterFactory(transformRenameKeyNodeFactory{})
RegisterFactory(transformScriptNodeFactory{})
RegisterFactory(createAlarmNodeFactory{})
RegisterFactory(clearAlarmNodeFactory{})
RegisterFactory(messageGeneratorNodeFactory{})
RegisterFactory(logNodeFactory{})
RegisterFactory(saveAttributesNodeFactory{})
RegisterFactory(saveTimeSeriesNodeFactory{})
RegisterFactory(delayNodeFactory{})
RegisterFactory(externalDingNodeFactory{})
RegisterFactory(externalWechatNodeFactory{})
RegisterFactory(externalKafkaNodeFactory{})
RegisterFactory(externalMqNodeFactory{})
RegisterFactory(externalMqttNodeFactory{})
RegisterFactory(externalRestapiNodeFactory{})
RegisterFactory(externalSendEmailNodeFactory{})
RegisterFactory(externalSendSmsNodeFactory{})
RegisterFactory(externalRuleChainNodeFactory{})
}

View File

@@ -1,8 +1,8 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
const InputNodeName = "InputNode"

View File

@@ -23,6 +23,23 @@ func NewScriptEngine() ScriptEngine {
}
func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) (message.Message, error) {
vm := goja.New()
_, err := vm.RunString(script)
if err != nil {
logrus.Info("JS代码有问题")
return nil, err
}
var fn func(map[string]interface{}, map[string]interface{}, string) map[string]interface{}
err = vm.ExportTo(vm.Get("Transform"), &fn)
if err != nil {
logrus.Info("Js函数映射到 Go 函数失败!")
return nil, err
}
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
msg.SetMsg(datas["msg"].(map[string]interface{}))
msg.SetMetadata(message.NewDefaultMetadata(datas["metadata"].(map[string]interface{})))
msg.SetType(datas["msgType"].(string))
return msg, nil
return nil, nil
}
@@ -34,13 +51,13 @@ func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string)
logrus.Info("JS代码有问题")
return nil, err
}
var fn func(message.Message, message.Metadata, string) []string
var fn func(map[string]interface{}, map[string]interface{}, string) []string
err = vm.ExportTo(vm.Get("Switch"), &fn)
if err != nil {
logrus.Info("Js函数映射到 Go 函数失败!")
return nil, err
}
datas := fn(msg, msg.GetMetadata(), msg.GetType())
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
return datas, nil
}
@@ -51,13 +68,13 @@ func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string)
logrus.Info("JS代码有问题")
return false, err
}
var fn func(message.Message, message.Metadata, string) bool
var fn func(map[string]interface{}, map[string]interface{}, string) bool
err = vm.ExportTo(vm.Get("Filter"), &fn)
if err != nil {
logrus.Info("Js函数映射到 Go 函数失败!")
return false, err
}
datas := fn(msg, msg.GetMetadata(), msg.GetType())
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
return datas, nil
}

View File

@@ -1,43 +0,0 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type transformChangeOriginatorNode struct {
bareNode
OriginatorSource string `json:"originatorSource" yaml:"originatorSource"`
Direction string `json:"direction" yaml:"direction"`
MaxRelationLevel int `json:"maxRelationLevel" yaml:"maxRelationLevel"`
//RelationFilters []runtime.RelationFilter `json:"relationFilters" yaml:"relationFilters"`
}
type transformChangeOriginatorNodeFactory struct{}
func (f transformChangeOriginatorNodeFactory) Name() string { return "TransformChangeOriginatorNode" }
func (f transformChangeOriginatorNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformChangeOriginatorNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &transformChangeOriginatorNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
//RelationFilters: []runtime.RelationFilter{},
}
return decodePath(meta, node)
}
func (n *transformChangeOriginatorNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
//successLabelNode := n.GetLinkedNode("Sucess")
failureLabelNode := n.GetLinkedNode("Failure")
//relationQuery := runtime.NewRelationQuery()
//entities := relationQuery.QueryEntities(n.Direction, n.MaxRelationLevel, n.RelationFilters)
/*if len(entities) > 0 && entities[0] == msg.GetOriginator() {
msg.SetOriginator(entities[0])
return successLabelNode.Handle(msg)
}*/
return failureLabelNode.Handle(msg)
}

View File

@@ -0,0 +1,52 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type transformDeleteKeyNode struct {
bareNode
FormType string `json:"formType" yaml:"formType"` //msg metadata
Keys []string `json:"keys" yaml:"keys"`
}
type transformDeleteKeyNodeFactory struct{}
func (f transformDeleteKeyNodeFactory) Name() string { return "TransformDeleteKeyNode" }
func (f transformDeleteKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformDeleteKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &transformDeleteKeyNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *transformDeleteKeyNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
if n.FormType == "msg" {
data := msg.GetMsg()
for _, key := range n.Keys {
if _, found := data[key]; found {
delete(data, key)
msg.SetMsg(data)
}
}
} else if n.FormType == "metadata" {
data := msg.GetMetadata()
for _, key := range n.Keys {
if data.GetKeyValue(key) != nil {
values := data.GetValues()
delete(values, key)
msg.SetMetadata(message.NewDefaultMetadata(values))
}
}
} else {
failureLabelNode.Handle(msg)
}
return successLabelNode.Handle(msg)
}

View File

@@ -0,0 +1,58 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
)
type transformRenameKeyNode struct {
bareNode
FormType string `json:"formType" yaml:"formType"` //msg metadata
Keys []KeyName `json:"keys" yaml:"keys"`
}
type KeyName struct {
oldName string `json:"oldName" yaml:"oldName"`
newName string `json:"newName" yaml:"newName"`
}
type transformRenameKeyNodeFactory struct{}
func (f transformRenameKeyNodeFactory) Name() string { return "TransformRenameKeyNode" }
func (f transformRenameKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformRenameKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, error) {
node := &transformScriptNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *transformRenameKeyNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
if n.FormType == "msg" {
data := msg.GetMsg()
for _, key := range n.Keys {
if _, found := data[key.oldName]; found {
data[key.newName] = data[key.oldName]
delete(data, key.oldName)
msg.SetMsg(data)
}
}
} else if n.FormType == "metadata" {
data := msg.GetMetadata()
for _, key := range n.Keys {
if data.GetKeyValue(key.oldName) != nil {
values := data.GetValues()
values[key.newName] = values[key.oldName]
delete(values, key.oldName)
msg.SetMetadata(message.NewDefaultMetadata(values))
}
}
} else {
failureLabelNode.Handle(msg)
}
return successLabelNode.Handle(msg)
}

View File

@@ -1,8 +1,8 @@
package nodes
import (
"dz-iot-server/rule_engine/message"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type transformScriptNode struct {
@@ -14,7 +14,7 @@ type transformScriptNodeFactory struct{}
func (f transformScriptNodeFactory) Name() string { return "TransformScriptNode" }
func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM }
func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &transformScriptNode{