mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 02:48:34 +08:00
规则链结构优化
This commit is contained in:
@@ -107,11 +107,6 @@
|
||||
|
||||
更多功能请访问系统。
|
||||
|
||||
|
||||
## ❤特别鸣谢
|
||||
|
||||
* 感谢[VUE-NEXT-ADMIN](https://gitee.com/lyt-top/vue-next-admin)
|
||||
|
||||
---
|
||||
版权说明
|
||||
---
|
||||
|
||||
@@ -685,3 +685,12 @@
|
||||
|
||||
2023-08-24 15:27:41.492 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/system/dict/data/type] [uid=1] : 获取字典数据列表通过字典类型 ->6ms
|
||||
2023-08-24 15:27:41.504 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/log/logLogin/list] : 获取登录日志列表 ->8ms
|
||||
2023-08-24 17:35:27.159 [DEBUG] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:59] : no Route found (in 2 routes) that matches HTTP method OPTIONS
|
||||
|
||||
2023-08-24 17:35:27.197 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/visual/screen/zt_08ca17dbc53c916369ffb3d0] [uid=1] [uname=panda] : 获取Screen信息 ->6ms
|
||||
2023-08-24 17:35:28.459 [DEBUG] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:59] : no Route found (in 2 routes) that matches HTTP method OPTIONS
|
||||
|
||||
2023-08-24 17:35:28.491 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/rule/chain/rule_a37571bb6c45378b57803793] [uid=1] : 获取规则引擎信息 ->4ms
|
||||
2023-08-24 17:35:28.749 [DEBUG] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:59] : no Route found (in 3 routes) that matches HTTP method OPTIONS
|
||||
|
||||
2023-08-24 17:35:28.756 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/rule/chain/nodeLabels] [uid=1] [uname=panda] : 获取所有节点标签 ->0ms
|
||||
|
||||
@@ -41,7 +41,7 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error)
|
||||
}
|
||||
|
||||
// StartRuleChain TODO 是否需要添加context
|
||||
func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error {
|
||||
func (c *ruleChainInstance) StartRuleChain(context context.Context, message *message.Message) error {
|
||||
if node, found := c.nodes[c.firstRuleNodeId]; found {
|
||||
node.Handle(message)
|
||||
}
|
||||
|
||||
@@ -18,13 +18,13 @@ func TestNewRuleChainInstance(t *testing.T) {
|
||||
t.Error(errs[0])
|
||||
}
|
||||
|
||||
metadata := message.NewDefaultMetadata(map[string]interface{}{
|
||||
metadata := message.Metadata{
|
||||
"deviceName": "ws432",
|
||||
"deviceId": "d_1928b99619910dae5a001fa7",
|
||||
"deviceType": "direct",
|
||||
"productId": "p_3ba460634520cf4590dc90e5",
|
||||
})
|
||||
msg := message.NewMessageWithDetail("1", message.TelemetryMes, map[string]interface{}{"temperature": 60.4, "humidity": 32.5}, metadata)
|
||||
}
|
||||
msg := message.NewMessage("1", message.TelemetryMes, message.Msg{"temperature": 60.4, "humidity": 32.5}, metadata)
|
||||
t.Log("开始执行力流程")
|
||||
err = instance.StartRuleChain(context.Background(), msg)
|
||||
if err != nil {
|
||||
@@ -33,8 +33,7 @@ func TestNewRuleChainInstance(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestScriptEngine(t *testing.T) {
|
||||
metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"})
|
||||
msg := message.NewMessageWithDetail("1", message.UpEventMes, map[string]interface{}{"aa": 5}, metadata)
|
||||
msg := message.NewMessage("1", message.UpEventMes, map[string]interface{}{"aa": 5}, map[string]interface{}{"device": "aa"})
|
||||
const baseScript = `
|
||||
function nextRelation(metadata, msg) {
|
||||
return ['one','nine'];
|
||||
@@ -47,7 +46,7 @@ func TestScriptEngine(t *testing.T) {
|
||||
}
|
||||
return nextRelation(metadata, msg);
|
||||
`
|
||||
scriptEngine := nodes.NewScriptEngine(msg, "Switch", baseScript)
|
||||
scriptEngine := nodes.NewScriptEngine(*msg, "Switch", baseScript)
|
||||
SwitchResults, err := scriptEngine.ScriptOnSwitch()
|
||||
|
||||
if err != nil {
|
||||
@@ -57,19 +56,18 @@ func TestScriptEngine(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestScriptOnMessage(t *testing.T) {
|
||||
metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"})
|
||||
msg := message.NewMessageWithDetail("1", message.UpEventMes, map[string]interface{}{"aa": 5}, metadata)
|
||||
msg := message.NewMessage("1", message.UpEventMes, map[string]interface{}{"aa": 5}, map[string]interface{}{"device": "aa"})
|
||||
|
||||
const baseScript = `
|
||||
msg.bb = "33"
|
||||
metadata.event = 55
|
||||
return {msg: msg, metadata: metadata, msgType: msgType};
|
||||
`
|
||||
scriptEngine := nodes.NewScriptEngine(msg, "Transform", baseScript)
|
||||
scriptEngine := nodes.NewScriptEngine(*msg, "Transform", baseScript)
|
||||
ScriptOnMessageResults, err := scriptEngine.ScriptOnMessage()
|
||||
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
t.Log(ScriptOnMessageResults.GetMetadata())
|
||||
t.Log(ScriptOnMessageResults.Metadata)
|
||||
}
|
||||
|
||||
@@ -1,11 +1,6 @@
|
||||
package message
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/google/uuid"
|
||||
"time"
|
||||
)
|
||||
|
||||
/*
|
||||
// 消息类型
|
||||
const (
|
||||
ConnectMes = "Connect"
|
||||
@@ -154,3 +149,4 @@ func (t *defaultMetadata) GetValues() map[string]interface{} {
|
||||
func (t *defaultMetadata) SetValues(values map[string]interface{}) {
|
||||
t.values = values
|
||||
}
|
||||
*/
|
||||
|
||||
99
pkg/rule_engine/message/message1.go
Normal file
99
pkg/rule_engine/message/message1.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package message
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/google/uuid"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 消息类型
|
||||
const (
|
||||
ConnectMes = "Connect"
|
||||
DisConnectMes = "Disconnect"
|
||||
RpcRequestMes = "RpcRequest"
|
||||
UpEventMes = "Event"
|
||||
AlarmMes = "Alarm"
|
||||
RowMes = "Row"
|
||||
TelemetryMes = "Telemetry"
|
||||
AttributesMes = "Attributes"
|
||||
)
|
||||
|
||||
// 数据类型Originator
|
||||
const (
|
||||
DEVICE = "DEVICE"
|
||||
GATEWAY = "GATEWAY"
|
||||
MONITOR = "MONITOR" //监控
|
||||
)
|
||||
|
||||
type Msg map[string]interface{}
|
||||
type Metadata map[string]interface{}
|
||||
|
||||
type Message struct {
|
||||
Id string //uuid 消息Id
|
||||
Ts time.Time //时间戳
|
||||
MsgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件
|
||||
UserId string //客户Id UUID 设备发布人
|
||||
Msg Msg //数据 数据结构JSON 设备原始数据 msg
|
||||
Metadata Metadata //消息的元数据 包括设备Id,设备类型,产品ID等
|
||||
}
|
||||
|
||||
// NewMessage ...
|
||||
func NewMessage(userId, messageType string, msg Msg, metadata Metadata) *Message {
|
||||
return &Message{
|
||||
Id: uuid.New().String(),
|
||||
Ts: time.Now(),
|
||||
UserId: userId,
|
||||
MsgType: messageType,
|
||||
Msg: msg,
|
||||
Metadata: metadata,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Message) GetAllMap() map[string]interface{} {
|
||||
data := make(map[string]interface{})
|
||||
for msgKey, msgValue := range t.Msg {
|
||||
for metaKey, metaValue := range t.Metadata {
|
||||
if msgKey == metaKey {
|
||||
data[msgKey] = metaValue
|
||||
} else {
|
||||
if _, ok := data[msgKey]; !ok {
|
||||
data[msgKey] = msgValue
|
||||
}
|
||||
if _, ok := data[metaKey]; !ok {
|
||||
data[metaKey] = metaValue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func (t *Message) MarshalBinary() ([]byte, error) {
|
||||
return json.Marshal(t)
|
||||
}
|
||||
|
||||
func (meta *Metadata) Keys() []string {
|
||||
keys := make([]string, 0)
|
||||
for key := range *meta {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (meta *Metadata) GetValue(key string) any {
|
||||
if _, found := (*meta)[key]; !found {
|
||||
return nil
|
||||
}
|
||||
return (*meta)[key]
|
||||
}
|
||||
|
||||
func (meta *Metadata) SetValue(key string, val interface{}) {
|
||||
(*meta)[key] = val
|
||||
}
|
||||
|
||||
func (msg *Msg) GetValue(key string) any {
|
||||
if _, found := (*msg)[key]; !found {
|
||||
return nil
|
||||
}
|
||||
return (*msg)[key]
|
||||
}
|
||||
@@ -29,16 +29,16 @@ func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *clearAlarmNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *clearAlarmNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
cleared := n.GetLinkedNode("Cleared")
|
||||
failure := n.GetLinkedNode("Failure")
|
||||
|
||||
alarm := services.DeviceAlarmModelDao.FindOneByType(msg.GetMetadata().GetKeyValue("deviceId").(string), n.AlarmType, "0")
|
||||
alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0")
|
||||
if alarm.DeviceId != "" {
|
||||
log.Println("清除告警")
|
||||
alarm.State = global.CLEARED
|
||||
marshal, _ := json.Marshal(msg.GetMsg())
|
||||
marshal, _ := json.Marshal(msg.Msg)
|
||||
alarm.Details = string(marshal)
|
||||
err := services.DeviceAlarmModelDao.Update(*alarm)
|
||||
if err != nil {
|
||||
|
||||
@@ -29,16 +29,16 @@ func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *createAlarmNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *createAlarmNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
created := n.GetLinkedNode("Created")
|
||||
updated := n.GetLinkedNode("Updated")
|
||||
failure := n.GetLinkedNode("Failure")
|
||||
|
||||
alarm := services.DeviceAlarmModelDao.FindOneByType(msg.GetMetadata().GetKeyValue("deviceId").(string), n.AlarmType, "0")
|
||||
alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0")
|
||||
if alarm.DeviceId != "" {
|
||||
marshal, _ := json.Marshal(msg.GetMsg())
|
||||
marshal, _ := json.Marshal(msg.Msg)
|
||||
alarm.Details = string(marshal)
|
||||
err := services.DeviceAlarmModelDao.Update(*alarm)
|
||||
if err != nil {
|
||||
@@ -53,14 +53,14 @@ func (n *createAlarmNode) Handle(msg message.Message) error {
|
||||
} else {
|
||||
alarm = &entity.DeviceAlarm{}
|
||||
alarm.Id = kgo.KStr.Uniqid("a")
|
||||
alarm.DeviceId = msg.GetMetadata().GetKeyValue("deviceId").(string)
|
||||
alarm.ProductId = msg.GetMetadata().GetKeyValue("productId").(string)
|
||||
alarm.Name = msg.GetMetadata().GetKeyValue("deviceName").(string)
|
||||
alarm.DeviceId = msg.Metadata.GetValue("deviceId").(string)
|
||||
alarm.ProductId = msg.Metadata.GetValue("productId").(string)
|
||||
alarm.Name = msg.Metadata.GetValue("deviceName").(string)
|
||||
alarm.Level = n.AlarmSeverity
|
||||
alarm.State = global.ALARMING
|
||||
alarm.Type = n.AlarmType
|
||||
alarm.Time = time.Now()
|
||||
marshal, _ := json.Marshal(msg.GetMsg())
|
||||
marshal, _ := json.Marshal(msg.Msg)
|
||||
alarm.Details = string(marshal)
|
||||
err := services.DeviceAlarmModelDao.Insert(*alarm)
|
||||
if err != nil {
|
||||
|
||||
@@ -15,7 +15,7 @@ type delayNode struct {
|
||||
bareNode
|
||||
PeriodTs int `json:"periodTs" yaml:"periodTs" jpath:"periodTs"` //周期时间
|
||||
MaxPendingMessages int `json:"maxPendingMessages" yaml:"maxPendingMessages" jpath:"maxPendingMessages"` //最大等待消息数
|
||||
messageQueue []message.Message `jpath:"-"`
|
||||
messageQueue []*message.Message `jpath:"-"`
|
||||
delayTimer *time.Timer `jpath:"-"`
|
||||
lock sync.Mutex `jpath:"-"`
|
||||
}
|
||||
@@ -31,12 +31,12 @@ func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
lock: sync.Mutex{},
|
||||
}
|
||||
_, err := decodePath(meta, node)
|
||||
node.messageQueue = make([]message.Message, node.MaxPendingMessages)
|
||||
node.messageQueue = make([]*message.Message, node.MaxPendingMessages)
|
||||
return node, err
|
||||
}
|
||||
|
||||
func (n *delayNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *delayNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
|
||||
@@ -25,8 +25,8 @@ func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, err
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *messageGeneratorNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *messageGeneratorNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
@@ -42,7 +42,7 @@ func (n *messageGeneratorNode) Handle(msg message.Message) error {
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
scriptEngine := NewScriptEngine(msg, "Generate", n.Script)
|
||||
scriptEngine := NewScriptEngine(*msg, "Generate", n.Script)
|
||||
generate, err := scriptEngine.ScriptGenerate()
|
||||
if err != nil {
|
||||
if failureLabelNode != nil {
|
||||
@@ -50,9 +50,9 @@ func (n *messageGeneratorNode) Handle(msg message.Message) error {
|
||||
}
|
||||
return
|
||||
}
|
||||
msg.SetMsg(generate["msg"].(map[string]interface{}))
|
||||
msg.SetType(generate["msgType"].(string))
|
||||
msg.SetMetadata(message.NewDefaultMetadata(generate["metadata"].(map[string]interface{})))
|
||||
msg.Msg = generate["msg"].(message.Msg)
|
||||
msg.Metadata = generate["metadata"].(message.Metadata)
|
||||
msg.MsgType = generate["msgType"].(string)
|
||||
if successLabelNode != nil {
|
||||
go successLabelNode.Handle(msg)
|
||||
}
|
||||
|
||||
@@ -24,11 +24,11 @@ func (f logNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *logNode) Handle(msg message.Message) error {
|
||||
func (n *logNode) Handle(msg *message.Message) error {
|
||||
successLableNode := n.GetLinkedNode("Success")
|
||||
failureLableNode := n.GetLinkedNode("Failure")
|
||||
|
||||
scriptEngine := NewScriptEngine(msg, "ToString", n.Script)
|
||||
scriptEngine := NewScriptEngine(*msg, "ToString", n.Script)
|
||||
logMessage, err := scriptEngine.ScriptToString()
|
||||
if err != nil {
|
||||
if failureLableNode != nil {
|
||||
@@ -38,10 +38,10 @@ func (n *logNode) Handle(msg message.Message) error {
|
||||
}
|
||||
}
|
||||
services.RuleChainMsgLogModelDao.Insert(entity.RuleChainMsgLog{
|
||||
MessageId: msg.GetId(),
|
||||
MsgType: msg.GetType(),
|
||||
DeviceName: msg.GetMetadata().GetValues()["deviceName"].(string),
|
||||
Ts: msg.GetTs(),
|
||||
MessageId: msg.Id,
|
||||
MsgType: msg.MsgType,
|
||||
DeviceName: msg.Metadata["deviceName"].(string),
|
||||
Ts: msg.Ts,
|
||||
Content: logMessage,
|
||||
})
|
||||
global.Log.Info(logMessage)
|
||||
|
||||
@@ -24,7 +24,7 @@ func (f rpcRequestNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *rpcRequestNode) Handle(msg message.Message) error {
|
||||
func (n *rpcRequestNode) Handle(msg *message.Message) error {
|
||||
successLableNode := n.GetLinkedNode("Success")
|
||||
failureLableNode := n.GetLinkedNode("Failure")
|
||||
|
||||
@@ -38,9 +38,9 @@ func (n *rpcRequestNode) Handle(msg message.Message) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
msgM := msg.GetMsg()
|
||||
msgM := msg.Msg
|
||||
msgM["payload"] = respPayload
|
||||
msg.SetMsg(msgM)
|
||||
msg.Msg = msgM
|
||||
if successLableNode != nil {
|
||||
return successLableNode.Handle(msg)
|
||||
}
|
||||
|
||||
@@ -23,16 +23,16 @@ func (f rpcRespondFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *rpcRespondNode) Handle(msg message.Message) error {
|
||||
func (n *rpcRespondNode) Handle(msg *message.Message) error {
|
||||
successLableNode := n.GetLinkedNode("Success")
|
||||
failureLableNode := n.GetLinkedNode("Failure")
|
||||
RequestId := n.RequestId
|
||||
if RequestId == 0 {
|
||||
RequestId = int(msg.GetMetadata().GetKeyValue("requestId").(float64))
|
||||
RequestId = int(msg.Metadata.GetValue("requestId").(float64))
|
||||
}
|
||||
var datas = mqtt.RpcPayload{
|
||||
Method: msg.GetMsg()["method"].(string),
|
||||
Params: msg.GetMsg()["params"],
|
||||
Method: msg.Msg.GetValue("method").(string),
|
||||
Params: msg.Msg.GetValue("params"),
|
||||
}
|
||||
rpc := &mqtt.RpcRequest{Client: global.MqttClient, RequestId: RequestId}
|
||||
err := rpc.RespondTpc(datas)
|
||||
|
||||
@@ -22,11 +22,11 @@ func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *saveAttributesNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *saveAttributesNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
if msg.GetType() != message.AttributesMes {
|
||||
if msg.MsgType != message.AttributesMes {
|
||||
if failureLabelNode != nil {
|
||||
return failureLabelNode.Handle(msg)
|
||||
} else {
|
||||
@@ -34,8 +34,8 @@ func (n *saveAttributesNode) Handle(msg message.Message) error {
|
||||
}
|
||||
}
|
||||
//deviceId := msg.GetMetadata().GetValues()["deviceId"].(string)
|
||||
deviceName := msg.GetMetadata().GetValues()["deviceName"].(string)
|
||||
err := global.TdDb.InsertDevice(deviceName+"_attributes", msg.GetMsg())
|
||||
deviceName := msg.Metadata["deviceName"].(string)
|
||||
err := global.TdDb.InsertDevice(deviceName+"_attributes", msg.Msg)
|
||||
if err != nil {
|
||||
if failureLabelNode != nil {
|
||||
return failureLabelNode.Handle(msg)
|
||||
|
||||
@@ -23,11 +23,11 @@ func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *saveTimeSeriesNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *saveTimeSeriesNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
if msg.GetType() != message.TelemetryMes {
|
||||
if msg.MsgType != message.TelemetryMes {
|
||||
if failureLabelNode != nil {
|
||||
return failureLabelNode.Handle(msg)
|
||||
} else {
|
||||
@@ -35,9 +35,10 @@ func (n *saveTimeSeriesNode) Handle(msg message.Message) error {
|
||||
}
|
||||
}
|
||||
//deviceId := msg.GetMetadata().GetValues()["deviceId"].(string)
|
||||
deviceName := msg.GetMetadata().GetValues()["deviceName"].(string)
|
||||
log.Println("telemetry", msg.GetMsg())
|
||||
err := global.TdDb.InsertDevice(deviceName+"_telemetry", msg.GetMsg())
|
||||
deviceName := msg.Metadata["deviceName"].(string)
|
||||
log.Println(msg.Msg)
|
||||
log.Println(msg.Metadata)
|
||||
err := global.TdDb.InsertDevice(deviceName+"_telemetry", msg.Msg)
|
||||
log.Println(err)
|
||||
if err != nil {
|
||||
if failureLabelNode != nil {
|
||||
|
||||
@@ -35,8 +35,8 @@ func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error)
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *externalDingNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalDingNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
|
||||
@@ -45,20 +45,20 @@ func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error)
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (n *externalKafkaNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalKafkaNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
defer n.KafkaCli.Close()
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
value := sarama.ByteEncoder("")
|
||||
if n.KeyPattern == "metadataKey" {
|
||||
marshal, err := json.Marshal(msg.GetMetadata().GetValues())
|
||||
marshal, err := json.Marshal(msg.Metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
value = marshal
|
||||
} else {
|
||||
marshal, err := json.Marshal(msg.GetMsg())
|
||||
marshal, err := json.Marshal(msg.Msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -59,13 +59,13 @@ func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error)
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (n *externalMqttNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalMqttNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
defer n.MqttCli.Disconnect(1000)
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
topic := n.TopicPattern //need fix add msg.metadata in it
|
||||
sendmqttmsg, err := json.Marshal(msg.GetMsg())
|
||||
sendmqttmsg, err := json.Marshal(msg.Msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -35,8 +35,8 @@ func (f externalNatsNodeFactory) Create(id string, meta Metadata) (Node, error)
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (n *externalNatsNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalNatsNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
defer n.client.Close()
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
|
||||
@@ -27,8 +27,8 @@ func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, erro
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *externalRestapiNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalRestapiNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
successLableNode := n.GetLinkedNode("Success")
|
||||
failureLableNode := n.GetLinkedNode("Failure")
|
||||
if n.RequestMethod == "GET" {
|
||||
@@ -42,11 +42,11 @@ func (n *externalRestapiNode) Handle(msg message.Message) error {
|
||||
return failureLableNode.Handle(msg)
|
||||
} else {
|
||||
if successLableNode != nil {
|
||||
metadata := msg.GetMetadata()
|
||||
metadata := msg.Metadata
|
||||
for key, value := range response {
|
||||
metadata.SetKeyValue(key, value)
|
||||
metadata.SetValue(key, value)
|
||||
}
|
||||
msg.SetMetadata(metadata)
|
||||
msg.Metadata = metadata
|
||||
return successLableNode.Handle(msg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,8 +27,8 @@ func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, er
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *externalRuleChainNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalRuleChainNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
data := services.RuleChainModelDao.FindOne(n.RuleId)
|
||||
if data == nil {
|
||||
return errors.New(fmt.Sprintf("节点 %s ,获取规则链失败", n.Name()))
|
||||
|
||||
@@ -38,8 +38,8 @@ func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, er
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *externalSendEmailNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalSendEmailNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
@@ -48,7 +48,7 @@ func (n *externalSendEmailNode) Handle(msg message.Message) error {
|
||||
if tos[len(tos)-1] == "" { // 判断切片的最后一个元素是否为空,为空则移除
|
||||
tos = tos[:len(tos)-1]
|
||||
}
|
||||
err := n.send(tos, msg)
|
||||
err := n.send(tos, *msg)
|
||||
if err != nil {
|
||||
if failureLabelNode != nil {
|
||||
return failureLabelNode.Handle(msg)
|
||||
@@ -73,7 +73,7 @@ func (m *externalSendEmailNode) send(to []string, msg message.Message) error {
|
||||
}
|
||||
e.To = to
|
||||
e.Subject = m.Subject
|
||||
template, err := ParseTemplate(m.Body, msg.GetMetadata().GetValues())
|
||||
template, err := ParseTemplate(m.Body, msg.Metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -27,8 +27,8 @@ func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, erro
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *externalSendSmsNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalSendSmsNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
//failureLabelNode := n.GetLinkedNode("Failure")
|
||||
|
||||
@@ -28,8 +28,8 @@ func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *externalWechatNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *externalWechatNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
|
||||
@@ -22,10 +22,10 @@ type Factory interface {
|
||||
}
|
||||
|
||||
var (
|
||||
// allNodeFactories hold all node's factory
|
||||
// 所有节点对应关系
|
||||
allNodeFactories map[string]Factory = make(map[string]Factory)
|
||||
|
||||
// allNodeCategories hold node's metadata by category
|
||||
// 所有节点的类型MAP
|
||||
allNodeCategories map[string][]map[string]interface{} = make(map[string][]map[string]interface{})
|
||||
allCategories []map[string]interface{} = make([]map[string]interface{}, 0)
|
||||
)
|
||||
@@ -48,7 +48,7 @@ func NewNode(nodeType string, id string, meta Metadata) (Node, error) {
|
||||
return nil, fmt.Errorf("invalid node type '%s'", nodeType)
|
||||
}
|
||||
|
||||
// GetCategoryNodes return specified category's all nodes
|
||||
// GetCategoryNodes 获取所有分类节点
|
||||
func GetCategoryNodes() map[string][]map[string]interface{} { return allNodeCategories }
|
||||
|
||||
func GetCategory() []map[string]interface{} { return allCategories }
|
||||
|
||||
@@ -5,8 +5,8 @@ import (
|
||||
"pandax/pkg/rule_engine/message"
|
||||
)
|
||||
|
||||
//检查关联关系
|
||||
//该消息来自与哪个实体或到那个实体
|
||||
// 检查关联关系
|
||||
// 该消息来自与哪个实体或到那个实体
|
||||
type deviceTypeSwitchNode struct {
|
||||
bareNode
|
||||
}
|
||||
@@ -25,18 +25,18 @@ func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, err
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *deviceTypeSwitchNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *deviceTypeSwitchNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
deviceLabelNode := n.GetLinkedNode(message.DEVICE)
|
||||
gatewayLabelNode := n.GetLinkedNode(message.GATEWAY)
|
||||
|
||||
if msg.GetMetadata().GetKeyValue("deviceType") == message.DEVICE {
|
||||
if msg.Metadata.GetValue("deviceType").(string) == message.DEVICE {
|
||||
if deviceLabelNode != nil {
|
||||
return deviceLabelNode.Handle(msg)
|
||||
}
|
||||
}
|
||||
if msg.GetMetadata().GetKeyValue("deviceType") == message.GATEWAY {
|
||||
if msg.Metadata.GetValue("deviceType").(string) == message.GATEWAY {
|
||||
if gatewayLabelNode != nil {
|
||||
return gatewayLabelNode.Handle(msg)
|
||||
}
|
||||
|
||||
@@ -24,12 +24,12 @@ func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, er
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *messageTypeFilterNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *messageTypeFilterNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
trueLabelNode := n.GetLinkedNode("True")
|
||||
falseLabelNode := n.GetLinkedNode("False")
|
||||
messageType := msg.GetType()
|
||||
messageType := msg.MsgType
|
||||
|
||||
for _, filterType := range n.MessageTypes {
|
||||
if filterType == messageType && trueLabelNode != nil {
|
||||
|
||||
@@ -32,11 +32,11 @@ func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, er
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *messageTypeSwitchNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
|
||||
func (n *messageTypeSwitchNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
msg.Metadata = map[string]interface{}{"AA": "BB", "deviceName": "fff"}
|
||||
nodes := n.GetLinkedNodes()
|
||||
messageType := msg.GetType()
|
||||
messageType := msg.MsgType
|
||||
for label, node := range nodes {
|
||||
if messageType == label {
|
||||
return node.Handle(msg)
|
||||
|
||||
@@ -24,12 +24,12 @@ func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error)
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *scriptFilterNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *scriptFilterNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
trueLabelNode := n.GetLinkedNode("True")
|
||||
falseLabelNode := n.GetLinkedNode("False")
|
||||
scriptEngine := NewScriptEngine(msg, "Filter", n.Script)
|
||||
scriptEngine := NewScriptEngine(*msg, "Filter", n.Script)
|
||||
isTrue, error := scriptEngine.ScriptOnFilter()
|
||||
if isTrue == true && error == nil && trueLabelNode != nil {
|
||||
return trueLabelNode.Handle(msg)
|
||||
|
||||
@@ -34,10 +34,10 @@ func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error)
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *switchFilterNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *switchFilterNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
scriptEngine := NewScriptEngine(msg, "Switch", n.Script)
|
||||
scriptEngine := NewScriptEngine(*msg, "Switch", n.Script)
|
||||
SwitchResults, err := scriptEngine.ScriptOnSwitch()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -23,8 +23,8 @@ func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (n *inputNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *inputNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
nodes := n.GetLinkedNodes()
|
||||
for _, node := range nodes {
|
||||
|
||||
@@ -10,6 +10,7 @@ const (
|
||||
NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey"
|
||||
)
|
||||
|
||||
// Metadata 前端 参数 Properties
|
||||
type Metadata interface {
|
||||
Keys() []string
|
||||
With(key string, val interface{}) Metadata
|
||||
|
||||
@@ -13,7 +13,7 @@ type Node interface {
|
||||
Id() string
|
||||
Metadata() Metadata
|
||||
MustLabels() []string
|
||||
Handle(message.Message) error
|
||||
Handle(*message.Message) error
|
||||
|
||||
AddLinkedNode(label string, node Node)
|
||||
GetLinkedNode(label string) Node
|
||||
@@ -55,7 +55,7 @@ func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes }
|
||||
|
||||
func (n *bareNode) Metadata() Metadata { return n.meta }
|
||||
|
||||
func (n *bareNode) Handle(message.Message) error { return errors.New("not implemented") }
|
||||
func (n *bareNode) Handle(*message.Message) error { return errors.New("not implemented") }
|
||||
|
||||
func decodePath(meta Metadata, n Node) (Node, error) {
|
||||
if err := meta.DecodePath(n); err != nil {
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
type ScriptEngine interface {
|
||||
ScriptOnMessage() (message.Message, error)
|
||||
ScriptOnMessage() (*message.Message, error)
|
||||
ScriptOnSwitch() ([]string, error)
|
||||
ScriptOnFilter() (bool, error)
|
||||
ScriptToString() (string, error)
|
||||
@@ -29,7 +29,7 @@ func NewScriptEngine(msg message.Message, fun string, script string) ScriptEngin
|
||||
}
|
||||
}
|
||||
|
||||
func (bse *baseScriptEngine) ScriptOnMessage() (message.Message, error) {
|
||||
func (bse *baseScriptEngine) ScriptOnMessage() (*message.Message, error) {
|
||||
msg := bse.Msg
|
||||
vm := goja.New()
|
||||
_, err := vm.RunString(bse.Script)
|
||||
@@ -43,11 +43,11 @@ func (bse *baseScriptEngine) ScriptOnMessage() (message.Message, error) {
|
||||
logrus.Info("Js函数映射到 Go 函数失败!")
|
||||
return nil, err
|
||||
}
|
||||
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
|
||||
msg.SetMsg(datas["msg"].(map[string]interface{}))
|
||||
msg.SetMetadata(message.NewDefaultMetadata(datas["metadata"].(map[string]interface{})))
|
||||
msg.SetType(datas["msgType"].(string))
|
||||
return msg, nil
|
||||
datas := fn(msg.Msg, msg.Metadata, msg.MsgType)
|
||||
msg.Msg = datas["msg"].(map[string]interface{})
|
||||
msg.Metadata = datas["metadata"].(map[string]interface{})
|
||||
msg.MsgType = datas["msgType"].(string)
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
func (bse *baseScriptEngine) ScriptOnSwitch() ([]string, error) {
|
||||
@@ -64,7 +64,7 @@ func (bse *baseScriptEngine) ScriptOnSwitch() ([]string, error) {
|
||||
logrus.Info("Js函数映射到 Go 函数失败!")
|
||||
return nil, err
|
||||
}
|
||||
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
|
||||
datas := fn(msg.Msg, msg.Metadata, msg.MsgType)
|
||||
return datas, nil
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func (bse *baseScriptEngine) ScriptOnFilter() (bool, error) {
|
||||
logrus.Info("Js函数映射到 Go 函数失败!")
|
||||
return false, err
|
||||
}
|
||||
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
|
||||
datas := fn(msg.Msg, msg.Metadata, msg.MsgType)
|
||||
return datas, nil
|
||||
}
|
||||
|
||||
@@ -100,7 +100,7 @@ func (bse *baseScriptEngine) ScriptToString() (string, error) {
|
||||
logrus.Info("Js函数映射到 Go 函数失败!")
|
||||
return "", err
|
||||
}
|
||||
data := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
|
||||
data := fn(msg.Msg, msg.Metadata, msg.MsgType)
|
||||
return data, nil
|
||||
}
|
||||
|
||||
@@ -118,6 +118,6 @@ func (bse *baseScriptEngine) ScriptGenerate() (map[string]interface{}, error) {
|
||||
logrus.Info("Js函数映射到 Go 函数失败!")
|
||||
return nil, err
|
||||
}
|
||||
datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType())
|
||||
datas := fn(msg.Msg, msg.Metadata, msg.MsgType)
|
||||
return datas, nil
|
||||
}
|
||||
|
||||
@@ -23,27 +23,26 @@ func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, e
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *transformDeleteKeyNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *transformDeleteKeyNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
keys := strings.Split(n.Keys, ",")
|
||||
if n.FormType == "msg" {
|
||||
data := msg.GetMsg()
|
||||
data := msg.Msg
|
||||
for _, key := range keys {
|
||||
if _, found := data[key]; found {
|
||||
delete(data, key)
|
||||
msg.SetMsg(data)
|
||||
msg.Msg = data
|
||||
}
|
||||
}
|
||||
} else if n.FormType == "metadata" {
|
||||
data := msg.GetMetadata()
|
||||
data := msg.Metadata
|
||||
for _, key := range keys {
|
||||
if data.GetKeyValue(key) != nil {
|
||||
values := data.GetValues()
|
||||
delete(values, key)
|
||||
msg.SetMetadata(message.NewDefaultMetadata(values))
|
||||
if data.GetValue(key) != nil {
|
||||
delete(data, key)
|
||||
msg.Metadata = data
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -26,28 +26,27 @@ func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, e
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *transformRenameKeyNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *transformRenameKeyNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
if n.FormType == "msg" {
|
||||
data := msg.GetMsg()
|
||||
data := msg.Msg
|
||||
for _, key := range n.Keys {
|
||||
if _, found := data[key.OldName]; found {
|
||||
data[key.NewName] = data[key.OldName]
|
||||
delete(data, key.OldName)
|
||||
msg.SetMsg(data)
|
||||
msg.Msg = data
|
||||
}
|
||||
}
|
||||
} else if n.FormType == "metadata" {
|
||||
data := msg.GetMetadata()
|
||||
data := msg.Metadata
|
||||
for _, key := range n.Keys {
|
||||
if data.GetKeyValue(key.OldName) != nil {
|
||||
values := data.GetValues()
|
||||
values[key.NewName] = values[key.OldName]
|
||||
delete(values, key.OldName)
|
||||
msg.SetMetadata(message.NewDefaultMetadata(values))
|
||||
if data.GetValue(key.OldName) != nil {
|
||||
data[key.NewName] = data[key.OldName]
|
||||
delete(data, key.OldName)
|
||||
msg.Metadata = data
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -22,13 +22,13 @@ func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, erro
|
||||
return decodePath(meta, node)
|
||||
}
|
||||
|
||||
func (n *transformScriptNode) Handle(msg message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
|
||||
func (n *transformScriptNode) Handle(msg *message.Message) error {
|
||||
logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType)
|
||||
|
||||
successLabelNode := n.GetLinkedNode("Success")
|
||||
failureLabelNode := n.GetLinkedNode("Failure")
|
||||
|
||||
scriptEngine := NewScriptEngine(msg, "Transform", n.Script)
|
||||
scriptEngine := NewScriptEngine(*msg, "Transform", n.Script)
|
||||
newMessage, err := scriptEngine.ScriptOnMessage()
|
||||
if err != nil {
|
||||
if failureLabelNode != nil {
|
||||
|
||||
Reference in New Issue
Block a user