[优化]设备数据上传并发处理,添加队列,以及并发数控制

This commit is contained in:
PandaX
2023-11-28 18:08:49 +08:00
parent 287c8a1b05
commit fd44f11c4b
4 changed files with 12 additions and 11 deletions

View File

@@ -89,7 +89,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli
//添加连接ID
mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid)
data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
s.HookService.MessageCh <- data
s.HookService.Queue.Queue(data)
return &exhook2.EmptySuccess{}, nil
}
@@ -104,7 +104,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.
//删除连接ID
mqttclient.MqttClient.Delete(etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
s.HookService.MessageCh <- data
s.HookService.Queue.Queue(data)
return &exhook2.EmptySuccess{}, nil
}

View File

@@ -44,7 +44,7 @@ func InitHttpHook(addr string, hs *hook_message_work.HookService) {
if etoken != nil {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken.(*model.DeviceAuth))
activeConnections.Delete(conn.RemoteAddr().String())
service.HookService.MessageCh <- data
service.HookService.Queue.Queue(data)
}
}
}
@@ -87,7 +87,7 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
if !ok {
activeConnections.Store(req.Request.RemoteAddr, etoken)
data := netbase.CreateConnectionInfo(message.ConnectMes, "http", req.Request.RemoteAddr, req.Request.RemoteAddr, etoken)
hhs.HookService.MessageCh <- data
hhs.HookService.Queue.Queue(data)
}
marshal, _ := json.Marshal(upData)
data := &netbase.DeviceEventInfo{
@@ -122,6 +122,6 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
resp.Write([]byte("路径上报类型错误"))
return
}
hhs.HookService.MessageCh <- data
hhs.HookService.Queue.Queue(data)
io.WriteString(resp, "ok")
}

View File

@@ -58,7 +58,7 @@ func (hhs *HookTcpService) hook() {
//设置断开连接
if isAuth {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken)
hhs.HookService.MessageCh <- data
hhs.HookService.Queue.Queue(data)
}
tcpclient.TcpClient.Delete(etoken.DeviceId)
isAuth = false
@@ -72,7 +72,7 @@ func (hhs *HookTcpService) hook() {
if auth {
global.Log.Infof("TCP协议 设备%s,认证成功", etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken)
hhs.HookService.MessageCh <- data
hhs.HookService.Queue.Queue(data)
isAuth = true
tcpclient.TcpClient.Store(etoken.DeviceId, hhs.conn)
hhs.Send("success")
@@ -92,7 +92,7 @@ func (hhs *HookTcpService) hook() {
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData)
// etoken中添加设备标识
hhs.HookService.MessageCh <- data
hhs.HookService.Queue.Queue(data)
}
}

View File

@@ -43,7 +43,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
//设置断开连接
if isAuth, ok := authMap[client.AddrPort().String()]; ok && isAuth {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
hhs.HookService.MessageCh <- data
hhs.HookService.Queue.Queue(data)
}
udpclient.UdpClient.Delete(etoken.DeviceId)
delete(authMap, client.AddrPort().String())
@@ -62,7 +62,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData)
// etoken中添加设备标识
hhs.HookService.MessageCh <- data
hhs.HookService.Queue.Queue(data)
} else {
token := string(buffer[:n])
etoken.GetDeviceToken(token)
@@ -71,7 +71,8 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
if auth {
global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
hhs.HookService.MessageCh <- data
hhs.HookService.Queue.Queue(data)
authMap[client.AddrPort().String()] = true
udpclient.UdpClient.Store(etoken.DeviceId, &udpclient.UdpClientT{
Conn: server.listener,