diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index efdab97..b2c8a34 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -2,9 +2,11 @@ package message import ( "encoding/json" - "github.com/google/uuid" - "github.com/sirupsen/logrus" "time" + + "github.com/google/uuid" + "github.com/kakuilan/kgo" + "github.com/sirupsen/logrus" ) // 消息类型 @@ -14,7 +16,6 @@ const ( RpcRequestFromDevice = "RpcRequestFromDevice" RpcRequestToDevice = "RpcRequestToDevice" UpEventMes = "Event" - AlarmMes = "Alarm" RowMes = "Row" TelemetryMes = "Telemetry" AttributesMes = "Attributes" @@ -117,6 +118,26 @@ func (meta *Metadata) GetValue(key string) any { } return (*meta)[key] } +func (meta *Metadata) GetStringValue(key string) string { + if _, found := (*meta)[key]; !found { + return "" + } + return kgo.KConv.ToStr((*meta)[key]) +} + +func (meta *Metadata) GetIntValue(key string) int { + if _, found := (*meta)[key]; !found { + return 0 + } + return kgo.KConv.ToInt((*meta)[key]) +} + +func (meta *Metadata) GetFloat64Value(key string) float64 { + if _, found := (*meta)[key]; !found { + return 0 + } + return kgo.KConv.ToFloat((*meta)[key]) +} func (meta *Metadata) Has(key string) bool { _, ok := (*meta)[key] diff --git a/pkg/rule_engine/nodes/action_clear_alarm_node.go b/pkg/rule_engine/nodes/action_clear_alarm_node.go index b1b4dfa..b8c351b 100644 --- a/pkg/rule_engine/nodes/action_clear_alarm_node.go +++ b/pkg/rule_engine/nodes/action_clear_alarm_node.go @@ -2,6 +2,7 @@ package nodes import ( "encoding/json" + "errors" "pandax/apps/device/services" "pandax/pkg/global" "pandax/pkg/rule_engine/message" @@ -32,8 +33,14 @@ func (n *clearAlarmNode) Handle(msg *message.Message) error { cleared := n.GetLinkedNode("Cleared") failure := n.GetLinkedNode("Failure") + var deviceId string + if did, ok := msg.Metadata.GetValue("deviceId").(string); ok { + deviceId = did + } else { + return errors.New("元数据中为获取到设备ID") + } var err error - alarm, err := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0") + alarm, err := services.DeviceAlarmModelDao.FindOneByType(deviceId, n.AlarmType, "0") if err == nil { alarm.State = global.CLEARED marshal, _ := json.Marshal(msg.Msg) diff --git a/pkg/rule_engine/nodes/action_create_alarm_node.go b/pkg/rule_engine/nodes/action_create_alarm_node.go index 8c6a828..7e39754 100644 --- a/pkg/rule_engine/nodes/action_create_alarm_node.go +++ b/pkg/rule_engine/nodes/action_create_alarm_node.go @@ -2,9 +2,10 @@ package nodes import ( "encoding/json" - "github.com/PandaXGO/PandaKit/utils" + "errors" "pandax/apps/device/entity" "pandax/apps/device/services" + "pandax/kit/utils" "pandax/pkg/global" "pandax/pkg/rule_engine/message" "time" @@ -34,8 +35,14 @@ func (n *createAlarmNode) Handle(msg *message.Message) error { created := n.GetLinkedNode("Created") updated := n.GetLinkedNode("Updated") failure := n.GetLinkedNode("Failure") + var deviceId string + if did, ok := msg.Metadata.GetValue("deviceId").(string); ok { + deviceId = did + } else { + return errors.New("元数据中为获取到设备ID") + } var err error - alarm, err := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0") + alarm, err := services.DeviceAlarmModelDao.FindOneByType(deviceId, n.AlarmType, "0") if err == nil { marshal, _ := json.Marshal(msg.Msg) alarm.Details = string(marshal) @@ -48,16 +55,16 @@ func (n *createAlarmNode) Handle(msg *message.Message) error { } } else { alarm = &entity.DeviceAlarm{} - alarm.Id = utils.GenerateID("") - alarm.DeviceId = msg.Metadata.GetValue("deviceId").(string) - alarm.ProductId = msg.Metadata.GetValue("productId").(string) - alarm.Name = msg.Metadata.GetValue("deviceName").(string) + alarm.Id = utils.GenerateID() + alarm.DeviceId = msg.Metadata.GetStringValue("deviceId") + alarm.ProductId = msg.Metadata.GetStringValue("productId") + alarm.Name = msg.Metadata.GetStringValue("deviceName") alarm.Level = n.AlarmSeverity alarm.State = global.ALARMING alarm.Type = n.AlarmType alarm.Time = time.Now() - alarm.OrgId = msg.Metadata.GetValue("orgId").(int64) - alarm.Owner = msg.Metadata.GetValue("owner").(string) + alarm.OrgId = int64(msg.Metadata.GetIntValue("orgId")) + alarm.Owner = msg.Metadata.GetStringValue("owner") marshal, _ := json.Marshal(msg.Msg) alarm.Details = string(marshal) err = services.DeviceAlarmModelDao.Insert(*alarm) diff --git a/pkg/rule_engine/nodes/action_log_node.go b/pkg/rule_engine/nodes/action_log_node.go index 20d7e17..1ca2b6e 100644 --- a/pkg/rule_engine/nodes/action_log_node.go +++ b/pkg/rule_engine/nodes/action_log_node.go @@ -39,10 +39,10 @@ func (n *logNode) Handle(msg *message.Message) error { services.RuleChainMsgLogModelDao.Insert(entity.RuleChainMsgLog{ MessageId: msg.Id, MsgType: msg.MsgType, - DeviceId: msg.Metadata["deviceId"].(string), - OrgId: msg.Metadata["orgId"].(int64), - Owner: msg.Metadata["owner"].(string), - DeviceName: msg.Metadata["deviceName"].(string), + DeviceId: msg.Metadata.GetStringValue("deviceId"), + OrgId: int64(msg.Metadata.GetIntValue("orgId")), + Owner: msg.Metadata.GetStringValue("owner"), + DeviceName: msg.Metadata.GetStringValue("deviceName"), Ts: msg.Ts, Content: logMessage, }) diff --git a/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go b/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go index adc0854..85ffec2 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go @@ -2,11 +2,11 @@ package nodes import ( "errors" - "github.com/PandaXGO/PandaKit/utils" "pandax/apps/device/services" "pandax/iothub/client/mqttclient" "pandax/iothub/client/tcpclient" "pandax/iothub/client/udpclient" + "pandax/kit/utils" devicerpc "pandax/pkg/device_rpc" "pandax/pkg/global" "pandax/pkg/rule_engine/message" @@ -41,10 +41,20 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error { return errors.New("指令请求格式错误") } + var deviceId string + if did, ok := msg.Metadata.GetValue("deviceId").(string); ok { + deviceId = did + } else { + return errors.New("元数据中为获取到设备ID") + } var rpcp = devicerpc.RpcPayload{ - Method: msg.Msg.GetValue("method").(string), Params: msg.Msg.GetValue("params"), } + if method, ok := msg.Metadata.GetValue("method").(string); ok { + rpcp.Method = method + } else { + return errors.New("指令方法格式错误") + } var err error // 指令下发响应 if rpcp.Method == "cmdResp" { @@ -63,21 +73,19 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error { } // 判断设备协议,根据不通协议,发送不通内容 deviceProtocol := global.MQTTProtocol - if msg.Metadata.GetValue("deviceProtocol") != nil && msg.Metadata.GetValue("deviceProtocol").(string) != "" { + if msg.Metadata.GetValue("deviceProtocol") != nil && msg.Metadata.GetStringValue("deviceProtocol") != "" { deviceProtocol = msg.Metadata.GetValue("deviceProtocol").(string) } - deviceId := msg.Metadata.GetValue("deviceId").(string) if deviceProtocol == global.MQTTProtocol || deviceProtocol == global.CoAPProtocol || deviceProtocol == global.LwM2MProtocol { rpc := &mqttclient.RpcRequest{} - RequestId := n.RequestId - if RequestId == "" { - if msg.Metadata.GetValue("requestId") == nil { - rpc.RequestId = utils.GenerateID("") + if n.RequestId == "" { + if msg.Metadata.GetStringValue("requestId") == "" { + rpc.RequestId = utils.GenerateID() } else { - rpc.RequestId = msg.Metadata.GetValue("requestId").(string) + rpc.RequestId = msg.Metadata.GetStringValue("requestId") } } else { - rpc.RequestId = RequestId + rpc.RequestId = n.RequestId } err = rpc.Pub(deviceId, result) } diff --git a/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go b/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go index 38d633f..9fb197a 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go @@ -3,12 +3,12 @@ package nodes import ( "encoding/json" "errors" - "github.com/PandaXGO/PandaKit/utils" "pandax/apps/device/entity" "pandax/apps/device/services" "pandax/iothub/client/mqttclient" "pandax/iothub/client/tcpclient" "pandax/iothub/client/udpclient" + "pandax/kit/utils" "pandax/pkg/global" "pandax/pkg/global/model" "pandax/pkg/rule_engine/message" @@ -41,21 +41,32 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { if msg.Msg.GetValue("method") == nil || msg.Msg.GetValue("params") == nil { return errors.New("指令下发格式错误") } - deviceId := msg.Metadata.GetValue("deviceId").(string) + var deviceId string + if did, ok := msg.Metadata.GetValue("deviceId").(string); ok { + deviceId = did + } else { + return errors.New("元数据中为获取到设备ID") + } // 创建请求格式 var datas = model.RpcPayload{ - Method: msg.Msg.GetValue("method").(string), Params: msg.Msg.GetValue("params"), } + if method, ok := msg.Metadata.GetValue("method").(string); ok { + datas.Method = method + } else { + return errors.New("指令方法格式错误") + } payload, _ := json.Marshal(datas) // 构建指令记录 var data entity.DeviceCmdLog - data.Id = utils.GenerateID("") + data.Id = utils.GenerateID() data.DeviceId = deviceId data.CmdName = datas.Method data.CmdContent = kgo.KConv.ToStr(datas.Params) - data.Mode = msg.Metadata.GetValue("mode").(string) + if mode, ok := msg.Metadata.GetValue("mode").(string); ok { + data.Mode = mode + } data.State = "2" data.RequestTime = time.Now().Format("2006-01-02 15:04:05") @@ -65,8 +76,8 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { } // 判断设备协议,根据不通协议,发送不通内容 deviceProtocol := global.MQTTProtocol - if msg.Metadata.GetValue("deviceProtocol") != nil && msg.Metadata.GetValue("deviceProtocol").(string) != "" { - deviceProtocol = msg.Metadata.GetValue("deviceProtocol").(string) + if msg.Metadata.GetValue("deviceProtocol") != nil && msg.Metadata.GetStringValue("deviceProtocol") != "" { + deviceProtocol = msg.Metadata.GetStringValue("deviceProtocol") } var err error if deviceProtocol == global.MQTTProtocol || deviceProtocol == global.CoAPProtocol || deviceProtocol == global.LwM2MProtocol { diff --git a/pkg/rule_engine/nodes/action_save_attributes_node.go b/pkg/rule_engine/nodes/action_save_attributes_node.go index 20d5b5f..111bf4c 100644 --- a/pkg/rule_engine/nodes/action_save_attributes_node.go +++ b/pkg/rule_engine/nodes/action_save_attributes_node.go @@ -1,6 +1,7 @@ package nodes import ( + "errors" "pandax/pkg/global" "pandax/pkg/rule_engine/message" ) @@ -33,7 +34,12 @@ func (n *saveAttributesNode) Handle(msg *message.Message) error { } }*/ //deviceId := msg.GetMetadata().GetValues()["deviceId"].(string) - deviceName := msg.Metadata["deviceName"].(string) + var deviceName string + if dn, ok := msg.Metadata.GetValue("deviceName").(string); ok { + deviceName = dn + } else { + return errors.New("元数据中为获取到设备ID") + } err := global.TdDb.InsertDevice(deviceName+"_attributes", msg.Msg) if err != nil { n.Debug(msg, message.DEBUGOUT, err.Error()) diff --git a/pkg/rule_engine/nodes/action_save_timeseries_node.go b/pkg/rule_engine/nodes/action_save_timeseries_node.go index 35fa96d..448d8e9 100644 --- a/pkg/rule_engine/nodes/action_save_timeseries_node.go +++ b/pkg/rule_engine/nodes/action_save_timeseries_node.go @@ -1,6 +1,7 @@ package nodes import ( + "errors" "pandax/pkg/global" "pandax/pkg/rule_engine/message" ) @@ -33,7 +34,12 @@ func (n *saveTimeSeriesNode) Handle(msg *message.Message) error { return nil } }*/ - deviceName := msg.Metadata["deviceName"].(string) + var deviceName string + if dn, ok := msg.Metadata.GetValue("deviceName").(string); ok { + deviceName = dn + } else { + return errors.New("元数据中为获取到设备ID") + } err := global.TdDb.InsertDevice(deviceName+"_telemetry", msg.Msg) if err != nil { n.Debug(msg, message.DEBUGOUT, err.Error()) diff --git a/pkg/rule_engine/nodes/external_ding_node.go b/pkg/rule_engine/nodes/external_ding_node.go index 1db7ce7..137f227 100644 --- a/pkg/rule_engine/nodes/external_ding_node.go +++ b/pkg/rule_engine/nodes/external_ding_node.go @@ -6,8 +6,8 @@ import ( "encoding/base64" "encoding/json" "fmt" - "github.com/PandaXGO/PandaKit/httpclient" "net/url" + "pandax/kit/httpclient" "pandax/pkg/rule_engine/message" "time" ) diff --git a/pkg/rule_engine/nodes/external_restapi_node.go b/pkg/rule_engine/nodes/external_restapi_node.go index 90a6f95..7f870e7 100644 --- a/pkg/rule_engine/nodes/external_restapi_node.go +++ b/pkg/rule_engine/nodes/external_restapi_node.go @@ -3,7 +3,7 @@ package nodes import ( "encoding/json" "errors" - "github.com/PandaXGO/PandaKit/httpclient" + "pandax/kit/httpclient" "pandax/pkg/rule_engine/message" ) diff --git a/pkg/rule_engine/nodes/external_wechat_node.go b/pkg/rule_engine/nodes/external_wechat_node.go index cdb8841..09a9b8c 100644 --- a/pkg/rule_engine/nodes/external_wechat_node.go +++ b/pkg/rule_engine/nodes/external_wechat_node.go @@ -2,7 +2,7 @@ package nodes import ( "encoding/json" - "github.com/PandaXGO/PandaKit/httpclient" + "pandax/kit/httpclient" "pandax/pkg/rule_engine/message" ) diff --git a/pkg/rule_engine/nodes/filter_device_type_switch_node.go b/pkg/rule_engine/nodes/filter_device_type_switch_node.go index aa82b38..dc8e00e 100644 --- a/pkg/rule_engine/nodes/filter_device_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_device_type_switch_node.go @@ -29,8 +29,7 @@ func (n *deviceTypeSwitchNode) Handle(msg *message.Message) error { deviceLabelNode := n.GetLinkedNode(message.DEVICE) gatewayLabelNode := n.GetLinkedNode(message.GATEWAY) - - deviceType := msg.Metadata.GetValue("deviceType").(string) + deviceType := msg.Metadata.GetStringValue("deviceType") if deviceType == message.DEVICE { if deviceLabelNode != nil { n.Debug(msg, message.DEBUGOUT, "") diff --git a/pkg/rule_engine/nodes/filter_message_type_switch_node.go b/pkg/rule_engine/nodes/filter_message_type_switch_node.go index f2757f0..cb6ce38 100644 --- a/pkg/rule_engine/nodes/filter_message_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_switch_node.go @@ -18,10 +18,8 @@ func (f messageTypeSwitchNodeFactory) Labels() []string { message.TelemetryMes, message.RpcRequestFromDevice, message.RpcRequestToDevice, - message.AlarmMes, message.UpEventMes, message.ConnectMes, - message.ConnectMes, message.DisConnectMes, } }