mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 02:48:34 +08:00
[优化]降低exhook响应时间
This commit is contained in:
@@ -87,9 +87,9 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
//添加连接ID
|
//添加连接ID
|
||||||
go mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid)
|
mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid)
|
||||||
data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
|
data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
|
||||||
s.HookService.Queue.Queue(data)
|
go s.HookService.Queue.Queue(data)
|
||||||
return &exhook2.EmptySuccess{}, nil
|
return &exhook2.EmptySuccess{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,9 +102,9 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
//删除连接ID
|
//删除连接ID
|
||||||
go mqttclient.MqttClient.Delete(etoken.DeviceId)
|
mqttclient.MqttClient.Delete(etoken.DeviceId)
|
||||||
data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
|
data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
|
||||||
s.HookService.Queue.Queue(data)
|
go s.HookService.Queue.Queue(data)
|
||||||
return &exhook2.EmptySuccess{}, nil
|
return &exhook2.EmptySuccess{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -223,7 +223,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
|
|||||||
// 创建tdengine的设备属性表
|
// 创建tdengine的设备属性表
|
||||||
netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData)
|
netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData)
|
||||||
// 子设备发送到队列里
|
// 子设备发送到队列里
|
||||||
s.HookService.Queue.Queue(data)
|
go s.HookService.Queue.Queue(data)
|
||||||
}
|
}
|
||||||
if in.Message.Topic == TelemetryGatewayTopic {
|
if in.Message.Topic == TelemetryGatewayTopic {
|
||||||
data.Type = message.TelemetryMes
|
data.Type = message.TelemetryMes
|
||||||
@@ -238,7 +238,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
|
|||||||
// 创建tdengine的设备遥测表
|
// 创建tdengine的设备遥测表
|
||||||
netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData)
|
netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData)
|
||||||
// 子设备发送到队列里
|
// 子设备发送到队列里
|
||||||
s.HookService.Queue.Queue(data)
|
go s.HookService.Queue.Queue(data)
|
||||||
}
|
}
|
||||||
if in.Message.Topic == ConnectGatewayTopic {
|
if in.Message.Topic == ConnectGatewayTopic {
|
||||||
if val, ok := value.(string); ok {
|
if val, ok := value.(string); ok {
|
||||||
@@ -249,7 +249,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
|
|||||||
data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
|
data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
|
||||||
}
|
}
|
||||||
// 子设备发送到队列里
|
// 子设备发送到队列里
|
||||||
s.HookService.Queue.Queue(data)
|
go s.HookService.Queue.Queue(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -283,7 +283,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
|
|||||||
data.RequestId = id
|
data.RequestId = id
|
||||||
}
|
}
|
||||||
//将数据放到队列中
|
//将数据放到队列中
|
||||||
s.HookService.Queue.Queue(data)
|
go s.HookService.Queue.Queue(data)
|
||||||
}()
|
}()
|
||||||
res.Value = &exhook2.ValuedResponse_Message{Message: in.Message}
|
res.Value = &exhook2.ValuedResponse_Message{Message: in.Message}
|
||||||
return res, nil
|
return res, nil
|
||||||
|
|||||||
@@ -87,7 +87,7 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
|
|||||||
if !ok {
|
if !ok {
|
||||||
activeConnections.Store(req.Request.RemoteAddr, etoken)
|
activeConnections.Store(req.Request.RemoteAddr, etoken)
|
||||||
data := netbase.CreateConnectionInfo(message.ConnectMes, "http", req.Request.RemoteAddr, req.Request.RemoteAddr, etoken)
|
data := netbase.CreateConnectionInfo(message.ConnectMes, "http", req.Request.RemoteAddr, req.Request.RemoteAddr, etoken)
|
||||||
hhs.HookService.Queue.Queue(data)
|
go hhs.HookService.Queue.Queue(data)
|
||||||
}
|
}
|
||||||
marshal, _ := json.Marshal(upData)
|
marshal, _ := json.Marshal(upData)
|
||||||
data := &netbase.DeviceEventInfo{
|
data := &netbase.DeviceEventInfo{
|
||||||
@@ -122,6 +122,6 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
|
|||||||
resp.Write([]byte("路径上报类型错误"))
|
resp.Write([]byte("路径上报类型错误"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
hhs.HookService.Queue.Queue(data)
|
go hhs.HookService.Queue.Queue(data)
|
||||||
io.WriteString(resp, "ok")
|
io.WriteString(resp, "ok")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ func (hhs *HookTcpService) hook() {
|
|||||||
//设置断开连接
|
//设置断开连接
|
||||||
if isAuth {
|
if isAuth {
|
||||||
data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken)
|
data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken)
|
||||||
hhs.HookService.Queue.Queue(data)
|
go hhs.HookService.Queue.Queue(data)
|
||||||
}
|
}
|
||||||
tcpclient.TcpClient.Delete(etoken.DeviceId)
|
tcpclient.TcpClient.Delete(etoken.DeviceId)
|
||||||
isAuth = false
|
isAuth = false
|
||||||
@@ -72,7 +72,7 @@ func (hhs *HookTcpService) hook() {
|
|||||||
if auth {
|
if auth {
|
||||||
global.Log.Infof("TCP协议 设备%s,认证成功", etoken.DeviceId)
|
global.Log.Infof("TCP协议 设备%s,认证成功", etoken.DeviceId)
|
||||||
data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken)
|
data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken)
|
||||||
hhs.HookService.Queue.Queue(data)
|
go hhs.HookService.Queue.Queue(data)
|
||||||
isAuth = true
|
isAuth = true
|
||||||
tcpclient.TcpClient.Store(etoken.DeviceId, hhs.conn)
|
tcpclient.TcpClient.Store(etoken.DeviceId, hhs.conn)
|
||||||
hhs.Send("success")
|
hhs.Send("success")
|
||||||
@@ -92,7 +92,7 @@ func (hhs *HookTcpService) hook() {
|
|||||||
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData)
|
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData)
|
||||||
|
|
||||||
// etoken中添加设备标识
|
// etoken中添加设备标识
|
||||||
hhs.HookService.Queue.Queue(data)
|
go hhs.HookService.Queue.Queue(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
|
|||||||
//设置断开连接
|
//设置断开连接
|
||||||
if isAuth, ok := authMap[client.AddrPort().String()]; ok && isAuth {
|
if isAuth, ok := authMap[client.AddrPort().String()]; ok && isAuth {
|
||||||
data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
|
data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
|
||||||
hhs.HookService.Queue.Queue(data)
|
go hhs.HookService.Queue.Queue(data)
|
||||||
}
|
}
|
||||||
udpclient.UdpClient.Delete(etoken.DeviceId)
|
udpclient.UdpClient.Delete(etoken.DeviceId)
|
||||||
delete(authMap, client.AddrPort().String())
|
delete(authMap, client.AddrPort().String())
|
||||||
@@ -62,7 +62,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
|
|||||||
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData)
|
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData)
|
||||||
|
|
||||||
// etoken中添加设备标识
|
// etoken中添加设备标识
|
||||||
hhs.HookService.Queue.Queue(data)
|
go hhs.HookService.Queue.Queue(data)
|
||||||
} else {
|
} else {
|
||||||
token := string(buffer[:n])
|
token := string(buffer[:n])
|
||||||
etoken.GetDeviceToken(token)
|
etoken.GetDeviceToken(token)
|
||||||
@@ -71,7 +71,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
|
|||||||
if auth {
|
if auth {
|
||||||
global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId)
|
global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId)
|
||||||
data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
|
data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
|
||||||
hhs.HookService.Queue.Queue(data)
|
go hhs.HookService.Queue.Queue(data)
|
||||||
|
|
||||||
authMap[client.AddrPort().String()] = true
|
authMap[client.AddrPort().String()] = true
|
||||||
udpclient.UdpClient.Store(etoken.DeviceId, &udpclient.UdpClientT{
|
udpclient.UdpClient.Store(etoken.DeviceId, &udpclient.UdpClientT{
|
||||||
|
|||||||
Reference in New Issue
Block a user