diff --git a/README.md b/README.md index e0c4780..01cf344 100644 --- a/README.md +++ b/README.md @@ -107,11 +107,6 @@ 更多功能请访问系统。 - -## ❤特别鸣谢 - - * 感谢[VUE-NEXT-ADMIN](https://gitee.com/lyt-top/vue-next-admin) - --- 版权说明 --- diff --git a/default.log b/default.log index 3ed62de..3f58ee3 100644 --- a/default.log +++ b/default.log @@ -685,3 +685,12 @@ 2023-08-24 15:27:41.492 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/system/dict/data/type] [uid=1] : 获取字典数据列表通过字典类型 ->6ms 2023-08-24 15:27:41.504 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/log/logLogin/list] : 获取登录日志列表 ->8ms +2023-08-24 17:35:27.159 [DEBUG] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:59] : no Route found (in 2 routes) that matches HTTP method OPTIONS + +2023-08-24 17:35:27.197 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/visual/screen/zt_08ca17dbc53c916369ffb3d0] [uid=1] [uname=panda] : 获取Screen信息 ->6ms +2023-08-24 17:35:28.459 [DEBUG] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:59] : no Route found (in 2 routes) that matches HTTP method OPTIONS + +2023-08-24 17:35:28.491 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/rule/chain/rule_a37571bb6c45378b57803793] [uid=1] : 获取规则引擎信息 ->4ms +2023-08-24 17:35:28.749 [DEBUG] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:59] : no Route found (in 3 routes) that matches HTTP method OPTIONS + +2023-08-24 17:35:28.756 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/rule/chain/nodeLabels] [uid=1] [uname=panda] : 获取所有节点标签 ->0ms diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index 34197df..869788a 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -41,7 +41,7 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) } // StartRuleChain TODO 是否需要添加context -func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error { +func (c *ruleChainInstance) StartRuleChain(context context.Context, message *message.Message) error { if node, found := c.nodes[c.firstRuleNodeId]; found { node.Handle(message) } diff --git a/pkg/rule_engine/instance_test.go b/pkg/rule_engine/instance_test.go index 13556b4..d4fc16b 100644 --- a/pkg/rule_engine/instance_test.go +++ b/pkg/rule_engine/instance_test.go @@ -18,13 +18,13 @@ func TestNewRuleChainInstance(t *testing.T) { t.Error(errs[0]) } - metadata := message.NewDefaultMetadata(map[string]interface{}{ + metadata := message.Metadata{ "deviceName": "ws432", "deviceId": "d_1928b99619910dae5a001fa7", "deviceType": "direct", "productId": "p_3ba460634520cf4590dc90e5", - }) - msg := message.NewMessageWithDetail("1", message.TelemetryMes, map[string]interface{}{"temperature": 60.4, "humidity": 32.5}, metadata) + } + msg := message.NewMessage("1", message.TelemetryMes, message.Msg{"temperature": 60.4, "humidity": 32.5}, metadata) t.Log("开始执行力流程") err = instance.StartRuleChain(context.Background(), msg) if err != nil { @@ -33,8 +33,7 @@ func TestNewRuleChainInstance(t *testing.T) { } func TestScriptEngine(t *testing.T) { - metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) - msg := message.NewMessageWithDetail("1", message.UpEventMes, map[string]interface{}{"aa": 5}, metadata) + msg := message.NewMessage("1", message.UpEventMes, map[string]interface{}{"aa": 5}, map[string]interface{}{"device": "aa"}) const baseScript = ` function nextRelation(metadata, msg) { return ['one','nine']; @@ -47,7 +46,7 @@ func TestScriptEngine(t *testing.T) { } return nextRelation(metadata, msg); ` - scriptEngine := nodes.NewScriptEngine(msg, "Switch", baseScript) + scriptEngine := nodes.NewScriptEngine(*msg, "Switch", baseScript) SwitchResults, err := scriptEngine.ScriptOnSwitch() if err != nil { @@ -57,19 +56,18 @@ func TestScriptEngine(t *testing.T) { } func TestScriptOnMessage(t *testing.T) { - metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) - msg := message.NewMessageWithDetail("1", message.UpEventMes, map[string]interface{}{"aa": 5}, metadata) + msg := message.NewMessage("1", message.UpEventMes, map[string]interface{}{"aa": 5}, map[string]interface{}{"device": "aa"}) const baseScript = ` msg.bb = "33" metadata.event = 55 return {msg: msg, metadata: metadata, msgType: msgType}; ` - scriptEngine := nodes.NewScriptEngine(msg, "Transform", baseScript) + scriptEngine := nodes.NewScriptEngine(*msg, "Transform", baseScript) ScriptOnMessageResults, err := scriptEngine.ScriptOnMessage() if err != nil { t.Error(err) } - t.Log(ScriptOnMessageResults.GetMetadata()) + t.Log(ScriptOnMessageResults.Metadata) } diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index 4379799..6daeb52 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -1,11 +1,6 @@ package message -import ( - "encoding/json" - "github.com/google/uuid" - "time" -) - +/* // 消息类型 const ( ConnectMes = "Connect" @@ -154,3 +149,4 @@ func (t *defaultMetadata) GetValues() map[string]interface{} { func (t *defaultMetadata) SetValues(values map[string]interface{}) { t.values = values } +*/ diff --git a/pkg/rule_engine/message/message1.go b/pkg/rule_engine/message/message1.go new file mode 100644 index 0000000..c3539c7 --- /dev/null +++ b/pkg/rule_engine/message/message1.go @@ -0,0 +1,99 @@ +package message + +import ( + "encoding/json" + "github.com/google/uuid" + "time" +) + +// 消息类型 +const ( + ConnectMes = "Connect" + DisConnectMes = "Disconnect" + RpcRequestMes = "RpcRequest" + UpEventMes = "Event" + AlarmMes = "Alarm" + RowMes = "Row" + TelemetryMes = "Telemetry" + AttributesMes = "Attributes" +) + +// 数据类型Originator +const ( + DEVICE = "DEVICE" + GATEWAY = "GATEWAY" + MONITOR = "MONITOR" //监控 +) + +type Msg map[string]interface{} +type Metadata map[string]interface{} + +type Message struct { + Id string //uuid 消息Id + Ts time.Time //时间戳 + MsgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 + UserId string //客户Id UUID 设备发布人 + Msg Msg //数据 数据结构JSON 设备原始数据 msg + Metadata Metadata //消息的元数据 包括设备Id,设备类型,产品ID等 +} + +// NewMessage ... +func NewMessage(userId, messageType string, msg Msg, metadata Metadata) *Message { + return &Message{ + Id: uuid.New().String(), + Ts: time.Now(), + UserId: userId, + MsgType: messageType, + Msg: msg, + Metadata: metadata, + } +} + +func (t *Message) GetAllMap() map[string]interface{} { + data := make(map[string]interface{}) + for msgKey, msgValue := range t.Msg { + for metaKey, metaValue := range t.Metadata { + if msgKey == metaKey { + data[msgKey] = metaValue + } else { + if _, ok := data[msgKey]; !ok { + data[msgKey] = msgValue + } + if _, ok := data[metaKey]; !ok { + data[metaKey] = metaValue + } + } + } + } + return data +} + +func (t *Message) MarshalBinary() ([]byte, error) { + return json.Marshal(t) +} + +func (meta *Metadata) Keys() []string { + keys := make([]string, 0) + for key := range *meta { + keys = append(keys, key) + } + return keys +} + +func (meta *Metadata) GetValue(key string) any { + if _, found := (*meta)[key]; !found { + return nil + } + return (*meta)[key] +} + +func (meta *Metadata) SetValue(key string, val interface{}) { + (*meta)[key] = val +} + +func (msg *Msg) GetValue(key string) any { + if _, found := (*msg)[key]; !found { + return nil + } + return (*msg)[key] +} diff --git a/pkg/rule_engine/nodes/action_clear_alarm_node.go b/pkg/rule_engine/nodes/action_clear_alarm_node.go index b9bf1fb..1d22f93 100644 --- a/pkg/rule_engine/nodes/action_clear_alarm_node.go +++ b/pkg/rule_engine/nodes/action_clear_alarm_node.go @@ -29,16 +29,16 @@ func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { return decodePath(meta, node) } -func (n *clearAlarmNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *clearAlarmNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) cleared := n.GetLinkedNode("Cleared") failure := n.GetLinkedNode("Failure") - alarm := services.DeviceAlarmModelDao.FindOneByType(msg.GetMetadata().GetKeyValue("deviceId").(string), n.AlarmType, "0") + alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0") if alarm.DeviceId != "" { log.Println("清除告警") alarm.State = global.CLEARED - marshal, _ := json.Marshal(msg.GetMsg()) + marshal, _ := json.Marshal(msg.Msg) alarm.Details = string(marshal) err := services.DeviceAlarmModelDao.Update(*alarm) if err != nil { diff --git a/pkg/rule_engine/nodes/action_create_alarm_node.go b/pkg/rule_engine/nodes/action_create_alarm_node.go index 68b681f..9d8ef4f 100644 --- a/pkg/rule_engine/nodes/action_create_alarm_node.go +++ b/pkg/rule_engine/nodes/action_create_alarm_node.go @@ -29,16 +29,16 @@ func (f createAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { return decodePath(meta, node) } -func (n *createAlarmNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *createAlarmNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) created := n.GetLinkedNode("Created") updated := n.GetLinkedNode("Updated") failure := n.GetLinkedNode("Failure") - alarm := services.DeviceAlarmModelDao.FindOneByType(msg.GetMetadata().GetKeyValue("deviceId").(string), n.AlarmType, "0") + alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0") if alarm.DeviceId != "" { - marshal, _ := json.Marshal(msg.GetMsg()) + marshal, _ := json.Marshal(msg.Msg) alarm.Details = string(marshal) err := services.DeviceAlarmModelDao.Update(*alarm) if err != nil { @@ -53,14 +53,14 @@ func (n *createAlarmNode) Handle(msg message.Message) error { } else { alarm = &entity.DeviceAlarm{} alarm.Id = kgo.KStr.Uniqid("a") - alarm.DeviceId = msg.GetMetadata().GetKeyValue("deviceId").(string) - alarm.ProductId = msg.GetMetadata().GetKeyValue("productId").(string) - alarm.Name = msg.GetMetadata().GetKeyValue("deviceName").(string) + alarm.DeviceId = msg.Metadata.GetValue("deviceId").(string) + alarm.ProductId = msg.Metadata.GetValue("productId").(string) + alarm.Name = msg.Metadata.GetValue("deviceName").(string) alarm.Level = n.AlarmSeverity alarm.State = global.ALARMING alarm.Type = n.AlarmType alarm.Time = time.Now() - marshal, _ := json.Marshal(msg.GetMsg()) + marshal, _ := json.Marshal(msg.Msg) alarm.Details = string(marshal) err := services.DeviceAlarmModelDao.Insert(*alarm) if err != nil { diff --git a/pkg/rule_engine/nodes/action_delay_node.go b/pkg/rule_engine/nodes/action_delay_node.go index 2105669..f0e35b0 100644 --- a/pkg/rule_engine/nodes/action_delay_node.go +++ b/pkg/rule_engine/nodes/action_delay_node.go @@ -13,11 +13,11 @@ const DelayNodeName = "DelayNode" type delayNode struct { bareNode - PeriodTs int `json:"periodTs" yaml:"periodTs" jpath:"periodTs"` //周期时间 - MaxPendingMessages int `json:"maxPendingMessages" yaml:"maxPendingMessages" jpath:"maxPendingMessages"` //最大等待消息数 - messageQueue []message.Message `jpath:"-"` - delayTimer *time.Timer `jpath:"-"` - lock sync.Mutex `jpath:"-"` + PeriodTs int `json:"periodTs" yaml:"periodTs" jpath:"periodTs"` //周期时间 + MaxPendingMessages int `json:"maxPendingMessages" yaml:"maxPendingMessages" jpath:"maxPendingMessages"` //最大等待消息数 + messageQueue []*message.Message `jpath:"-"` + delayTimer *time.Timer `jpath:"-"` + lock sync.Mutex `jpath:"-"` } type delayNodeFactory struct{} @@ -31,12 +31,12 @@ func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) { lock: sync.Mutex{}, } _, err := decodePath(meta, node) - node.messageQueue = make([]message.Message, node.MaxPendingMessages) + node.messageQueue = make([]*message.Message, node.MaxPendingMessages) return node, err } -func (n *delayNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *delayNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") diff --git a/pkg/rule_engine/nodes/action_generator_node.go b/pkg/rule_engine/nodes/action_generator_node.go index 4cffe8c..84ea52c 100644 --- a/pkg/rule_engine/nodes/action_generator_node.go +++ b/pkg/rule_engine/nodes/action_generator_node.go @@ -25,8 +25,8 @@ func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, err return decodePath(meta, node) } -func (n *messageGeneratorNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *messageGeneratorNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -42,7 +42,7 @@ func (n *messageGeneratorNode) Handle(msg message.Message) error { ticker.Stop() return } - scriptEngine := NewScriptEngine(msg, "Generate", n.Script) + scriptEngine := NewScriptEngine(*msg, "Generate", n.Script) generate, err := scriptEngine.ScriptGenerate() if err != nil { if failureLabelNode != nil { @@ -50,9 +50,9 @@ func (n *messageGeneratorNode) Handle(msg message.Message) error { } return } - msg.SetMsg(generate["msg"].(map[string]interface{})) - msg.SetType(generate["msgType"].(string)) - msg.SetMetadata(message.NewDefaultMetadata(generate["metadata"].(map[string]interface{}))) + msg.Msg = generate["msg"].(message.Msg) + msg.Metadata = generate["metadata"].(message.Metadata) + msg.MsgType = generate["msgType"].(string) if successLabelNode != nil { go successLabelNode.Handle(msg) } diff --git a/pkg/rule_engine/nodes/action_log_node.go b/pkg/rule_engine/nodes/action_log_node.go index 2939cf5..5cf3471 100644 --- a/pkg/rule_engine/nodes/action_log_node.go +++ b/pkg/rule_engine/nodes/action_log_node.go @@ -24,11 +24,11 @@ func (f logNodeFactory) Create(id string, meta Metadata) (Node, error) { return decodePath(meta, node) } -func (n *logNode) Handle(msg message.Message) error { +func (n *logNode) Handle(msg *message.Message) error { successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") - scriptEngine := NewScriptEngine(msg, "ToString", n.Script) + scriptEngine := NewScriptEngine(*msg, "ToString", n.Script) logMessage, err := scriptEngine.ScriptToString() if err != nil { if failureLableNode != nil { @@ -38,10 +38,10 @@ func (n *logNode) Handle(msg message.Message) error { } } services.RuleChainMsgLogModelDao.Insert(entity.RuleChainMsgLog{ - MessageId: msg.GetId(), - MsgType: msg.GetType(), - DeviceName: msg.GetMetadata().GetValues()["deviceName"].(string), - Ts: msg.GetTs(), + MessageId: msg.Id, + MsgType: msg.MsgType, + DeviceName: msg.Metadata["deviceName"].(string), + Ts: msg.Ts, Content: logMessage, }) global.Log.Info(logMessage) diff --git a/pkg/rule_engine/nodes/action_rpc_request_node.go b/pkg/rule_engine/nodes/action_rpc_request_node.go index 090a702..e709505 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_node.go @@ -24,7 +24,7 @@ func (f rpcRequestNodeFactory) Create(id string, meta Metadata) (Node, error) { return decodePath(meta, node) } -func (n *rpcRequestNode) Handle(msg message.Message) error { +func (n *rpcRequestNode) Handle(msg *message.Message) error { successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") @@ -38,9 +38,9 @@ func (n *rpcRequestNode) Handle(msg message.Message) error { return err } } - msgM := msg.GetMsg() + msgM := msg.Msg msgM["payload"] = respPayload - msg.SetMsg(msgM) + msg.Msg = msgM if successLableNode != nil { return successLableNode.Handle(msg) } diff --git a/pkg/rule_engine/nodes/action_rpc_respond_node.go b/pkg/rule_engine/nodes/action_rpc_respond_node.go index acbc5d5..871988e 100644 --- a/pkg/rule_engine/nodes/action_rpc_respond_node.go +++ b/pkg/rule_engine/nodes/action_rpc_respond_node.go @@ -23,16 +23,16 @@ func (f rpcRespondFactory) Create(id string, meta Metadata) (Node, error) { return decodePath(meta, node) } -func (n *rpcRespondNode) Handle(msg message.Message) error { +func (n *rpcRespondNode) Handle(msg *message.Message) error { successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") RequestId := n.RequestId if RequestId == 0 { - RequestId = int(msg.GetMetadata().GetKeyValue("requestId").(float64)) + RequestId = int(msg.Metadata.GetValue("requestId").(float64)) } var datas = mqtt.RpcPayload{ - Method: msg.GetMsg()["method"].(string), - Params: msg.GetMsg()["params"], + Method: msg.Msg.GetValue("method").(string), + Params: msg.Msg.GetValue("params"), } rpc := &mqtt.RpcRequest{Client: global.MqttClient, RequestId: RequestId} err := rpc.RespondTpc(datas) diff --git a/pkg/rule_engine/nodes/action_save_attributes_node.go b/pkg/rule_engine/nodes/action_save_attributes_node.go index d7cbc80..302121e 100644 --- a/pkg/rule_engine/nodes/action_save_attributes_node.go +++ b/pkg/rule_engine/nodes/action_save_attributes_node.go @@ -22,11 +22,11 @@ func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error return decodePath(meta, node) } -func (n *saveAttributesNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *saveAttributesNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") - if msg.GetType() != message.AttributesMes { + if msg.MsgType != message.AttributesMes { if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -34,8 +34,8 @@ func (n *saveAttributesNode) Handle(msg message.Message) error { } } //deviceId := msg.GetMetadata().GetValues()["deviceId"].(string) - deviceName := msg.GetMetadata().GetValues()["deviceName"].(string) - err := global.TdDb.InsertDevice(deviceName+"_attributes", msg.GetMsg()) + deviceName := msg.Metadata["deviceName"].(string) + err := global.TdDb.InsertDevice(deviceName+"_attributes", msg.Msg) if err != nil { if failureLabelNode != nil { return failureLabelNode.Handle(msg) diff --git a/pkg/rule_engine/nodes/action_save_timeseries_node.go b/pkg/rule_engine/nodes/action_save_timeseries_node.go index 1cbd4b7..10a387c 100644 --- a/pkg/rule_engine/nodes/action_save_timeseries_node.go +++ b/pkg/rule_engine/nodes/action_save_timeseries_node.go @@ -23,11 +23,11 @@ func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error return decodePath(meta, node) } -func (n *saveTimeSeriesNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *saveTimeSeriesNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") - if msg.GetType() != message.TelemetryMes { + if msg.MsgType != message.TelemetryMes { if failureLabelNode != nil { return failureLabelNode.Handle(msg) } else { @@ -35,9 +35,10 @@ func (n *saveTimeSeriesNode) Handle(msg message.Message) error { } } //deviceId := msg.GetMetadata().GetValues()["deviceId"].(string) - deviceName := msg.GetMetadata().GetValues()["deviceName"].(string) - log.Println("telemetry", msg.GetMsg()) - err := global.TdDb.InsertDevice(deviceName+"_telemetry", msg.GetMsg()) + deviceName := msg.Metadata["deviceName"].(string) + log.Println(msg.Msg) + log.Println(msg.Metadata) + err := global.TdDb.InsertDevice(deviceName+"_telemetry", msg.Msg) log.Println(err) if err != nil { if failureLabelNode != nil { diff --git a/pkg/rule_engine/nodes/external_ding_node.go b/pkg/rule_engine/nodes/external_ding_node.go index 5f2fe38..9b4ef25 100644 --- a/pkg/rule_engine/nodes/external_ding_node.go +++ b/pkg/rule_engine/nodes/external_ding_node.go @@ -35,8 +35,8 @@ func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) return decodePath(meta, node) } -func (n *externalDingNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalDingNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") diff --git a/pkg/rule_engine/nodes/external_kafka_node.go b/pkg/rule_engine/nodes/external_kafka_node.go index 6bff75a..a235425 100644 --- a/pkg/rule_engine/nodes/external_kafka_node.go +++ b/pkg/rule_engine/nodes/external_kafka_node.go @@ -45,20 +45,20 @@ func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error) return node, nil } -func (n *externalKafkaNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalKafkaNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) defer n.KafkaCli.Close() successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") value := sarama.ByteEncoder("") if n.KeyPattern == "metadataKey" { - marshal, err := json.Marshal(msg.GetMetadata().GetValues()) + marshal, err := json.Marshal(msg.Metadata) if err != nil { return err } value = marshal } else { - marshal, err := json.Marshal(msg.GetMsg()) + marshal, err := json.Marshal(msg.Msg) if err != nil { return err } diff --git a/pkg/rule_engine/nodes/external_mqtt_node.go b/pkg/rule_engine/nodes/external_mqtt_node.go index f47835c..48527a9 100644 --- a/pkg/rule_engine/nodes/external_mqtt_node.go +++ b/pkg/rule_engine/nodes/external_mqtt_node.go @@ -59,13 +59,13 @@ func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) return node, nil } -func (n *externalMqttNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalMqttNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) defer n.MqttCli.Disconnect(1000) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") topic := n.TopicPattern //need fix add msg.metadata in it - sendmqttmsg, err := json.Marshal(msg.GetMsg()) + sendmqttmsg, err := json.Marshal(msg.Msg) if err != nil { return err } diff --git a/pkg/rule_engine/nodes/external_nats_node.go b/pkg/rule_engine/nodes/external_nats_node.go index e52c270..5298f10 100644 --- a/pkg/rule_engine/nodes/external_nats_node.go +++ b/pkg/rule_engine/nodes/external_nats_node.go @@ -35,8 +35,8 @@ func (f externalNatsNodeFactory) Create(id string, meta Metadata) (Node, error) return node, nil } -func (n *externalNatsNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalNatsNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) defer n.client.Close() successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") diff --git a/pkg/rule_engine/nodes/external_restapi_node.go b/pkg/rule_engine/nodes/external_restapi_node.go index cffcd88..525c05c 100644 --- a/pkg/rule_engine/nodes/external_restapi_node.go +++ b/pkg/rule_engine/nodes/external_restapi_node.go @@ -27,8 +27,8 @@ func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, erro return decodePath(meta, node) } -func (n *externalRestapiNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalRestapiNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") if n.RequestMethod == "GET" { @@ -42,11 +42,11 @@ func (n *externalRestapiNode) Handle(msg message.Message) error { return failureLableNode.Handle(msg) } else { if successLableNode != nil { - metadata := msg.GetMetadata() + metadata := msg.Metadata for key, value := range response { - metadata.SetKeyValue(key, value) + metadata.SetValue(key, value) } - msg.SetMetadata(metadata) + msg.Metadata = metadata return successLableNode.Handle(msg) } } diff --git a/pkg/rule_engine/nodes/external_rule_chain_node.go b/pkg/rule_engine/nodes/external_rule_chain_node.go index c762a78..d099c71 100644 --- a/pkg/rule_engine/nodes/external_rule_chain_node.go +++ b/pkg/rule_engine/nodes/external_rule_chain_node.go @@ -27,8 +27,8 @@ func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, er return decodePath(meta, node) } -func (n *externalRuleChainNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalRuleChainNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) data := services.RuleChainModelDao.FindOne(n.RuleId) if data == nil { return errors.New(fmt.Sprintf("节点 %s ,获取规则链失败", n.Name())) diff --git a/pkg/rule_engine/nodes/external_send_email_node.go b/pkg/rule_engine/nodes/external_send_email_node.go index a5b4ede..167b954 100644 --- a/pkg/rule_engine/nodes/external_send_email_node.go +++ b/pkg/rule_engine/nodes/external_send_email_node.go @@ -38,8 +38,8 @@ func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, er return decodePath(meta, node) } -func (n *externalSendEmailNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalSendEmailNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") @@ -48,7 +48,7 @@ func (n *externalSendEmailNode) Handle(msg message.Message) error { if tos[len(tos)-1] == "" { // 判断切片的最后一个元素是否为空,为空则移除 tos = tos[:len(tos)-1] } - err := n.send(tos, msg) + err := n.send(tos, *msg) if err != nil { if failureLabelNode != nil { return failureLabelNode.Handle(msg) @@ -73,7 +73,7 @@ func (m *externalSendEmailNode) send(to []string, msg message.Message) error { } e.To = to e.Subject = m.Subject - template, err := ParseTemplate(m.Body, msg.GetMetadata().GetValues()) + template, err := ParseTemplate(m.Body, msg.Metadata) if err != nil { return err } diff --git a/pkg/rule_engine/nodes/external_send_sms_node.go b/pkg/rule_engine/nodes/external_send_sms_node.go index ff854dc..e0a6e5a 100644 --- a/pkg/rule_engine/nodes/external_send_sms_node.go +++ b/pkg/rule_engine/nodes/external_send_sms_node.go @@ -27,8 +27,8 @@ func (f externalSendSmsNodeFactory) Create(id string, meta Metadata) (Node, erro return decodePath(meta, node) } -func (n *externalSendSmsNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalSendSmsNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") //failureLabelNode := n.GetLinkedNode("Failure") diff --git a/pkg/rule_engine/nodes/external_wechat_node.go b/pkg/rule_engine/nodes/external_wechat_node.go index 0600ae6..07cf75f 100644 --- a/pkg/rule_engine/nodes/external_wechat_node.go +++ b/pkg/rule_engine/nodes/external_wechat_node.go @@ -28,8 +28,8 @@ func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error return decodePath(meta, node) } -func (n *externalWechatNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *externalWechatNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") diff --git a/pkg/rule_engine/nodes/factory.go b/pkg/rule_engine/nodes/factory.go index 8360126..042b241 100644 --- a/pkg/rule_engine/nodes/factory.go +++ b/pkg/rule_engine/nodes/factory.go @@ -22,10 +22,10 @@ type Factory interface { } var ( - // allNodeFactories hold all node's factory + // 所有节点对应关系 allNodeFactories map[string]Factory = make(map[string]Factory) - // allNodeCategories hold node's metadata by category + // 所有节点的类型MAP allNodeCategories map[string][]map[string]interface{} = make(map[string][]map[string]interface{}) allCategories []map[string]interface{} = make([]map[string]interface{}, 0) ) @@ -48,7 +48,7 @@ func NewNode(nodeType string, id string, meta Metadata) (Node, error) { return nil, fmt.Errorf("invalid node type '%s'", nodeType) } -// GetCategoryNodes return specified category's all nodes +// GetCategoryNodes 获取所有分类节点 func GetCategoryNodes() map[string][]map[string]interface{} { return allNodeCategories } func GetCategory() []map[string]interface{} { return allCategories } diff --git a/pkg/rule_engine/nodes/filter_device_type_switch_node.go b/pkg/rule_engine/nodes/filter_device_type_switch_node.go index ea38b28..b0b821e 100644 --- a/pkg/rule_engine/nodes/filter_device_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_device_type_switch_node.go @@ -5,8 +5,8 @@ import ( "pandax/pkg/rule_engine/message" ) -//检查关联关系 -//该消息来自与哪个实体或到那个实体 +// 检查关联关系 +// 该消息来自与哪个实体或到那个实体 type deviceTypeSwitchNode struct { bareNode } @@ -25,18 +25,18 @@ func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, err return decodePath(meta, node) } -func (n *deviceTypeSwitchNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *deviceTypeSwitchNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) deviceLabelNode := n.GetLinkedNode(message.DEVICE) gatewayLabelNode := n.GetLinkedNode(message.GATEWAY) - if msg.GetMetadata().GetKeyValue("deviceType") == message.DEVICE { + if msg.Metadata.GetValue("deviceType").(string) == message.DEVICE { if deviceLabelNode != nil { return deviceLabelNode.Handle(msg) } } - if msg.GetMetadata().GetKeyValue("deviceType") == message.GATEWAY { + if msg.Metadata.GetValue("deviceType").(string) == message.GATEWAY { if gatewayLabelNode != nil { return gatewayLabelNode.Handle(msg) } diff --git a/pkg/rule_engine/nodes/filter_message_type_node.go b/pkg/rule_engine/nodes/filter_message_type_node.go index 31bfa5c..30b2ac2 100644 --- a/pkg/rule_engine/nodes/filter_message_type_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_node.go @@ -24,12 +24,12 @@ func (f messageTypeFilterNodeFactory) Create(id string, meta Metadata) (Node, er return decodePath(meta, node) } -func (n *messageTypeFilterNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *messageTypeFilterNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) trueLabelNode := n.GetLinkedNode("True") falseLabelNode := n.GetLinkedNode("False") - messageType := msg.GetType() + messageType := msg.MsgType for _, filterType := range n.MessageTypes { if filterType == messageType && trueLabelNode != nil { diff --git a/pkg/rule_engine/nodes/filter_message_type_switch_node.go b/pkg/rule_engine/nodes/filter_message_type_switch_node.go index 18a2ce6..0797117 100644 --- a/pkg/rule_engine/nodes/filter_message_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_switch_node.go @@ -32,11 +32,11 @@ func (f messageTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, er return decodePath(meta, node) } -func (n *messageTypeSwitchNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - +func (n *messageTypeSwitchNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) + msg.Metadata = map[string]interface{}{"AA": "BB", "deviceName": "fff"} nodes := n.GetLinkedNodes() - messageType := msg.GetType() + messageType := msg.MsgType for label, node := range nodes { if messageType == label { return node.Handle(msg) diff --git a/pkg/rule_engine/nodes/filter_script_node.go b/pkg/rule_engine/nodes/filter_script_node.go index aa77322..369b48a 100644 --- a/pkg/rule_engine/nodes/filter_script_node.go +++ b/pkg/rule_engine/nodes/filter_script_node.go @@ -24,12 +24,12 @@ func (f scriptFilterNodeFactory) Create(id string, meta Metadata) (Node, error) return decodePath(meta, node) } -func (n *scriptFilterNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *scriptFilterNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) trueLabelNode := n.GetLinkedNode("True") falseLabelNode := n.GetLinkedNode("False") - scriptEngine := NewScriptEngine(msg, "Filter", n.Script) + scriptEngine := NewScriptEngine(*msg, "Filter", n.Script) isTrue, error := scriptEngine.ScriptOnFilter() if isTrue == true && error == nil && trueLabelNode != nil { return trueLabelNode.Handle(msg) diff --git a/pkg/rule_engine/nodes/filter_switch_node.go b/pkg/rule_engine/nodes/filter_switch_node.go index a4f0181..b128ed1 100644 --- a/pkg/rule_engine/nodes/filter_switch_node.go +++ b/pkg/rule_engine/nodes/filter_switch_node.go @@ -34,10 +34,10 @@ func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) return decodePath(meta, node) } -func (n *switchFilterNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *switchFilterNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) - scriptEngine := NewScriptEngine(msg, "Switch", n.Script) + scriptEngine := NewScriptEngine(*msg, "Switch", n.Script) SwitchResults, err := scriptEngine.ScriptOnSwitch() if err != nil { return err diff --git a/pkg/rule_engine/nodes/input_node.go b/pkg/rule_engine/nodes/input_node.go index 89f794e..34e1211 100644 --- a/pkg/rule_engine/nodes/input_node.go +++ b/pkg/rule_engine/nodes/input_node.go @@ -23,8 +23,8 @@ func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) { return node, nil } -func (n *inputNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *inputNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) nodes := n.GetLinkedNodes() for _, node := range nodes { diff --git a/pkg/rule_engine/nodes/metadata.go b/pkg/rule_engine/nodes/metadata.go index 84c38c6..a723021 100644 --- a/pkg/rule_engine/nodes/metadata.go +++ b/pkg/rule_engine/nodes/metadata.go @@ -10,6 +10,7 @@ const ( NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey" ) +// Metadata 前端 参数 Properties type Metadata interface { Keys() []string With(key string, val interface{}) Metadata diff --git a/pkg/rule_engine/nodes/node.go b/pkg/rule_engine/nodes/node.go index ed12f8f..a290850 100644 --- a/pkg/rule_engine/nodes/node.go +++ b/pkg/rule_engine/nodes/node.go @@ -13,7 +13,7 @@ type Node interface { Id() string Metadata() Metadata MustLabels() []string - Handle(message.Message) error + Handle(*message.Message) error AddLinkedNode(label string, node Node) GetLinkedNode(label string) Node @@ -55,7 +55,7 @@ func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes } func (n *bareNode) Metadata() Metadata { return n.meta } -func (n *bareNode) Handle(message.Message) error { return errors.New("not implemented") } +func (n *bareNode) Handle(*message.Message) error { return errors.New("not implemented") } func decodePath(meta Metadata, n Node) (Node, error) { if err := meta.DecodePath(n); err != nil { diff --git a/pkg/rule_engine/nodes/script_engine.go b/pkg/rule_engine/nodes/script_engine.go index 52a4537..63ae1ae 100644 --- a/pkg/rule_engine/nodes/script_engine.go +++ b/pkg/rule_engine/nodes/script_engine.go @@ -8,7 +8,7 @@ import ( ) type ScriptEngine interface { - ScriptOnMessage() (message.Message, error) + ScriptOnMessage() (*message.Message, error) ScriptOnSwitch() ([]string, error) ScriptOnFilter() (bool, error) ScriptToString() (string, error) @@ -29,7 +29,7 @@ func NewScriptEngine(msg message.Message, fun string, script string) ScriptEngin } } -func (bse *baseScriptEngine) ScriptOnMessage() (message.Message, error) { +func (bse *baseScriptEngine) ScriptOnMessage() (*message.Message, error) { msg := bse.Msg vm := goja.New() _, err := vm.RunString(bse.Script) @@ -43,11 +43,11 @@ func (bse *baseScriptEngine) ScriptOnMessage() (message.Message, error) { logrus.Info("Js函数映射到 Go 函数失败!") return nil, err } - datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) - msg.SetMsg(datas["msg"].(map[string]interface{})) - msg.SetMetadata(message.NewDefaultMetadata(datas["metadata"].(map[string]interface{}))) - msg.SetType(datas["msgType"].(string)) - return msg, nil + datas := fn(msg.Msg, msg.Metadata, msg.MsgType) + msg.Msg = datas["msg"].(map[string]interface{}) + msg.Metadata = datas["metadata"].(map[string]interface{}) + msg.MsgType = datas["msgType"].(string) + return &msg, nil } func (bse *baseScriptEngine) ScriptOnSwitch() ([]string, error) { @@ -64,7 +64,7 @@ func (bse *baseScriptEngine) ScriptOnSwitch() ([]string, error) { logrus.Info("Js函数映射到 Go 函数失败!") return nil, err } - datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + datas := fn(msg.Msg, msg.Metadata, msg.MsgType) return datas, nil } @@ -82,7 +82,7 @@ func (bse *baseScriptEngine) ScriptOnFilter() (bool, error) { logrus.Info("Js函数映射到 Go 函数失败!") return false, err } - datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + datas := fn(msg.Msg, msg.Metadata, msg.MsgType) return datas, nil } @@ -100,7 +100,7 @@ func (bse *baseScriptEngine) ScriptToString() (string, error) { logrus.Info("Js函数映射到 Go 函数失败!") return "", err } - data := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + data := fn(msg.Msg, msg.Metadata, msg.MsgType) return data, nil } @@ -118,6 +118,6 @@ func (bse *baseScriptEngine) ScriptGenerate() (map[string]interface{}, error) { logrus.Info("Js函数映射到 Go 函数失败!") return nil, err } - datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + datas := fn(msg.Msg, msg.Metadata, msg.MsgType) return datas, nil } diff --git a/pkg/rule_engine/nodes/transform_delete_key_node.go b/pkg/rule_engine/nodes/transform_delete_key_node.go index f20e418..7e45be5 100644 --- a/pkg/rule_engine/nodes/transform_delete_key_node.go +++ b/pkg/rule_engine/nodes/transform_delete_key_node.go @@ -23,27 +23,26 @@ func (f transformDeleteKeyNodeFactory) Create(id string, meta Metadata) (Node, e return decodePath(meta, node) } -func (n *transformDeleteKeyNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *transformDeleteKeyNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") keys := strings.Split(n.Keys, ",") if n.FormType == "msg" { - data := msg.GetMsg() + data := msg.Msg for _, key := range keys { if _, found := data[key]; found { delete(data, key) - msg.SetMsg(data) + msg.Msg = data } } } else if n.FormType == "metadata" { - data := msg.GetMetadata() + data := msg.Metadata for _, key := range keys { - if data.GetKeyValue(key) != nil { - values := data.GetValues() - delete(values, key) - msg.SetMetadata(message.NewDefaultMetadata(values)) + if data.GetValue(key) != nil { + delete(data, key) + msg.Metadata = data } } } else { diff --git a/pkg/rule_engine/nodes/transform_rename_key_node.go b/pkg/rule_engine/nodes/transform_rename_key_node.go index ebbc62e..6730d7e 100644 --- a/pkg/rule_engine/nodes/transform_rename_key_node.go +++ b/pkg/rule_engine/nodes/transform_rename_key_node.go @@ -26,28 +26,27 @@ func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, e return decodePath(meta, node) } -func (n *transformRenameKeyNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *transformRenameKeyNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") if n.FormType == "msg" { - data := msg.GetMsg() + data := msg.Msg for _, key := range n.Keys { if _, found := data[key.OldName]; found { data[key.NewName] = data[key.OldName] delete(data, key.OldName) - msg.SetMsg(data) + msg.Msg = data } } } else if n.FormType == "metadata" { - data := msg.GetMetadata() + data := msg.Metadata for _, key := range n.Keys { - if data.GetKeyValue(key.OldName) != nil { - values := data.GetValues() - values[key.NewName] = values[key.OldName] - delete(values, key.OldName) - msg.SetMetadata(message.NewDefaultMetadata(values)) + if data.GetValue(key.OldName) != nil { + data[key.NewName] = data[key.OldName] + delete(data, key.OldName) + msg.Metadata = data } } } else { diff --git a/pkg/rule_engine/nodes/transform_script_node.go b/pkg/rule_engine/nodes/transform_script_node.go index ed86083..ea12a73 100644 --- a/pkg/rule_engine/nodes/transform_script_node.go +++ b/pkg/rule_engine/nodes/transform_script_node.go @@ -22,13 +22,13 @@ func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, erro return decodePath(meta, node) } -func (n *transformScriptNode) Handle(msg message.Message) error { - logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) +func (n *transformScriptNode) Handle(msg *message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.MsgType) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") - scriptEngine := NewScriptEngine(msg, "Transform", n.Script) + scriptEngine := NewScriptEngine(*msg, "Transform", n.Script) newMessage, err := scriptEngine.ScriptOnMessage() if err != nil { if failureLabelNode != nil {