From b9f0bd744ce672ead6cfb7ad10e7ec898be5e29d Mon Sep 17 00:00:00 2001 From: PandaGoAdmin <18610165312@163.com> Date: Tue, 28 Mar 2023 17:23:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rule_engine/instance_test.go | 24 ++++++- pkg/rule_engine/message/message.go | 57 ++++++++-------- .../nodes/action_assign_to_customer_node.go | 13 ---- .../nodes/action_clear_alarm_node.go | 6 +- .../nodes/action_create_alarm_node.go | 6 +- .../nodes/action_create_relation_node.go | 34 ---------- pkg/rule_engine/nodes/action_delay_node.go | 2 +- .../nodes/action_delete_relation_node.go | 9 --- .../nodes/action_generator_node.go | 7 +- pkg/rule_engine/nodes/action_log_node.go | 8 +-- .../nodes/action_rpc_call_reply_node.go | 9 --- .../nodes/action_rpc_call_request_node.go | 14 ---- .../nodes/action_save_attributes_node.go | 6 +- .../nodes/action_save_timeseries_node.go | 16 +++++ .../action_unassign_from_customer_node.go | 9 --- .../nodes/enrichment_customer_attr_node.go | 23 ------- .../nodes/enrichment_device_attr_node.go | 9 --- .../nodes/enrichment_originator_attr_node.go | 9 --- .../enrichment_originator_fields_node.go | 9 --- .../enrichment_originator_telemetry_node.go | 11 --- .../nodes/enrichment_tenant_attr.go | 17 ----- pkg/rule_engine/nodes/external_ding_node.go | 33 +++++++++ pkg/rule_engine/nodes/external_kafka_node.go | 31 +++++++++ pkg/rule_engine/nodes/external_mq_node.go | 28 ++++++++ pkg/rule_engine/nodes/external_mqtt_node.go | 23 ++++--- .../nodes/external_restapi_node.go | 33 ++++++--- .../nodes/external_rule_chain_node.go | 32 +++++++++ ...il_node.go => external_send_email_node.go} | 18 ++--- .../nodes/external_send_sms_node.go | 31 +++++++++ pkg/rule_engine/nodes/external_wechat_node.go | 33 +++++++++ .../filter_check_existence_fields_node.go | 34 ---------- .../nodes/filter_check_relation_node.go | 67 ------------------- .../nodes/filter_device_type_switch_node.go | 46 +++++++++++++ .../nodes/filter_message_type_node.go | 15 ++--- .../nodes/filter_message_type_switch_node.go | 19 ++++-- .../nodes/filter_originator_type_node.go | 43 ------------ .../filter_originator_type_switch_node.go | 39 ----------- pkg/rule_engine/nodes/filter_script_node.go | 9 ++- pkg/rule_engine/nodes/filter_switch_node.go | 6 +- pkg/rule_engine/nodes/init.go | 26 +++++++ pkg/rule_engine/nodes/input_node.go | 2 +- pkg/rule_engine/nodes/script_engine.go | 25 +++++-- .../nodes/transform_change_originator_node.go | 43 ------------ .../nodes/transform_delete_key_node.go | 52 ++++++++++++++ .../nodes/transform_rename_key_node.go | 58 ++++++++++++++++ .../nodes/transform_script_node.go | 4 +- 46 files changed, 546 insertions(+), 502 deletions(-) delete mode 100644 pkg/rule_engine/nodes/action_assign_to_customer_node.go delete mode 100644 pkg/rule_engine/nodes/action_create_relation_node.go delete mode 100644 pkg/rule_engine/nodes/action_delete_relation_node.go delete mode 100644 pkg/rule_engine/nodes/action_rpc_call_reply_node.go delete mode 100644 pkg/rule_engine/nodes/action_rpc_call_request_node.go delete mode 100644 pkg/rule_engine/nodes/action_unassign_from_customer_node.go delete mode 100644 pkg/rule_engine/nodes/enrichment_customer_attr_node.go delete mode 100644 pkg/rule_engine/nodes/enrichment_device_attr_node.go delete mode 100644 pkg/rule_engine/nodes/enrichment_originator_attr_node.go delete mode 100644 pkg/rule_engine/nodes/enrichment_originator_fields_node.go delete mode 100644 pkg/rule_engine/nodes/enrichment_originator_telemetry_node.go delete mode 100644 pkg/rule_engine/nodes/enrichment_tenant_attr.go create mode 100644 pkg/rule_engine/nodes/external_ding_node.go create mode 100644 pkg/rule_engine/nodes/external_kafka_node.go create mode 100644 pkg/rule_engine/nodes/external_mq_node.go create mode 100644 pkg/rule_engine/nodes/external_rule_chain_node.go rename pkg/rule_engine/nodes/{transform_to_email_node.go => external_send_email_node.go} (62%) create mode 100644 pkg/rule_engine/nodes/external_send_sms_node.go create mode 100644 pkg/rule_engine/nodes/external_wechat_node.go delete mode 100644 pkg/rule_engine/nodes/filter_check_existence_fields_node.go delete mode 100644 pkg/rule_engine/nodes/filter_check_relation_node.go create mode 100644 pkg/rule_engine/nodes/filter_device_type_switch_node.go delete mode 100644 pkg/rule_engine/nodes/filter_originator_type_node.go delete mode 100644 pkg/rule_engine/nodes/filter_originator_type_switch_node.go delete mode 100644 pkg/rule_engine/nodes/transform_change_originator_node.go create mode 100644 pkg/rule_engine/nodes/transform_delete_key_node.go create mode 100644 pkg/rule_engine/nodes/transform_rename_key_node.go diff --git a/pkg/rule_engine/instance_test.go b/pkg/rule_engine/instance_test.go index ae73588..c835239 100644 --- a/pkg/rule_engine/instance_test.go +++ b/pkg/rule_engine/instance_test.go @@ -20,13 +20,16 @@ func TestNewRuleChainInstance(t *testing.T) { func TestScriptEngine(t *testing.T) { metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) - msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, []byte{}, metadata) + msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata) scriptEngine := nodes.NewScriptEngine() const script = ` function Switch(msg, metadata, msgType) { function nextRelation(metadata, msg) { return ['one','nine']; } + if(metadata.device === 'aa') { + return ['six']; + } if(msgType === 'Post telemetry') { return ['two']; } @@ -40,3 +43,22 @@ func TestScriptEngine(t *testing.T) { } t.Log(SwitchResults) } + +func TestScriptOnMessage(t *testing.T) { + metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) + msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata) + scriptEngine := nodes.NewScriptEngine() + const script = ` + function Transform(msg, metadata, msgType) { + msg.bb = "33" + metadata.event = 55 + return {msg: msg, metadata: metadata, msgType: msgType}; + } + ` + ScriptOnMessageResults, err := scriptEngine.ScriptOnMessage(msg, script) + + if err != nil { + t.Error(err) + } + t.Log(ScriptOnMessageResults.GetMetadata()) +} diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index 1c78091..74237e5 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -6,14 +6,12 @@ import "github.com/sirupsen/logrus" type Message interface { GetOriginator() string GetType() string - GetData() []byte + GetMsg() map[string]interface{} GetMetadata() Metadata SetType(string) - SetData([]byte) + SetMsg(map[string]interface{}) SetOriginator(string) SetMetadata(Metadata) - MarshalBinary() ([]byte, error) - UnmarshalBinary(b []byte) error } // Metadata ... @@ -21,6 +19,7 @@ type Metadata interface { Keys() []string GetKeyValue(key string) interface{} SetKeyValue(key string, val interface{}) + GetValues() map[string]interface{} } // Predefined message types @@ -36,44 +35,41 @@ const ( // NewMessage ... func NewMessage() Message { return &defaultMessage{ - data: []byte{}, + msg: map[string]interface{}{}, } } type defaultMessage struct { - id string //uuid - ts int64 //时间戳 - msgType string //消息类型, attributes(参数),telemetry(遥测),连接事件 - originator string //数据发布者 - customerId string //客户Id UUID - entityId string //实体Id UUID - entityType string //实体类型 设备、资产,用户、规则链 - data []byte //数据 数据结构JSON 设备原始数据 - metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等 + id string //uuid + ts int64 //时间戳 + msgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 + originator string //数据发布者 设备 规则链 + customerId string //客户Id UUID + deviceId string //设备Id UUID + msg map[string]interface{} //数据 数据结构JSON 设备原始数据 msg + metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等 } // NewMessageWithDetail ... -func NewMessageWithDetail(originator string, messageType string, msg []byte, metadata Metadata) Message { +func NewMessageWithDetail(originator string, messageType string, msg map[string]interface{}, metadata Metadata) Message { return &defaultMessage{ originator: originator, msgType: messageType, - data: msg, + msg: msg, metadata: metadata, } } -func (t *defaultMessage) GetOriginator() string { return t.originator } -func (t *defaultMessage) GetType() string { return t.msgType } -func (t *defaultMessage) GetData() []byte { return t.data } -func (t *defaultMessage) GetMetadata() Metadata { return t.metadata } -func (t *defaultMessage) SetType(msgType string) { t.msgType = msgType } -func (t *defaultMessage) SetData(data []byte) { t.data = data } -func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator } -func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata } - -func (t *defaultMessage) MarshalBinary() ([]byte, error) { return nil, nil } -func (t *defaultMessage) UnmarshalBinary(b []byte) error { return nil } +func (t *defaultMessage) GetOriginator() string { return t.originator } +func (t *defaultMessage) GetType() string { return t.msgType } +func (t *defaultMessage) GetMsg() map[string]interface{} { return t.msg } +func (t *defaultMessage) GetMetadata() Metadata { return t.metadata } +func (t *defaultMessage) SetType(msgType string) { t.msgType = msgType } +func (t *defaultMessage) SetMsg(msg map[string]interface{}) { t.msg = msg } +func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator } +func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata } +// NewMetadata ... func NewMetadata() Metadata { return &defaultMetadata{ values: make(map[string]interface{}), @@ -108,3 +104,10 @@ func (t *defaultMetadata) GetKeyValue(key string) interface{} { func (t *defaultMetadata) SetKeyValue(key string, val interface{}) { t.values[key] = val } + +func (t *defaultMetadata) GetValues() map[string]interface{} { + return t.values +} +func (t *defaultMetadata) SetValues(values map[string]interface{}) { + t.values = values +} diff --git a/pkg/rule_engine/nodes/action_assign_to_customer_node.go b/pkg/rule_engine/nodes/action_assign_to_customer_node.go deleted file mode 100644 index 38ac240..0000000 --- a/pkg/rule_engine/nodes/action_assign_to_customer_node.go +++ /dev/null @@ -1,13 +0,0 @@ -package nodes - -type assignToCustomerNode struct { - bareNode -} - -type assignToCustomerFactory struct{} - -func (f assignToCustomerFactory) Name() string { return "AssignCustomerFactoryNode" } -func (f assignToCustomerFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f assignToCustomerFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/action_clear_alarm_node.go b/pkg/rule_engine/nodes/action_clear_alarm_node.go index 6212e72..005459e 100644 --- a/pkg/rule_engine/nodes/action_clear_alarm_node.go +++ b/pkg/rule_engine/nodes/action_clear_alarm_node.go @@ -1,7 +1,7 @@ package nodes import ( - "pandax/pkg/rule_engine/message" + "dz-iot-server/rule_engine/message" "time" "github.com/sirupsen/logrus" @@ -23,10 +23,10 @@ type clearAlarmNode struct { func (f clearAlarmNodeFactory) Name() string { return ClearAlarmNodeName } func (f clearAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f clearAlarmNodeFactory) Labels() []string { return []string{"Created", "Updated", "Failure"} } func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Created", "Updated", "Failure"} node := &clearAlarmNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) } diff --git a/pkg/rule_engine/nodes/action_create_alarm_node.go b/pkg/rule_engine/nodes/action_create_alarm_node.go index 627cbf7..4021815 100644 --- a/pkg/rule_engine/nodes/action_create_alarm_node.go +++ b/pkg/rule_engine/nodes/action_create_alarm_node.go @@ -1,8 +1,8 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "fmt" - "pandax/pkg/rule_engine/message" "github.com/sirupsen/logrus" ) @@ -21,10 +21,10 @@ type createAlarmNodeFactory struct{} func (f createAlarmNodeFactory) Name() string { return "CreateAlarmNode" } func (f createAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f createAlarmNodeFactory) Labels() []string { return []string{"Created", "Updated", "Failure"} } func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Created", "Updated", "Failure"} node := &createAlarmNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) } diff --git a/pkg/rule_engine/nodes/action_create_relation_node.go b/pkg/rule_engine/nodes/action_create_relation_node.go deleted file mode 100644 index dc05056..0000000 --- a/pkg/rule_engine/nodes/action_create_relation_node.go +++ /dev/null @@ -1,34 +0,0 @@ -package nodes - -import ( - "pandax/pkg/rule_engine/message" -) - -type createRelationNode struct { - bareNode - Direction string - RelationType string - EntityType string - EntityNamePattern string - EntityTypePattern string - EntityCacheExpiration int64 - CreateEntityIfNotExists bool - ChangeOriginatorToRelatedEntity bool - RemoveCurrentRelations bool -} - -type createRelationNodeFactory struct{} - -func (f createRelationNodeFactory) Name() string { return "CreateRelationNode" } -func (f createRelationNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f createRelationNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} - node := &createRelationNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - } - return decodePath(meta, node) -} - -func (n *createRelationNode) Handle(msg message.Message) error { - return nil -} diff --git a/pkg/rule_engine/nodes/action_delay_node.go b/pkg/rule_engine/nodes/action_delay_node.go index 9649ce2..5eb361a 100644 --- a/pkg/rule_engine/nodes/action_delay_node.go +++ b/pkg/rule_engine/nodes/action_delay_node.go @@ -12,8 +12,8 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "fmt" - "pandax/pkg/rule_engine/message" "sync" "time" diff --git a/pkg/rule_engine/nodes/action_delete_relation_node.go b/pkg/rule_engine/nodes/action_delete_relation_node.go deleted file mode 100644 index 7ed5a9f..0000000 --- a/pkg/rule_engine/nodes/action_delete_relation_node.go +++ /dev/null @@ -1,9 +0,0 @@ -package nodes - -type deleteRelationNodeFactory struct{} - -func (f deleteRelationNodeFactory) Name() string { return "DeleteRelationNode" } -func (f deleteRelationNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f deleteRelationNodeFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/action_generator_node.go b/pkg/rule_engine/nodes/action_generator_node.go index 2b8a23f..88be560 100644 --- a/pkg/rule_engine/nodes/action_generator_node.go +++ b/pkg/rule_engine/nodes/action_generator_node.go @@ -1,9 +1,9 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "fmt" "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" ) type messageGeneratorNode struct { @@ -16,11 +16,10 @@ type messageGeneratorNodeFactory struct{} func (f messageGeneratorNodeFactory) Name() string { return "MessageGeneratorNode" } func (f messageGeneratorNodeFactory) Category() string { return NODE_CATEGORY_ACTION } - +func (f messageGeneratorNodeFactory) Labels() []string { return []string{"Created", "Updated"} } func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Created", "Updated"} node := &messageGeneratorNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) } diff --git a/pkg/rule_engine/nodes/action_log_node.go b/pkg/rule_engine/nodes/action_log_node.go index 6712388..8080baa 100644 --- a/pkg/rule_engine/nodes/action_log_node.go +++ b/pkg/rule_engine/nodes/action_log_node.go @@ -1,24 +1,24 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "fmt" "log" - "pandax/pkg/rule_engine/message" ) type logNode struct { bareNode - Script string + Script string `json:"script"` } type logNodeFactory struct{} func (f logNodeFactory) Name() string { return "LogNode" } func (f logNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f logNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f logNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} node := &logNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) } diff --git a/pkg/rule_engine/nodes/action_rpc_call_reply_node.go b/pkg/rule_engine/nodes/action_rpc_call_reply_node.go deleted file mode 100644 index 64b5c07..0000000 --- a/pkg/rule_engine/nodes/action_rpc_call_reply_node.go +++ /dev/null @@ -1,9 +0,0 @@ -package nodes - -type rpcCallReplyNodeFactory struct{} - -func (f rpcCallReplyNodeFactory) Name() string { return "RPCCallReplyNode" } -func (f rpcCallReplyNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f rpcCallReplyNodeFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/action_rpc_call_request_node.go b/pkg/rule_engine/nodes/action_rpc_call_request_node.go deleted file mode 100644 index e162009..0000000 --- a/pkg/rule_engine/nodes/action_rpc_call_request_node.go +++ /dev/null @@ -1,14 +0,0 @@ -package nodes - -type rPCCallRequestNode struct { - bareNode - TimeoutInSeconds int -} - -type rpcCallRequestNodeFactory struct{} - -func (f rpcCallRequestNodeFactory) Name() string { return "RPCCallRequestNode" } -func (f rpcCallRequestNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f rpcCallRequestNodeFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/action_save_attributes_node.go b/pkg/rule_engine/nodes/action_save_attributes_node.go index 5947ad8..5283fda 100644 --- a/pkg/rule_engine/nodes/action_save_attributes_node.go +++ b/pkg/rule_engine/nodes/action_save_attributes_node.go @@ -1,8 +1,8 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "fmt" - "pandax/pkg/rule_engine/message" ) type SaveAttributesNode struct { @@ -13,10 +13,10 @@ type saveAttributesNodeFactory struct{} func (f saveAttributesNodeFactory) Name() string { return "SaveAttributesNode" } func (f saveAttributesNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f saveAttributesNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} node := &SaveAttributesNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) } diff --git a/pkg/rule_engine/nodes/action_save_timeseries_node.go b/pkg/rule_engine/nodes/action_save_timeseries_node.go index bfb96b8..f88e985 100644 --- a/pkg/rule_engine/nodes/action_save_timeseries_node.go +++ b/pkg/rule_engine/nodes/action_save_timeseries_node.go @@ -1,9 +1,25 @@ package nodes +import ( + "dz-iot-server/rule_engine/message" +) + +type SaveTimeSeriesNode struct { + bareNode +} + type saveTimeSeriesNodeFactory struct{} func (f saveTimeSeriesNodeFactory) Name() string { return "SaveTimeSeriesNode" } func (f saveTimeSeriesNodeFactory) Category() string { return NODE_CATEGORY_ACTION } +func (f saveTimeSeriesNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error) { return nil, nil } + +func (n *SaveTimeSeriesNode) Handle(msg message.Message) error { + successLableNode := n.GetLinkedNode("Success") + //failureLableNode := n.GetLinkedNode("Failure") + + return successLableNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/action_unassign_from_customer_node.go b/pkg/rule_engine/nodes/action_unassign_from_customer_node.go deleted file mode 100644 index 7576d27..0000000 --- a/pkg/rule_engine/nodes/action_unassign_from_customer_node.go +++ /dev/null @@ -1,9 +0,0 @@ -package nodes - -type unassignFromCustomerNodeFactory struct{} - -func (f unassignFromCustomerNodeFactory) Name() string { return "UnassignFromCustomerNode" } -func (f unassignFromCustomerNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f unassignFromCustomerNodeFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/enrichment_customer_attr_node.go b/pkg/rule_engine/nodes/enrichment_customer_attr_node.go deleted file mode 100644 index 0c9145a..0000000 --- a/pkg/rule_engine/nodes/enrichment_customer_attr_node.go +++ /dev/null @@ -1,23 +0,0 @@ -package nodes - -import "pandax/pkg/rule_engine/message" - -type enrichmentCustomerNode struct { - bareNode -} - -type enrichmentCustomerAttrNodeFactory struct{} - -func (f enrichmentCustomerAttrNodeFactory) Name() string { return "EnrichmentCustomerNode" } -func (f enrichmentCustomerAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } -func (f enrichmentCustomerAttrNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} - node := &enrichmentCustomerNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - } - return decodePath(meta, node) -} - -func (n *enrichmentCustomerNode) Handle(msg message.Message) error { - return nil -} diff --git a/pkg/rule_engine/nodes/enrichment_device_attr_node.go b/pkg/rule_engine/nodes/enrichment_device_attr_node.go deleted file mode 100644 index 35d414e..0000000 --- a/pkg/rule_engine/nodes/enrichment_device_attr_node.go +++ /dev/null @@ -1,9 +0,0 @@ -package nodes - -type enrichmentDeviceAttrNodeFactory struct{} - -func (f enrichmentDeviceAttrNodeFactory) Name() string { return "EnrichmentDeviceAttrbute" } -func (f enrichmentDeviceAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } -func (f enrichmentDeviceAttrNodeFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/enrichment_originator_attr_node.go b/pkg/rule_engine/nodes/enrichment_originator_attr_node.go deleted file mode 100644 index 046d552..0000000 --- a/pkg/rule_engine/nodes/enrichment_originator_attr_node.go +++ /dev/null @@ -1,9 +0,0 @@ -package nodes - -type enrichmentOriginatorAttrNodeFactory struct{} - -func (f enrichmentOriginatorAttrNodeFactory) Name() string { return "EnrichmentOriginatorAttribute" } -func (f enrichmentOriginatorAttrNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } -func (f enrichmentOriginatorAttrNodeFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/enrichment_originator_fields_node.go b/pkg/rule_engine/nodes/enrichment_originator_fields_node.go deleted file mode 100644 index 0b36b1f..0000000 --- a/pkg/rule_engine/nodes/enrichment_originator_fields_node.go +++ /dev/null @@ -1,9 +0,0 @@ -package nodes - -type enrichmentOriginatorFieldsNodeFactory struct{} - -func (f enrichmentOriginatorFieldsNodeFactory) Name() string { return "EnrichmentOriginatorFieldsNode" } -func (f enrichmentOriginatorFieldsNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } -func (f enrichmentOriginatorFieldsNodeFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/enrichment_originator_telemetry_node.go b/pkg/rule_engine/nodes/enrichment_originator_telemetry_node.go deleted file mode 100644 index a95ec00..0000000 --- a/pkg/rule_engine/nodes/enrichment_originator_telemetry_node.go +++ /dev/null @@ -1,11 +0,0 @@ -package nodes - -type enrichmentOriginatorTelemetryNodeFactory struct{} - -func (f enrichmentOriginatorTelemetryNodeFactory) Name() string { - return "EnrichmentOriginatorTelemetryNode" -} -func (f enrichmentOriginatorTelemetryNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } -func (f enrichmentOriginatorTelemetryNodeFactory) Create(id string, meta Metadata) (Node, error) { - return nil, nil -} diff --git a/pkg/rule_engine/nodes/enrichment_tenant_attr.go b/pkg/rule_engine/nodes/enrichment_tenant_attr.go deleted file mode 100644 index 30bde52..0000000 --- a/pkg/rule_engine/nodes/enrichment_tenant_attr.go +++ /dev/null @@ -1,17 +0,0 @@ -package nodes - -type enrichmentTenantNode struct { - bareNode -} - -type enrichmentTenantNodeFactory struct{} - -func (f enrichmentTenantNodeFactory) Name() string { return "EnrichmentTenantNode" } -func (f enrichmentTenantNodeFactory) Category() string { return NODE_CATEGORY_ENRICHMENT } -func (f enrichmentTenantNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} - node := &enrichmentTenantNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - } - return decodePath(meta, node) -} diff --git a/pkg/rule_engine/nodes/external_ding_node.go b/pkg/rule_engine/nodes/external_ding_node.go new file mode 100644 index 0000000..4b8eb01 --- /dev/null +++ b/pkg/rule_engine/nodes/external_ding_node.go @@ -0,0 +1,33 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + +type externalDingNode struct { + bareNode + WebHook string `json:"webHook" yaml:"webHook"` + MsgType string `json:"msgType" yaml:"msgType"` + Content string `json:"content" yaml:"content"` + IsAtAll bool `json:"isAtAll" yaml:"isAtAll"` + atMobiles []string `json:"atMobiles" yaml:"atMobiles"` +} + +type externalDingNodeFactory struct{} + +func (f externalDingNodeFactory) Name() string { return "ExternalDingNode" } +func (f externalDingNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalDingNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &externalDingNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *externalDingNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + return nil +} diff --git a/pkg/rule_engine/nodes/external_kafka_node.go b/pkg/rule_engine/nodes/external_kafka_node.go new file mode 100644 index 0000000..5c112dd --- /dev/null +++ b/pkg/rule_engine/nodes/external_kafka_node.go @@ -0,0 +1,31 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + +type externalKafkaNode struct { + bareNode + Server string `json:"server" yaml:"server"` + Topic string `json:"topic" yaml:"topic"` + KafkaCli string +} + +type externalKafkaNodeFactory struct{} + +func (f externalKafkaNodeFactory) Name() string { return "ExternalKafkaNode" } +func (f externalKafkaNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalKafkaNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &externalKafkaNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *externalKafkaNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + return nil +} diff --git a/pkg/rule_engine/nodes/external_mq_node.go b/pkg/rule_engine/nodes/external_mq_node.go new file mode 100644 index 0000000..eb341b9 --- /dev/null +++ b/pkg/rule_engine/nodes/external_mq_node.go @@ -0,0 +1,28 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + +type externalMqNode struct { + bareNode +} + +type externalMqNodeFactory struct{} + +func (f externalMqNodeFactory) Name() string { return "ExternalMqNode" } +func (f externalMqNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalMqNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalMqNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &externalMqNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *externalMqNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + return nil +} diff --git a/pkg/rule_engine/nodes/external_mqtt_node.go b/pkg/rule_engine/nodes/external_mqtt_node.go index 5c66ff1..b0172e4 100644 --- a/pkg/rule_engine/nodes/external_mqtt_node.go +++ b/pkg/rule_engine/nodes/external_mqtt_node.go @@ -1,8 +1,9 @@ package nodes import ( + "dz-iot-server/rule_engine/message" + "encoding/json" "fmt" - "pandax/pkg/rule_engine/message" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -11,13 +12,13 @@ import ( type externalMqttNode struct { bareNode - TopicPattern string - Host string - Port string - ConnectTimeoutSec int - ClientId string - CleanSession bool - Ssl bool + TopicPattern string `json:"topicPattern"` + Host string `json:"host"` + Port string `json:"port"` + ConnectTimeoutSec int `json:"connectTimeoutSec"` + ClientId string `json:"clientId"` + CleanSession bool `json:"cleanSession"` + Ssl bool `json:"ssl"` MqttCli mqtt.Client } @@ -25,11 +26,11 @@ type externalMqttNodeFactory struct{} func (f externalMqttNodeFactory) Name() string { return "ExternalMqttNode" } func (f externalMqttNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalMqttNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} node := &externalMqttNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } broker := fmt.Sprintf("tcp://%s:%s", node.Host, node.Port) opts := mqtt.NewClientOptions().AddBroker(broker) @@ -51,7 +52,7 @@ func (n *externalMqttNode) Handle(msg message.Message) error { successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") topic := n.TopicPattern //need fix add msg.metadata in it - sendmqttmsg, err := msg.MarshalBinary() + sendmqttmsg, err := json.Marshal(msg.GetMsg()) if err != nil { return nil } diff --git a/pkg/rule_engine/nodes/external_restapi_node.go b/pkg/rule_engine/nodes/external_restapi_node.go index 8457b7e..9d151e6 100644 --- a/pkg/rule_engine/nodes/external_restapi_node.go +++ b/pkg/rule_engine/nodes/external_restapi_node.go @@ -1,26 +1,37 @@ package nodes +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + type externalRestapiNode struct { bareNode - RestEndpointUrlPattern string - RequestMethod string - headers map[string]string - UseSimpleClientHttpFactory bool - ReadTimeoutMs int - MaxParallelRequestsCount int - UseRedisQueueForMsgPersistence bool - trimQueue bool - MaxQueueSize int + RestEndpointUrlPattern string `json:"restEndpointUrlPattern" yaml:"restEndpointUrlPattern"` + RequestMethod string `json:"requestMethod" yaml:"requestMethod"` + headers map[string]string `json:"headers" yaml:"headers"` + UseSimpleClientHttpFactory bool `json:"useSimpleClientHttpFactory" yaml:"useSimpleClientHttpFactory"` + ReadTimeoutMs int `json:"readTimeoutMs" yaml:"readTimeoutMs"` + MaxParallelRequestsCount int `json:"maxParallelRequestsCount" yaml:"maxParallelRequestsCount"` + UseRedisQueueForMsgPersistence bool `json:"useRedisQueueForMsgPersistence" yaml:"useRedisQueueForMsgPersistence"` + trimQueue bool `json:"trimQueue" yaml:"trimQueue"` + MaxQueueSize int `json:"maxQueueSize" yaml:"maxQueueSize"` } type externalRestapiNodeFactory struct{} func (f externalRestapiNodeFactory) Name() string { return "ExternalRestapiNode" } func (f externalRestapiNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalRestapiNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"True", "False"} node := &externalRestapiNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) } + +func (n *externalRestapiNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + return nil +} diff --git a/pkg/rule_engine/nodes/external_rule_chain_node.go b/pkg/rule_engine/nodes/external_rule_chain_node.go new file mode 100644 index 0000000..6d344e1 --- /dev/null +++ b/pkg/rule_engine/nodes/external_rule_chain_node.go @@ -0,0 +1,32 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + +type externalRuleChainNode struct { + bareNode + RuleId string `json:"ruleId" yaml:"ruleId"` +} + +type externalRuleChainNodeFactory struct{} + +func (f externalRuleChainNodeFactory) Name() string { return "ExternalRuleChainNode" } +func (f externalRuleChainNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalRuleChainNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + + node := &externalRuleChainNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + + return decodePath(meta, node) +} + +func (n *externalRuleChainNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + return nil +} diff --git a/pkg/rule_engine/nodes/transform_to_email_node.go b/pkg/rule_engine/nodes/external_send_email_node.go similarity index 62% rename from pkg/rule_engine/nodes/transform_to_email_node.go rename to pkg/rule_engine/nodes/external_send_email_node.go index a5608ee..b9a7832 100644 --- a/pkg/rule_engine/nodes/transform_to_email_node.go +++ b/pkg/rule_engine/nodes/external_send_email_node.go @@ -1,11 +1,11 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" ) -type transformToEmailNode struct { +type externalSendEmailNode struct { bareNode From string `json:"from" yaml:"from"` To string `json:"to" yaml:"to"` @@ -15,21 +15,21 @@ type transformToEmailNode struct { Body string `json:"body" yaml:"body"` } -type transformToEmailNodeFactory struct{} +type externalSendEmailNodeFactory struct{} -func (f transformToEmailNodeFactory) Name() string { return "TransformToEmailNode" } -func (f transformToEmailNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } - -func (f transformToEmailNodeFactory) Create(id string, meta Metadata) (Node, error) { +func (f externalSendEmailNodeFactory) Name() string { return "ExternalSendEmailNode" } +func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) { labels := []string{"Success", "Failure"} - node := &transformToEmailNode{ + node := &externalSendEmailNode{ bareNode: newBareNode(f.Name(), id, meta, labels), } return decodePath(meta, node) } -func (n *transformToEmailNode) Handle(msg message.Message) error { +func (n *externalSendEmailNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) successLabelNode := n.GetLinkedNode("Success") diff --git a/pkg/rule_engine/nodes/external_send_sms_node.go b/pkg/rule_engine/nodes/external_send_sms_node.go new file mode 100644 index 0000000..0ff3e70 --- /dev/null +++ b/pkg/rule_engine/nodes/external_send_sms_node.go @@ -0,0 +1,31 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + +type externalSendSmsNode struct { + bareNode +} + +type externalSendSmsNodeFactory struct{} + +func (f externalSendSmsNodeFactory) Name() string { return "ExternalSendSmslNode" } +func (f externalSendSmsNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalSendSmsNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &externalSendSmsNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *externalSendSmsNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + //failureLabelNode := n.GetLinkedNode("Failure") + + return successLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/external_wechat_node.go b/pkg/rule_engine/nodes/external_wechat_node.go new file mode 100644 index 0000000..dec8b46 --- /dev/null +++ b/pkg/rule_engine/nodes/external_wechat_node.go @@ -0,0 +1,33 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + +type externalWechatNode struct { + bareNode + WebHook string `json:"webHook" yaml:"webHook"` + MsgType string `json:"msgType" yaml:"msgType"` + Content string `json:"content" yaml:"content"` + IsAtAll bool `json:"isAtAll" yaml:"isAtAll"` + atMobiles []string `json:"atMobiles" yaml:"atMobiles"` +} + +type externalWechatNodeFactory struct{} + +func (f externalWechatNodeFactory) Name() string { return "ExternalWechatNode" } +func (f externalWechatNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } +func (f externalWechatNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &externalWechatNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *externalWechatNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + return nil +} diff --git a/pkg/rule_engine/nodes/filter_check_existence_fields_node.go b/pkg/rule_engine/nodes/filter_check_existence_fields_node.go deleted file mode 100644 index a737654..0000000 --- a/pkg/rule_engine/nodes/filter_check_existence_fields_node.go +++ /dev/null @@ -1,34 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" -) - -//检查关联关系 -//该消息来自与哪个实体或到那个实体 -type checkExistenceFieldsNode struct { - bareNode -} - -type checkExistenceFieldsNodeFactory struct{} - -func (f checkExistenceFieldsNodeFactory) Name() string { return "CheckExistenceFieldsNode" } -func (f checkExistenceFieldsNodeFactory) Category() string { return NODE_CATEGORY_FILTER } - -func (f checkExistenceFieldsNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"True", "False"} - node := &checkExistenceFieldsNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - } - return decodePath(meta, node) -} - -func (n *checkExistenceFieldsNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - //trueLabelNode := n.GetLinkedNode("True") - //falseLabelNode := n.GetLinkedNode("False") - - return nil -} diff --git a/pkg/rule_engine/nodes/filter_check_relation_node.go b/pkg/rule_engine/nodes/filter_check_relation_node.go deleted file mode 100644 index e33ecee..0000000 --- a/pkg/rule_engine/nodes/filter_check_relation_node.go +++ /dev/null @@ -1,67 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" -) - -const ( - RelationTypeContains = "Contains" - RelationTypeNotContains = "NotContains" -) - -//检查关联关系 -//该消息来自与哪个实体或到那个实体 -type checkRelationFilterNode struct { - bareNode - Direction string `json:"direction" yaml:"direction"` - RelationType string `json:"relationType" yaml:"relationType"` - InstanceType string `json:"instanceType" yaml:"instanceType"` - Values []string `json:"values" yaml:"values"` -} - -type checkRelationFilterNodeFactory struct{} - -func (f checkRelationFilterNodeFactory) Name() string { return "CheckRelationFilterNode" } -func (f checkRelationFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } - -func (f checkRelationFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"True", "False"} - node := &checkRelationFilterNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - Values: []string{}, - } - return decodePath(meta, node) -} - -func (n *checkRelationFilterNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - trueLabelNode := n.GetLinkedNode("True") - falseLabelNode := n.GetLinkedNode("False") - - //direction := msg.GetDirection() - attr := msg.GetMetadata().GetKeyValue(n.InstanceType) - switch n.RelationType { - case RelationTypeContains: - for _, val := range n.Values { - // specified attribute exist in names - if attr == val { - return trueLabelNode.Handle(msg) - } - } - // not found - return falseLabelNode.Handle(msg) - - case RelationTypeNotContains: - for _, val := range n.Values { - // specified attribute exist in names - if attr == val { - return falseLabelNode.Handle(msg) - } - } - // not found - return trueLabelNode.Handle(msg) - } - return nil -} diff --git a/pkg/rule_engine/nodes/filter_device_type_switch_node.go b/pkg/rule_engine/nodes/filter_device_type_switch_node.go new file mode 100644 index 0000000..ce6d0ed --- /dev/null +++ b/pkg/rule_engine/nodes/filter_device_type_switch_node.go @@ -0,0 +1,46 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "fmt" + "github.com/sirupsen/logrus" +) + +const ( + DEVICE = "DEVICE" + GATEWAY = "GATEWAY" +) + +//检查关联关系 +//该消息来自与哪个实体或到那个实体 +type deviceTypeSwitchNode struct { + bareNode +} + +type deviceTypeSwitchNodeFactory struct{} + +func (f deviceTypeSwitchNodeFactory) Name() string { return "DeviceTypeSwitch" } +func (f deviceTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER } +func (f deviceTypeSwitchNodeFactory) Labels() []string { return []string{DEVICE, GATEWAY} } +func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &deviceTypeSwitchNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *deviceTypeSwitchNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + deviceLabelNode := n.GetLinkedNode(DEVICE) + gatewayLabelNode := n.GetLinkedNode(GATEWAY) + + if deviceLabelNode == nil && gatewayLabelNode == nil { + return fmt.Errorf("no device and gateway label linked node in %s", n.Name()) + } + + if msg.GetMetadata().GetKeyValue("deviceType") == DEVICE { + return deviceLabelNode.Handle(msg) + } + return gatewayLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/filter_message_type_node.go b/pkg/rule_engine/nodes/filter_message_type_node.go index a61ff92..8e98f86 100644 --- a/pkg/rule_engine/nodes/filter_message_type_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_node.go @@ -1,9 +1,8 @@ package nodes import ( - "fmt" + "dz-iot-server/rule_engine/message" "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" ) type messageTypeFilterNode struct { @@ -15,11 +14,11 @@ type messageTypeFilterNodeFactory struct{} func (f messageTypeFilterNodeFactory) Name() string { return "MessageTypeFilterNode" } func (f messageTypeFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } +func (f messageTypeFilterNodeFactory) Labels() []string { return []string{"True", "False"} } func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"True", "False"} node := &messageTypeFilterNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), MessageTypes: []string{}, } return decodePath(meta, node) @@ -30,16 +29,10 @@ func (n *messageTypeFilterNode) Handle(msg message.Message) error { trueLabelNode := n.GetLinkedNode("True") falseLabelNode := n.GetLinkedNode("False") - if trueLabelNode == nil || falseLabelNode == nil { - return fmt.Errorf("no true or false label linked node in %s", n.Name()) - } messageType := msg.GetType() - // TODO: how to resolve user customized message type dynamically - //userMessageType := msg.GetMetadata().GetKeyValue(n.Metadata().MessageTypeKey) - userMessageType := "TODO" for _, filterType := range n.MessageTypes { - if filterType == messageType || filterType == userMessageType { + if filterType == messageType { return trueLabelNode.Handle(msg) } } diff --git a/pkg/rule_engine/nodes/filter_message_type_switch_node.go b/pkg/rule_engine/nodes/filter_message_type_switch_node.go index b648142..9d6fa0d 100644 --- a/pkg/rule_engine/nodes/filter_message_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_switch_node.go @@ -1,8 +1,8 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "fmt" - "pandax/pkg/rule_engine/message" "github.com/sirupsen/logrus" ) @@ -14,7 +14,15 @@ type messageTypeSwitchNodeFactory struct{} func (f messageTypeSwitchNodeFactory) Name() string { return "MessageTypeSwitchNode" } func (f messageTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER } -func (f messageTypeSwitchNodeFactory) Labels() []string { return []string{"True", "False"} } +func (f messageTypeSwitchNodeFactory) Labels() []string { + return []string{ + message.MessageTypePostAttributesRequest, + message.MessageTypePostTelemetryRequest, + message.MessageTypeConnectEvent, + message.MessageTypeDisconnectEvent, + "Other", + } +} func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) { node := &messageTypeSwitchNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), @@ -27,19 +35,16 @@ func (n *messageTypeSwitchNode) Handle(msg message.Message) error { nodes := n.GetLinkedNodes() messageType := msg.GetType() - messageTypeKey, _ := n.Metadata().Value(NODE_CONFIG_MESSAGE_TYPE_KEY) - userMessageType := msg.GetMetadata().GetKeyValue(messageTypeKey.(string)) for label, node := range nodes { - if messageType == label || userMessageType == label { + if messageType == label { return node.Handle(msg) } } - // if other label exist + // 自定义类型 或 未识别类型 if node := n.GetLinkedNode("Other"); node != nil { return node.Handle(msg) } - // not found return fmt.Errorf("%s no label to handle message", n.Name()) } diff --git a/pkg/rule_engine/nodes/filter_originator_type_node.go b/pkg/rule_engine/nodes/filter_originator_type_node.go deleted file mode 100644 index cb4b1f5..0000000 --- a/pkg/rule_engine/nodes/filter_originator_type_node.go +++ /dev/null @@ -1,43 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" -) - -type originatorTypeFilterNode struct { - bareNode - Filters []string `json:"filters" yaml:"filters"` -} - -type originatorFilterNodeFactory struct{} - -func (f originatorFilterNodeFactory) Name() string { return "OriginatorFilterNode" } -func (f originatorFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } - -func (f originatorFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"True", "False"} - node := &originatorTypeFilterNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - Filters: []string{}, - } - return decodePath(meta, node) -} - -func (n *originatorTypeFilterNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - trueLabelNode := n.GetLinkedNode("True") - falseLabelNode := n.GetLinkedNode("False") - - //links := n.GetLinks() - originatorType := msg.GetOriginator() - - for _, filter := range n.Filters { - if originatorType == filter { - return trueLabelNode.Handle(msg) - } - } - // not found - return falseLabelNode.Handle(msg) -} diff --git a/pkg/rule_engine/nodes/filter_originator_type_switch_node.go b/pkg/rule_engine/nodes/filter_originator_type_switch_node.go deleted file mode 100644 index d71191b..0000000 --- a/pkg/rule_engine/nodes/filter_originator_type_switch_node.go +++ /dev/null @@ -1,39 +0,0 @@ -package nodes - -import ( - "fmt" - "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" -) - -type originatorTypeSwitchNode struct { - bareNode -} - -type originatorTypeSwitchNodeFactory struct{} - -func (f originatorTypeSwitchNodeFactory) Name() string { return "OriginatorTypeSwitchNode" } -func (f originatorTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER } - -func (f originatorTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{} - node := &originatorTypeSwitchNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - } - return decodePath(meta, node) -} - -func (n *originatorTypeSwitchNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - nodes := n.GetLinkedNodes() - originatorType := msg.GetOriginator() - - for label, node := range nodes { - if originatorType == label { - return node.Handle(msg) - } - } - // not found - return fmt.Errorf("%s no label to handle message", n.Name()) -} diff --git a/pkg/rule_engine/nodes/filter_script_node.go b/pkg/rule_engine/nodes/filter_script_node.go index 48516ab..886da26 100644 --- a/pkg/rule_engine/nodes/filter_script_node.go +++ b/pkg/rule_engine/nodes/filter_script_node.go @@ -1,8 +1,8 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" ) const ScriptFilterNodeName = "ScriptFilterNode" @@ -14,13 +14,12 @@ type scriptFilterNode struct { type scriptFilterNodeFactory struct{} -func (f scriptFilterNodeFactory) Name() string { return "ScriptFilterNode" } +func (f scriptFilterNodeFactory) Name() string { return ScriptFilterNodeName } func (f scriptFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } - +func (f scriptFilterNodeFactory) Labels() []string { return []string{"True", "False"} } func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"True", "False"} node := &scriptFilterNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) } diff --git a/pkg/rule_engine/nodes/filter_switch_node.go b/pkg/rule_engine/nodes/filter_switch_node.go index 53c6689..3f37752 100644 --- a/pkg/rule_engine/nodes/filter_switch_node.go +++ b/pkg/rule_engine/nodes/filter_switch_node.go @@ -12,8 +12,8 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" ) type switchFilterNode struct { @@ -27,7 +27,7 @@ func (f switchFilterNodeFactory) Name() string { return "SwitchNode" } func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } func (f switchFilterNodeFactory) Labels() []string { return []string{ - "True", "False", message.MessageTypePostTelemetryRequest, + "Failure", "True", "False", message.MessageTypePostTelemetryRequest, message.MessageTypeConnectEvent, } } @@ -44,7 +44,7 @@ func (n *switchFilterNode) Handle(msg message.Message) error { scriptEngine := NewScriptEngine() SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts) if err != nil { - return err + return nil } nodes := n.GetLinkedNodes() for label, node := range nodes { diff --git a/pkg/rule_engine/nodes/init.go b/pkg/rule_engine/nodes/init.go index bbf47a2..976d87a 100644 --- a/pkg/rule_engine/nodes/init.go +++ b/pkg/rule_engine/nodes/init.go @@ -4,5 +4,31 @@ package nodes func init() { RegisterFactory(inputNodeFactory{}) RegisterFactory(switchFilterNodeFactory{}) + RegisterFactory(scriptFilterNodeFactory{}) + RegisterFactory(messageTypeFilterNodeFactory{}) + RegisterFactory(messageTypeSwitchNodeFactory{}) + RegisterFactory(deviceTypeSwitchNodeFactory{}) + + RegisterFactory(transformDeleteKeyNodeFactory{}) + RegisterFactory(transformRenameKeyNodeFactory{}) + RegisterFactory(transformScriptNodeFactory{}) + + RegisterFactory(createAlarmNodeFactory{}) + RegisterFactory(clearAlarmNodeFactory{}) + RegisterFactory(messageGeneratorNodeFactory{}) + RegisterFactory(logNodeFactory{}) + RegisterFactory(saveAttributesNodeFactory{}) + RegisterFactory(saveTimeSeriesNodeFactory{}) RegisterFactory(delayNodeFactory{}) + + RegisterFactory(externalDingNodeFactory{}) + RegisterFactory(externalWechatNodeFactory{}) + RegisterFactory(externalKafkaNodeFactory{}) + RegisterFactory(externalMqNodeFactory{}) + RegisterFactory(externalMqttNodeFactory{}) + RegisterFactory(externalRestapiNodeFactory{}) + RegisterFactory(externalSendEmailNodeFactory{}) + RegisterFactory(externalSendSmsNodeFactory{}) + RegisterFactory(externalRuleChainNodeFactory{}) + } diff --git a/pkg/rule_engine/nodes/input_node.go b/pkg/rule_engine/nodes/input_node.go index 9fa3c1c..4f30f85 100644 --- a/pkg/rule_engine/nodes/input_node.go +++ b/pkg/rule_engine/nodes/input_node.go @@ -1,8 +1,8 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" ) const InputNodeName = "InputNode" diff --git a/pkg/rule_engine/nodes/script_engine.go b/pkg/rule_engine/nodes/script_engine.go index 260b146..b4da5d0 100644 --- a/pkg/rule_engine/nodes/script_engine.go +++ b/pkg/rule_engine/nodes/script_engine.go @@ -23,6 +23,23 @@ func NewScriptEngine() ScriptEngine { } func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) (message.Message, error) { + vm := goja.New() + _, err := vm.RunString(script) + if err != nil { + logrus.Info("JS代码有问题") + return nil, err + } + var fn func(map[string]interface{}, map[string]interface{}, string) map[string]interface{} + err = vm.ExportTo(vm.Get("Transform"), &fn) + if err != nil { + logrus.Info("Js函数映射到 Go 函数失败!") + return nil, err + } + datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + msg.SetMsg(datas["msg"].(map[string]interface{})) + msg.SetMetadata(message.NewDefaultMetadata(datas["metadata"].(map[string]interface{}))) + msg.SetType(datas["msgType"].(string)) + return msg, nil return nil, nil } @@ -34,13 +51,13 @@ func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) logrus.Info("JS代码有问题") return nil, err } - var fn func(message.Message, message.Metadata, string) []string + var fn func(map[string]interface{}, map[string]interface{}, string) []string err = vm.ExportTo(vm.Get("Switch"), &fn) if err != nil { logrus.Info("Js函数映射到 Go 函数失败!") return nil, err } - datas := fn(msg, msg.GetMetadata(), msg.GetType()) + datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) return datas, nil } @@ -51,13 +68,13 @@ func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) logrus.Info("JS代码有问题") return false, err } - var fn func(message.Message, message.Metadata, string) bool + var fn func(map[string]interface{}, map[string]interface{}, string) bool err = vm.ExportTo(vm.Get("Filter"), &fn) if err != nil { logrus.Info("Js函数映射到 Go 函数失败!") return false, err } - datas := fn(msg, msg.GetMetadata(), msg.GetType()) + datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) return datas, nil } diff --git a/pkg/rule_engine/nodes/transform_change_originator_node.go b/pkg/rule_engine/nodes/transform_change_originator_node.go deleted file mode 100644 index c7aefc4..0000000 --- a/pkg/rule_engine/nodes/transform_change_originator_node.go +++ /dev/null @@ -1,43 +0,0 @@ -package nodes - -import ( - "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" -) - -type transformChangeOriginatorNode struct { - bareNode - OriginatorSource string `json:"originatorSource" yaml:"originatorSource"` - Direction string `json:"direction" yaml:"direction"` - MaxRelationLevel int `json:"maxRelationLevel" yaml:"maxRelationLevel"` - //RelationFilters []runtime.RelationFilter `json:"relationFilters" yaml:"relationFilters"` -} - -type transformChangeOriginatorNodeFactory struct{} - -func (f transformChangeOriginatorNodeFactory) Name() string { return "TransformChangeOriginatorNode" } -func (f transformChangeOriginatorNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } - -func (f transformChangeOriginatorNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} - node := &transformChangeOriginatorNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), - //RelationFilters: []runtime.RelationFilter{}, - } - return decodePath(meta, node) -} - -func (n *transformChangeOriginatorNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - - //successLabelNode := n.GetLinkedNode("Sucess") - failureLabelNode := n.GetLinkedNode("Failure") - //relationQuery := runtime.NewRelationQuery() - - //entities := relationQuery.QueryEntities(n.Direction, n.MaxRelationLevel, n.RelationFilters) - /*if len(entities) > 0 && entities[0] == msg.GetOriginator() { - msg.SetOriginator(entities[0]) - return successLabelNode.Handle(msg) - }*/ - return failureLabelNode.Handle(msg) -} diff --git a/pkg/rule_engine/nodes/transform_delete_key_node.go b/pkg/rule_engine/nodes/transform_delete_key_node.go new file mode 100644 index 0000000..44ef3f1 --- /dev/null +++ b/pkg/rule_engine/nodes/transform_delete_key_node.go @@ -0,0 +1,52 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + +type transformDeleteKeyNode struct { + bareNode + FormType string `json:"formType" yaml:"formType"` //msg metadata + Keys []string `json:"keys" yaml:"keys"` +} +type transformDeleteKeyNodeFactory struct{} + +func (f transformDeleteKeyNodeFactory) Name() string { return "TransformDeleteKeyNode" } +func (f transformDeleteKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } +func (f transformDeleteKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &transformDeleteKeyNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *transformDeleteKeyNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + if n.FormType == "msg" { + data := msg.GetMsg() + for _, key := range n.Keys { + if _, found := data[key]; found { + delete(data, key) + msg.SetMsg(data) + } + } + } else if n.FormType == "metadata" { + data := msg.GetMetadata() + for _, key := range n.Keys { + if data.GetKeyValue(key) != nil { + values := data.GetValues() + delete(values, key) + msg.SetMetadata(message.NewDefaultMetadata(values)) + } + } + } else { + failureLabelNode.Handle(msg) + } + + return successLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/transform_rename_key_node.go b/pkg/rule_engine/nodes/transform_rename_key_node.go new file mode 100644 index 0000000..5c1c0e0 --- /dev/null +++ b/pkg/rule_engine/nodes/transform_rename_key_node.go @@ -0,0 +1,58 @@ +package nodes + +import ( + "dz-iot-server/rule_engine/message" + "github.com/sirupsen/logrus" +) + +type transformRenameKeyNode struct { + bareNode + FormType string `json:"formType" yaml:"formType"` //msg metadata + Keys []KeyName `json:"keys" yaml:"keys"` +} +type KeyName struct { + oldName string `json:"oldName" yaml:"oldName"` + newName string `json:"newName" yaml:"newName"` +} +type transformRenameKeyNodeFactory struct{} + +func (f transformRenameKeyNodeFactory) Name() string { return "TransformRenameKeyNode" } +func (f transformRenameKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } +func (f transformRenameKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, error) { + node := &transformScriptNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *transformRenameKeyNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + if n.FormType == "msg" { + data := msg.GetMsg() + for _, key := range n.Keys { + if _, found := data[key.oldName]; found { + data[key.newName] = data[key.oldName] + delete(data, key.oldName) + msg.SetMsg(data) + } + } + } else if n.FormType == "metadata" { + data := msg.GetMetadata() + for _, key := range n.Keys { + if data.GetKeyValue(key.oldName) != nil { + values := data.GetValues() + values[key.newName] = values[key.oldName] + delete(values, key.oldName) + msg.SetMetadata(message.NewDefaultMetadata(values)) + } + } + } else { + failureLabelNode.Handle(msg) + } + + return successLabelNode.Handle(msg) +} diff --git a/pkg/rule_engine/nodes/transform_script_node.go b/pkg/rule_engine/nodes/transform_script_node.go index 134511f..079d78e 100644 --- a/pkg/rule_engine/nodes/transform_script_node.go +++ b/pkg/rule_engine/nodes/transform_script_node.go @@ -1,8 +1,8 @@ package nodes import ( + "dz-iot-server/rule_engine/message" "github.com/sirupsen/logrus" - "pandax/pkg/rule_engine/message" ) type transformScriptNode struct { @@ -14,7 +14,7 @@ type transformScriptNodeFactory struct{} func (f transformScriptNodeFactory) Name() string { return "TransformScriptNode" } func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } - +func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) { labels := []string{"Success", "Failure"} node := &transformScriptNode{