[优化]降低exhook响应时间

This commit is contained in:
PandaX
2023-11-29 14:43:27 +08:00
parent b239f44e24
commit f4144251bd

View File

@@ -87,7 +87,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli
return nil, err
}
//添加连接ID
mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid)
go mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid)
data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
s.HookService.Queue.Queue(data)
return &exhook2.EmptySuccess{}, nil
@@ -102,7 +102,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.
return nil, err
}
//删除连接ID
mqttclient.MqttClient.Delete(etoken.DeviceId)
go mqttclient.MqttClient.Delete(etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
s.HookService.Queue.Queue(data)
return &exhook2.EmptySuccess{}, nil
@@ -180,109 +180,111 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true}
return res, nil
}
etoken := &model.DeviceAuth{}
etoken.GetDeviceToken(in.Message.Headers["username"])
// 获取topic类型
ts := time.Now().Format("2006-01-02 15:04:05.000")
eventType := IotHubTopic.GetMessageType(in.Message.Topic)
// 开启协程为了减少exhook执行时间
go func() {
etoken := &model.DeviceAuth{}
etoken.GetDeviceToken(in.Message.Headers["username"])
// 获取topic类型
ts := time.Now().Format("2006-01-02 15:04:05.000")
eventType := IotHubTopic.GetMessageType(in.Message.Topic)
datas := string(in.GetMessage().GetPayload())
data := &netbase.DeviceEventInfo{
Type: eventType,
Datas: datas,
DeviceId: etoken.DeviceId,
DeviceAuth: etoken,
}
// 如果是网关子设备单独处理
if eventType == message.GATEWAY {
subData := make(map[string]interface{})
err := json.Unmarshal(in.GetMessage().GetPayload(), &subData)
if err != nil {
global.Log.Warn(fmt.Sprintf("子网关上报数据格式错误"))
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false}
return res, nil
datas := string(in.GetMessage().GetPayload())
data := &netbase.DeviceEventInfo{
Type: eventType,
Datas: datas,
DeviceId: etoken.DeviceId,
DeviceAuth: etoken,
}
// key就是device name
for deviceName, value := range subData {
auth, isSub := netbase.SubAuth(deviceName)
if !isSub {
continue
// 如果是网关子设备单独处理
if eventType == message.GATEWAY {
subData := make(map[string]interface{})
err := json.Unmarshal(in.GetMessage().GetPayload(), &subData)
if err != nil {
global.Log.Error(fmt.Sprintf("子网关上报数据格式错误"))
return
}
data.DeviceAuth = auth
data.DeviceId = auth.DeviceId
if in.Message.Topic == AttributesGatewayTopic {
data.Type = message.AttributesMes
marshal, _ := json.Marshal(value)
attributesData := netbase.UpdateDeviceAttributesData(string(marshal))
if attributesData == nil {
// key就是device name
for deviceName, value := range subData {
auth, isSub := netbase.SubAuth(deviceName)
if !isSub {
continue
}
bytes, _ := json.Marshal(attributesData)
data.Datas = string(bytes)
// 创建tdengine的设备属性表
netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData)
// 子设备发送到队列里
s.HookService.MessageCh <- data
}
if in.Message.Topic == TelemetryGatewayTopic {
data.Type = message.TelemetryMes
// 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化
td, _ := json.Marshal(value)
telemetryData := netbase.UpdateDeviceTelemetryData(string(td))
if telemetryData == nil {
continue
}
bytes, _ := json.Marshal(telemetryData)
data.Datas = string(bytes)
// 创建tdengine的设备遥测表
netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData)
// 子设备发送到队列里
s.HookService.MessageCh <- data
}
if in.Message.Topic == ConnectGatewayTopic {
if val, ok := value.(string); ok {
if val == "online" {
data = netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
}
if val == "offline" {
data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
data.DeviceAuth = auth
data.DeviceId = auth.DeviceId
if in.Message.Topic == AttributesGatewayTopic {
data.Type = message.AttributesMes
marshal, _ := json.Marshal(value)
attributesData := netbase.UpdateDeviceAttributesData(string(marshal))
if attributesData == nil {
continue
}
bytes, _ := json.Marshal(attributesData)
data.Datas = string(bytes)
// 创建tdengine的设备属性表
netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData)
// 子设备发送到队列里
s.HookService.MessageCh <- data
s.HookService.Queue.Queue(data)
}
if in.Message.Topic == TelemetryGatewayTopic {
data.Type = message.TelemetryMes
// 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化
td, _ := json.Marshal(value)
telemetryData := netbase.UpdateDeviceTelemetryData(string(td))
if telemetryData == nil {
continue
}
bytes, _ := json.Marshal(telemetryData)
data.Datas = string(bytes)
// 创建tdengine的设备遥测表
netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData)
// 子设备发送到队列里
s.HookService.Queue.Queue(data)
}
if in.Message.Topic == ConnectGatewayTopic {
if val, ok := value.(string); ok {
if val == "online" {
data = netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
}
if val == "offline" {
data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
}
// 子设备发送到队列里
s.HookService.Queue.Queue(data)
}
}
}
return
}
res.Value = &exhook2.ValuedResponse_Message{Message: in.Message}
return res, nil
}
switch eventType {
case message.RowMes:
data.Type = message.RowMes
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas)
case message.AttributesMes:
attributesData := netbase.UpdateDeviceAttributesData(data.Datas)
if attributesData == nil {
return res, nil
switch eventType {
case message.RowMes:
data.Type = message.RowMes
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas)
case message.AttributesMes:
attributesData := netbase.UpdateDeviceAttributesData(data.Datas)
if attributesData == nil {
global.Log.Error("属性数据格式错误,解析失败")
return
}
bytes, _ := json.Marshal(attributesData)
data.Datas = string(bytes)
case message.TelemetryMes:
// 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化
telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas)
if telemetryData == nil {
global.Log.Error("遥测数据格式错误,解析失败")
return
}
bytes, _ := json.Marshal(telemetryData)
data.Datas = string(bytes)
case message.RpcRequestFromDevice:
// 获取请求id
id := netbase.GetRequestIdFromTopic(RpcReq, in.Message.Topic)
data.RequestId = id
}
bytes, _ := json.Marshal(attributesData)
data.Datas = string(bytes)
case message.TelemetryMes:
// 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化
telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas)
if telemetryData == nil {
return res, nil
}
bytes, _ := json.Marshal(telemetryData)
data.Datas = string(bytes)
case message.RpcRequestFromDevice:
// 获取请求id
id := netbase.GetRequestIdFromTopic(RpcReq, in.Message.Topic)
data.RequestId = id
}
//将数据放到队列中
s.HookService.Queue.Queue(data)
//将数据放到队列中
s.HookService.Queue.Queue(data)
}()
res.Value = &exhook2.ValuedResponse_Message{Message: in.Message}
return res, nil
}