From 47fa50d59f9535a1c0fc812a7e5a5f35ccbcc988 Mon Sep 17 00:00:00 2001 From: PandaX <18610165312@163.com> Date: Thu, 26 Oct 2023 09:00:38 +0800 Subject: [PATCH] =?UTF-8?q?[feat]udp=E5=8D=8F=E8=AE=AE=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/device/api/device.go | 31 ++++++-- iothub/client/updclient/udp.go | 13 +++- iothub/hook_message_work/hook_message_work.go | 26 +------ iothub/server/emqxserver/hook.go | 3 +- iothub/server/udpserver/hook.go | 73 ++++++++++++++----- 5 files changed, 93 insertions(+), 53 deletions(-) diff --git a/apps/device/api/device.go b/apps/device/api/device.go index 87bc243..bf6e30a 100644 --- a/apps/device/api/device.go +++ b/apps/device/api/device.go @@ -105,20 +105,37 @@ func (p *DeviceApi) GetDeviceStatus(rc *restfulx.ReqCtx) { rs = getDevice.TelemetryPoints } for _, tel := range *template { + if _, ok := rs[tel.Key]; !ok { + continue + } sdv := entity.DeviceStatusVo{ Name: tel.Name, Key: tel.Key, Type: tel.Type, Define: tel.Define, } - if v, ok := rs[tel.Key]; ok { - sdv.Value = v.Value - sdv.Time = v.UpdatedAt - } else { - if classify == global.TslAttributesType { - if value, ok := tel.Define["default_value"]; ok { - sdv.Value = value + if classify == global.TslTelemetryType { + value := rs[tel.Key].Value + // tsl转化 + /*var tslValue tsl.ValueType + err := tool.MapToStruct(tel.Define, &tslValue) + if err != nil { + value = rs[tel.Key].Value + } else { + tslValue.Type = tel.Type + // 此处rs[tel.Key].Value 变成字符串类型了 + value = tslValue.ConvertValue(rs[tel.Key].Value) + log.Println("value", value) + if value == nil { + value = rs[tel.Key] } + }*/ + sdv.Time = rs[tel.Key].UpdatedAt + sdv.Value = value + } + if classify == global.TslAttributesType { + if value, ok := tel.Define["default_value"]; ok { + sdv.Value = value } } res = append(res, sdv) diff --git a/iothub/client/updclient/udp.go b/iothub/client/updclient/udp.go index 99a0d8c..7eba940 100644 --- a/iothub/client/updclient/udp.go +++ b/iothub/client/updclient/udp.go @@ -1,4 +1,4 @@ -package tcpclient +package udpclient import ( "encoding/hex" @@ -6,12 +6,17 @@ import ( "pandax/pkg/global" ) -var UdpClient = make(map[string]*net.UDPConn) +type UdpClientT struct { + Conn *net.UDPConn + Addr *net.UDPAddr +} + +var UdpClient = make(map[string]*UdpClientT) func Send(deviceId, msg string) error { if conn, ok := UdpClient[deviceId]; ok { global.Log.Infof("设备%s, 发送指令%s", deviceId, msg) - _, err := conn.Write([]byte(msg)) + _, err := conn.Conn.WriteToUDP([]byte(msg), conn.Addr) if err != nil { return err } @@ -28,7 +33,7 @@ func SendHex(deviceId, msg string) error { if err != nil { return err } - _, err = conn.Write(b) + _, err = conn.Conn.WriteToUDP(b, conn.Addr) if err != nil { return err } diff --git a/iothub/hook_message_work/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go index 58a3049..350026a 100644 --- a/iothub/hook_message_work/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -5,9 +5,7 @@ import ( "encoding/json" "fmt" "github.com/PandaXGO/PandaKit/biz" - "pandax/apps/device/entity" "pandax/apps/device/services" - "pandax/apps/device/tsl" ruleEntity "pandax/apps/rule/entity" ruleService "pandax/apps/rule/services" "pandax/iothub/netbase" @@ -19,7 +17,6 @@ import ( "pandax/pkg/shadow" "pandax/pkg/tool" "pandax/pkg/websocket" - "strings" ) // 消息处理模块 @@ -169,30 +166,13 @@ func SetDeviceShadow(etoken *global_model.DeviceAuth, msgVals map[string]interfa if msgType == message.RowMes { msgType = message.TelemetryMes } - template := services.ProductTemplateModelDao.FindList(entity.ProductTemplate{Classify: strings.ToLower(msgType), Pid: etoken.ProductId}) - for _, tel := range *template { - if _, ok := msgVals[tel.Key]; !ok { - continue - } + for key, value := range msgVals { if message.AttributesMes == msgType { - err := shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslAttributesType, tel.Key, msgVals[tel.Key]) + err := shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslAttributesType, key, value) biz.ErrIsNilAppendErr(err, "设置设备影子点失败") } if message.TelemetryMes == msgType { - var value interface{} - // tsl转化 - var tslValue tsl.ValueType - err := tool.MapToStruct(tel.Define, &tslValue) - if err != nil { - value = msgVals[tel.Key] - } else { - tslValue.Type = tel.Type - value = tslValue.ConvertValue(msgVals[tel.Key]) - if value == nil { - value = msgVals[tel.Key] - } - } - err = shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslTelemetryType, tel.Key, value) + err := shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslTelemetryType, key, value) biz.ErrIsNilAppendErr(err, "设置设备影子点失败") } } diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index e6db93e..d9057f1 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -176,7 +176,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess res.Type = exhook2.ValuedResponse_STOP_AND_RETURN res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false} //服务端的HTTP请求放行 - if in.GetMessage().From == "http_api" { + if in.Message.From == "http_api" { res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true} return res, nil } @@ -193,7 +193,6 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess DeviceId: etoken.DeviceId, DeviceAuth: etoken, } - // 如果是网关子设备单独处理 if eventType == message.GATEWAY { subData := make(map[string]interface{}) diff --git a/iothub/server/udpserver/hook.go b/iothub/server/udpserver/hook.go index a16624e..87fb57d 100644 --- a/iothub/server/udpserver/hook.go +++ b/iothub/server/udpserver/hook.go @@ -3,17 +3,20 @@ package updserver import ( "context" "encoding/hex" + "fmt" "net" + udpclient "pandax/iothub/client/updclient" "pandax/iothub/hook_message_work" "pandax/iothub/netbase" "pandax/pkg/global" + "pandax/pkg/global_model" "pandax/pkg/rule_engine/message" + "time" ) type HookUdpService struct { HookService *hook_message_work.HookService conn *net.UDPConn - addr *net.UDPAddr } func InitUdpHook(addr string, hs *hook_message_work.HookService) { @@ -26,40 +29,76 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { global.Log.Infof("UDP IOTHUB HOOK Start SUCCESS, Server listen: %s", addr) } buffer := make([]byte, 1024) + isAuth := false + etoken := &global_model.DeviceAuth{} + hhs := &HookUdpService{ + HookService: hs, + conn: server.listener, + } for { n, client, err := server.listener.ReadFromUDP(buffer) if err != nil { global.Log.Error("Error accepting connection:", err) + _ = server.listener.Close() + //设置断开连接 + if isAuth { + data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) + hhs.HookService.MessageCh <- data + } + delete(udpclient.UdpClient, etoken.DeviceId) + isAuth = false continue } - hhs := &HookUdpService{ - HookService: hs, - conn: server.listener, - addr: client, - } - go hhs.hook(buffer[:n]) + if !isAuth { + token := string(buffer[:n]) + etoken.GetDeviceToken(token) + auth := netbase.Auth(token) + // 认证成功,创建连接记录 + if auth { + global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId) + data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) + hhs.HookService.MessageCh <- data + isAuth = true + udpclient.UdpClient[etoken.DeviceId] = &udpclient.UdpClientT{ + Conn: server.listener, + Addr: client, + } + hhs.Send(client, "success") + } else { + hhs.Send(client, "fail") + } + } else { + data := &netbase.DeviceEventInfo{ + DeviceId: etoken.DeviceId, + DeviceAuth: etoken, + } + hexData := hex.EncodeToString(buffer[:n]) + global.Log.Infof("UDP协议 设备%s, 接受消息%s", etoken.DeviceId, hexData) + ts := time.Now().Format("2006-01-02 15:04:05.000") + data.Type = message.RowMes + data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData) + + // etoken中添加设备标识 + hhs.HookService.MessageCh <- data + } } } -func (hhs *HookUdpService) hook(data []byte) { - hhs.Send("success") +func (hhs *HookUdpService) Send(addr *net.UDPAddr, message string) error { + return hhs.SendBytes(addr, []byte(message)) } -func (hhs *HookUdpService) Send(message string) error { - return hhs.SendBytes([]byte(message)) -} - -func (hhs *HookUdpService) SendHex(msg string) error { +func (hhs *HookUdpService) SendHex(addr *net.UDPAddr, msg string) error { b, err := hex.DecodeString(msg) if err != nil { return err } - return hhs.SendBytes(b) + return hhs.SendBytes(addr, b) } -func (hhs *HookUdpService) SendBytes(msg []byte) error { - _, err := hhs.conn.WriteToUDP(msg, hhs.addr) +func (hhs *HookUdpService) SendBytes(addr *net.UDPAddr, msg []byte) error { + _, err := hhs.conn.WriteToUDP(msg, addr) if err != nil { hhs.conn.Close() data := &netbase.DeviceEventInfo{