From fd44f11c4b3eaa40f509a09d59b0a26fe3100b96 Mon Sep 17 00:00:00 2001 From: PandaX <18610165312@163.com> Date: Tue, 28 Nov 2023 18:08:49 +0800 Subject: [PATCH] =?UTF-8?q?[=E4=BC=98=E5=8C=96]=E8=AE=BE=E5=A4=87=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E4=B8=8A=E4=BC=A0=E5=B9=B6=E5=8F=91=E5=A4=84=E7=90=86?= =?UTF-8?q?=EF=BC=8C=E6=B7=BB=E5=8A=A0=E9=98=9F=E5=88=97=EF=BC=8C=E4=BB=A5?= =?UTF-8?q?=E5=8F=8A=E5=B9=B6=E5=8F=91=E6=95=B0=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iothub/server/emqxserver/hook.go | 4 ++-- iothub/server/httpserver/hook.go | 6 +++--- iothub/server/tcpserver/hook.go | 6 +++--- iothub/server/udpserver/hook.go | 7 ++++--- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index cdc0f88..fe0dd2d 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -89,7 +89,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli //添加连接ID mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid) data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) - s.HookService.MessageCh <- data + s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil } @@ -104,7 +104,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2. //删除连接ID mqttclient.MqttClient.Delete(etoken.DeviceId) data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) - s.HookService.MessageCh <- data + s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil } diff --git a/iothub/server/httpserver/hook.go b/iothub/server/httpserver/hook.go index de27b22..851c5e8 100644 --- a/iothub/server/httpserver/hook.go +++ b/iothub/server/httpserver/hook.go @@ -44,7 +44,7 @@ func InitHttpHook(addr string, hs *hook_message_work.HookService) { if etoken != nil { data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken.(*model.DeviceAuth)) activeConnections.Delete(conn.RemoteAddr().String()) - service.HookService.MessageCh <- data + service.HookService.Queue.Queue(data) } } } @@ -87,7 +87,7 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) { if !ok { activeConnections.Store(req.Request.RemoteAddr, etoken) data := netbase.CreateConnectionInfo(message.ConnectMes, "http", req.Request.RemoteAddr, req.Request.RemoteAddr, etoken) - hhs.HookService.MessageCh <- data + hhs.HookService.Queue.Queue(data) } marshal, _ := json.Marshal(upData) data := &netbase.DeviceEventInfo{ @@ -122,6 +122,6 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) { resp.Write([]byte("路径上报类型错误")) return } - hhs.HookService.MessageCh <- data + hhs.HookService.Queue.Queue(data) io.WriteString(resp, "ok") } diff --git a/iothub/server/tcpserver/hook.go b/iothub/server/tcpserver/hook.go index f48dc1e..65b12b2 100644 --- a/iothub/server/tcpserver/hook.go +++ b/iothub/server/tcpserver/hook.go @@ -58,7 +58,7 @@ func (hhs *HookTcpService) hook() { //设置断开连接 if isAuth { data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken) - hhs.HookService.MessageCh <- data + hhs.HookService.Queue.Queue(data) } tcpclient.TcpClient.Delete(etoken.DeviceId) isAuth = false @@ -72,7 +72,7 @@ func (hhs *HookTcpService) hook() { if auth { global.Log.Infof("TCP协议 设备%s,认证成功", etoken.DeviceId) data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken) - hhs.HookService.MessageCh <- data + hhs.HookService.Queue.Queue(data) isAuth = true tcpclient.TcpClient.Store(etoken.DeviceId, hhs.conn) hhs.Send("success") @@ -92,7 +92,7 @@ func (hhs *HookTcpService) hook() { data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData) // etoken中添加设备标识 - hhs.HookService.MessageCh <- data + hhs.HookService.Queue.Queue(data) } } diff --git a/iothub/server/udpserver/hook.go b/iothub/server/udpserver/hook.go index 88c3cd6..e983831 100644 --- a/iothub/server/udpserver/hook.go +++ b/iothub/server/udpserver/hook.go @@ -43,7 +43,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { //设置断开连接 if isAuth, ok := authMap[client.AddrPort().String()]; ok && isAuth { data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) - hhs.HookService.MessageCh <- data + hhs.HookService.Queue.Queue(data) } udpclient.UdpClient.Delete(etoken.DeviceId) delete(authMap, client.AddrPort().String()) @@ -62,7 +62,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData) // etoken中添加设备标识 - hhs.HookService.MessageCh <- data + hhs.HookService.Queue.Queue(data) } else { token := string(buffer[:n]) etoken.GetDeviceToken(token) @@ -71,7 +71,8 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { if auth { global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId) data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) - hhs.HookService.MessageCh <- data + hhs.HookService.Queue.Queue(data) + authMap[client.AddrPort().String()] = true udpclient.UdpClient.Store(etoken.DeviceId, &udpclient.UdpClientT{ Conn: server.listener,