diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index 659090e..43187b8 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -87,9 +87,9 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli return nil, err } //添加连接ID - go mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid) + mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid) data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) - s.HookService.Queue.Queue(data) + go s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil } @@ -102,9 +102,9 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2. return nil, err } //删除连接ID - go mqttclient.MqttClient.Delete(etoken.DeviceId) + mqttclient.MqttClient.Delete(etoken.DeviceId) data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) - s.HookService.Queue.Queue(data) + go s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil } @@ -223,7 +223,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess // 创建tdengine的设备属性表 netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData) // 子设备发送到队列里 - s.HookService.Queue.Queue(data) + go s.HookService.Queue.Queue(data) } if in.Message.Topic == TelemetryGatewayTopic { data.Type = message.TelemetryMes @@ -238,7 +238,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess // 创建tdengine的设备遥测表 netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData) // 子设备发送到队列里 - s.HookService.Queue.Queue(data) + go s.HookService.Queue.Queue(data) } if in.Message.Topic == ConnectGatewayTopic { if val, ok := value.(string); ok { @@ -249,7 +249,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth) } // 子设备发送到队列里 - s.HookService.Queue.Queue(data) + go s.HookService.Queue.Queue(data) } } } @@ -283,7 +283,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess data.RequestId = id } //将数据放到队列中 - s.HookService.Queue.Queue(data) + go s.HookService.Queue.Queue(data) }() res.Value = &exhook2.ValuedResponse_Message{Message: in.Message} return res, nil diff --git a/iothub/server/httpserver/hook.go b/iothub/server/httpserver/hook.go index 851c5e8..d0d0fb4 100644 --- a/iothub/server/httpserver/hook.go +++ b/iothub/server/httpserver/hook.go @@ -87,7 +87,7 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) { if !ok { activeConnections.Store(req.Request.RemoteAddr, etoken) data := netbase.CreateConnectionInfo(message.ConnectMes, "http", req.Request.RemoteAddr, req.Request.RemoteAddr, etoken) - hhs.HookService.Queue.Queue(data) + go hhs.HookService.Queue.Queue(data) } marshal, _ := json.Marshal(upData) data := &netbase.DeviceEventInfo{ @@ -122,6 +122,6 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) { resp.Write([]byte("路径上报类型错误")) return } - hhs.HookService.Queue.Queue(data) + go hhs.HookService.Queue.Queue(data) io.WriteString(resp, "ok") } diff --git a/iothub/server/tcpserver/hook.go b/iothub/server/tcpserver/hook.go index 65b12b2..9048173 100644 --- a/iothub/server/tcpserver/hook.go +++ b/iothub/server/tcpserver/hook.go @@ -58,7 +58,7 @@ func (hhs *HookTcpService) hook() { //设置断开连接 if isAuth { data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken) - hhs.HookService.Queue.Queue(data) + go hhs.HookService.Queue.Queue(data) } tcpclient.TcpClient.Delete(etoken.DeviceId) isAuth = false @@ -72,7 +72,7 @@ func (hhs *HookTcpService) hook() { if auth { global.Log.Infof("TCP协议 设备%s,认证成功", etoken.DeviceId) data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken) - hhs.HookService.Queue.Queue(data) + go hhs.HookService.Queue.Queue(data) isAuth = true tcpclient.TcpClient.Store(etoken.DeviceId, hhs.conn) hhs.Send("success") @@ -92,7 +92,7 @@ func (hhs *HookTcpService) hook() { data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData) // etoken中添加设备标识 - hhs.HookService.Queue.Queue(data) + go hhs.HookService.Queue.Queue(data) } } diff --git a/iothub/server/udpserver/hook.go b/iothub/server/udpserver/hook.go index e983831..3c508b8 100644 --- a/iothub/server/udpserver/hook.go +++ b/iothub/server/udpserver/hook.go @@ -43,7 +43,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { //设置断开连接 if isAuth, ok := authMap[client.AddrPort().String()]; ok && isAuth { data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) - hhs.HookService.Queue.Queue(data) + go hhs.HookService.Queue.Queue(data) } udpclient.UdpClient.Delete(etoken.DeviceId) delete(authMap, client.AddrPort().String()) @@ -62,7 +62,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData) // etoken中添加设备标识 - hhs.HookService.Queue.Queue(data) + go hhs.HookService.Queue.Queue(data) } else { token := string(buffer[:n]) etoken.GetDeviceToken(token) @@ -71,7 +71,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { if auth { global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId) data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) - hhs.HookService.Queue.Queue(data) + go hhs.HookService.Queue.Queue(data) authMap[client.AddrPort().String()] = true udpclient.UdpClient.Store(etoken.DeviceId, &udpclient.UdpClientT{