From 58341b02363c479bbbef371a72c66be6064687f9 Mon Sep 17 00:00:00 2001 From: PandaX <18610165312@163.com> Date: Tue, 28 Nov 2023 17:22:36 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=20=E5=AE=A2=E6=88=B7=E7=AB=AFmap=E9=94=99?= =?UTF-8?q?=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iothub/client/mqttclient/mqtt_api.go | 3 ++- iothub/client/mqttclient/rpc.go | 13 +++++++++++-- iothub/client/tcpclient/tcp.go | 12 +++++++++--- iothub/client/updclient/udp.go | 10 +++++++--- iothub/netbase/hook_base.go | 2 -- iothub/server/emqxserver/hook.go | 4 ++-- iothub/server/tcpserver/hook.go | 4 ++-- iothub/server/udpserver/hook.go | 6 +++--- 8 files changed, 36 insertions(+), 18 deletions(-) diff --git a/iothub/client/mqttclient/mqtt_api.go b/iothub/client/mqttclient/mqtt_api.go index 4bd6c89..3c00d36 100644 --- a/iothub/client/mqttclient/mqtt_api.go +++ b/iothub/client/mqttclient/mqtt_api.go @@ -8,10 +8,11 @@ import ( "io/ioutil" "net/http" "pandax/pkg/global" + "sync" ) // key 设备id,value MQTT的clientID -var MqttClient = make(map[string]string) +var MqttClient sync.Map const ClientsInfo string = "client" const SubscribeTopicsInfo string = "subscribe" diff --git a/iothub/client/mqttclient/rpc.go b/iothub/client/mqttclient/rpc.go index 76c3ba6..2c5b3f1 100644 --- a/iothub/client/mqttclient/rpc.go +++ b/iothub/client/mqttclient/rpc.go @@ -1,6 +1,7 @@ package mqttclient import ( + "errors" "fmt" "math/rand" "time" @@ -25,12 +26,20 @@ type RpcRequest struct { // RequestCmd 下发指令 func (rpc RpcRequest) RequestCmd(deviceId, rpcPayload string) error { topic := fmt.Sprintf(RpcReqTopic, rpc.RequestId) - return Publish(topic, MqttClient[deviceId], rpcPayload) + value, ok := MqttClient.Load(deviceId) + if !ok { + return errors.New("为获取到设备的MQTT连接") + } + return Publish(topic, value.(string), rpcPayload) } func (rpc RpcRequest) Pub(deviceId, reqPayload string) error { topic := fmt.Sprintf(RpcRespTopic, rpc.RequestId) - return Publish(topic, MqttClient[deviceId], reqPayload) + value, ok := MqttClient.Load(deviceId) + if !ok { + return errors.New("为获取到设备的MQTT连接") + } + return Publish(topic, value.(string), reqPayload) } func (rpc *RpcRequest) GetRequestId() { diff --git a/iothub/client/tcpclient/tcp.go b/iothub/client/tcpclient/tcp.go index e4fa7ae..3a82420 100644 --- a/iothub/client/tcpclient/tcp.go +++ b/iothub/client/tcpclient/tcp.go @@ -2,14 +2,17 @@ package tcpclient import ( "encoding/hex" + "errors" "net" "pandax/pkg/global" + "sync" ) -var TcpClient = make(map[string]*net.TCPConn) +var TcpClient sync.Map func Send(deviceId, msg string) error { - if conn, ok := TcpClient[deviceId]; ok { + if conn, ok := TcpClient.Load(deviceId); ok { + conn := conn.(*net.TCPConn) global.Log.Infof("设备%s, 发送指令%s", deviceId, msg) _, err := conn.Write([]byte(msg)) if err != nil { @@ -17,12 +20,15 @@ func Send(deviceId, msg string) error { } } else { global.Log.Infof("设备%s TCP连接不存在, 发送指令失败", deviceId) + return errors.New("为获取到设备的MQTT连接") } return nil } func SendHex(deviceId, msg string) error { - if conn, ok := TcpClient[deviceId]; ok { + + if conn, ok := TcpClient.Load(deviceId); ok { + conn := conn.(*net.TCPConn) global.Log.Infof("设备%s, 发送指令%s", deviceId, msg) b, err := hex.DecodeString(msg) if err != nil { diff --git a/iothub/client/updclient/udp.go b/iothub/client/updclient/udp.go index 7eba940..748b1eb 100644 --- a/iothub/client/updclient/udp.go +++ b/iothub/client/updclient/udp.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "net" "pandax/pkg/global" + "sync" ) type UdpClientT struct { @@ -11,10 +12,12 @@ type UdpClientT struct { Addr *net.UDPAddr } -var UdpClient = make(map[string]*UdpClientT) +// var UdpClient = make(map[string]*UdpClientT) +var UdpClient sync.Map func Send(deviceId, msg string) error { - if conn, ok := UdpClient[deviceId]; ok { + if conn, ok := UdpClient.Load(deviceId); ok { + conn := conn.(*UdpClientT) global.Log.Infof("设备%s, 发送指令%s", deviceId, msg) _, err := conn.Conn.WriteToUDP([]byte(msg), conn.Addr) if err != nil { @@ -27,7 +30,8 @@ func Send(deviceId, msg string) error { } func SendHex(deviceId, msg string) error { - if conn, ok := UdpClient[deviceId]; ok { + if conn, ok := UdpClient.Load(deviceId); ok { + conn := conn.(*UdpClientT) global.Log.Infof("设备%s, 发送指令%s", deviceId, msg) b, err := hex.DecodeString(msg) if err != nil { diff --git a/iothub/netbase/hook_base.go b/iothub/netbase/hook_base.go index a7f53d2..844bd92 100644 --- a/iothub/netbase/hook_base.go +++ b/iothub/netbase/hook_base.go @@ -2,7 +2,6 @@ package netbase import ( "encoding/json" - "log" "pandax/apps/device/entity" "pandax/apps/device/services" "pandax/iothub/server/emqxserver/protobuf" @@ -38,7 +37,6 @@ func Auth(authToken string) bool { return false } etoken = services.GetDeviceToken(&device.Device) - log.Println("设置设备协议", device.Product.ProtocolName) etoken.DeviceProtocol = device.Product.ProtocolName err = cache.SetDeviceEtoken(authToken, etoken.GetMarshal(), time.Hour*24*365) if err != nil { diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index 8d96133..3362ffb 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -87,7 +87,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli return nil, err } //添加连接ID - mqttclient.MqttClient[etoken.DeviceId] = in.Clientinfo.Clientid + mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid) data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) s.HookService.MessageCh <- data return &exhook2.EmptySuccess{}, nil @@ -102,7 +102,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2. return nil, err } //删除连接ID - delete(mqttclient.MqttClient, etoken.DeviceId) + mqttclient.MqttClient.Delete(etoken.DeviceId) data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) s.HookService.MessageCh <- data return &exhook2.EmptySuccess{}, nil diff --git a/iothub/server/tcpserver/hook.go b/iothub/server/tcpserver/hook.go index 9bfff63..f48dc1e 100644 --- a/iothub/server/tcpserver/hook.go +++ b/iothub/server/tcpserver/hook.go @@ -60,7 +60,7 @@ func (hhs *HookTcpService) hook() { data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken) hhs.HookService.MessageCh <- data } - delete(tcpclient.TcpClient, etoken.DeviceId) + tcpclient.TcpClient.Delete(etoken.DeviceId) isAuth = false return } @@ -74,7 +74,7 @@ func (hhs *HookTcpService) hook() { data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken) hhs.HookService.MessageCh <- data isAuth = true - tcpclient.TcpClient[etoken.DeviceId] = hhs.conn + tcpclient.TcpClient.Store(etoken.DeviceId, hhs.conn) hhs.Send("success") } else { hhs.Send("fail") diff --git a/iothub/server/udpserver/hook.go b/iothub/server/udpserver/hook.go index 013f78e..88c3cd6 100644 --- a/iothub/server/udpserver/hook.go +++ b/iothub/server/udpserver/hook.go @@ -45,7 +45,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) hhs.HookService.MessageCh <- data } - delete(udpclient.UdpClient, etoken.DeviceId) + udpclient.UdpClient.Delete(etoken.DeviceId) delete(authMap, client.AddrPort().String()) continue } @@ -73,10 +73,10 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) hhs.HookService.MessageCh <- data authMap[client.AddrPort().String()] = true - udpclient.UdpClient[etoken.DeviceId] = &udpclient.UdpClientT{ + udpclient.UdpClient.Store(etoken.DeviceId, &udpclient.UdpClientT{ Conn: server.listener, Addr: client, - } + }) hhs.Send(client, "success") } else { hhs.Send(client, "fail")