From f4144251bdd2a84c9880c0efe9ed99a20220412f Mon Sep 17 00:00:00 2001 From: PandaX <18610165312@163.com> Date: Wed, 29 Nov 2023 14:43:27 +0800 Subject: [PATCH] =?UTF-8?q?[=E4=BC=98=E5=8C=96]=E9=99=8D=E4=BD=8Eexhook?= =?UTF-8?q?=E5=93=8D=E5=BA=94=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iothub/server/emqxserver/hook.go | 188 ++++++++++++++++--------------- 1 file changed, 95 insertions(+), 93 deletions(-) diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index fe0dd2d..659090e 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -87,7 +87,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli return nil, err } //添加连接ID - mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid) + go mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid) data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil @@ -102,7 +102,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2. return nil, err } //删除连接ID - mqttclient.MqttClient.Delete(etoken.DeviceId) + go mqttclient.MqttClient.Delete(etoken.DeviceId) data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil @@ -180,109 +180,111 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true} return res, nil } - etoken := &model.DeviceAuth{} - etoken.GetDeviceToken(in.Message.Headers["username"]) - // 获取topic类型 - ts := time.Now().Format("2006-01-02 15:04:05.000") - eventType := IotHubTopic.GetMessageType(in.Message.Topic) + // 开启协程为了减少exhook执行时间 + go func() { + etoken := &model.DeviceAuth{} + etoken.GetDeviceToken(in.Message.Headers["username"]) + // 获取topic类型 + ts := time.Now().Format("2006-01-02 15:04:05.000") + eventType := IotHubTopic.GetMessageType(in.Message.Topic) - datas := string(in.GetMessage().GetPayload()) - data := &netbase.DeviceEventInfo{ - Type: eventType, - Datas: datas, - DeviceId: etoken.DeviceId, - DeviceAuth: etoken, - } - // 如果是网关子设备单独处理 - if eventType == message.GATEWAY { - subData := make(map[string]interface{}) - err := json.Unmarshal(in.GetMessage().GetPayload(), &subData) - if err != nil { - global.Log.Warn(fmt.Sprintf("子网关上报数据格式错误")) - res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false} - return res, nil + datas := string(in.GetMessage().GetPayload()) + data := &netbase.DeviceEventInfo{ + Type: eventType, + Datas: datas, + DeviceId: etoken.DeviceId, + DeviceAuth: etoken, } - // key就是device name - for deviceName, value := range subData { - auth, isSub := netbase.SubAuth(deviceName) - if !isSub { - continue + // 如果是网关子设备单独处理 + if eventType == message.GATEWAY { + subData := make(map[string]interface{}) + err := json.Unmarshal(in.GetMessage().GetPayload(), &subData) + if err != nil { + global.Log.Error(fmt.Sprintf("子网关上报数据格式错误")) + return } - data.DeviceAuth = auth - data.DeviceId = auth.DeviceId - if in.Message.Topic == AttributesGatewayTopic { - data.Type = message.AttributesMes - marshal, _ := json.Marshal(value) - attributesData := netbase.UpdateDeviceAttributesData(string(marshal)) - if attributesData == nil { + // key就是device name + for deviceName, value := range subData { + auth, isSub := netbase.SubAuth(deviceName) + if !isSub { continue } - bytes, _ := json.Marshal(attributesData) - data.Datas = string(bytes) - // 创建tdengine的设备属性表 - netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData) - // 子设备发送到队列里 - s.HookService.MessageCh <- data - } - if in.Message.Topic == TelemetryGatewayTopic { - data.Type = message.TelemetryMes - // 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化 - td, _ := json.Marshal(value) - telemetryData := netbase.UpdateDeviceTelemetryData(string(td)) - if telemetryData == nil { - continue - } - bytes, _ := json.Marshal(telemetryData) - data.Datas = string(bytes) - // 创建tdengine的设备遥测表 - netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData) - // 子设备发送到队列里 - s.HookService.MessageCh <- data - } - if in.Message.Topic == ConnectGatewayTopic { - if val, ok := value.(string); ok { - if val == "online" { - data = netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth) - } - if val == "offline" { - data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth) + data.DeviceAuth = auth + data.DeviceId = auth.DeviceId + if in.Message.Topic == AttributesGatewayTopic { + data.Type = message.AttributesMes + marshal, _ := json.Marshal(value) + attributesData := netbase.UpdateDeviceAttributesData(string(marshal)) + if attributesData == nil { + continue } + bytes, _ := json.Marshal(attributesData) + data.Datas = string(bytes) + // 创建tdengine的设备属性表 + netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData) // 子设备发送到队列里 - s.HookService.MessageCh <- data + s.HookService.Queue.Queue(data) + } + if in.Message.Topic == TelemetryGatewayTopic { + data.Type = message.TelemetryMes + // 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化 + td, _ := json.Marshal(value) + telemetryData := netbase.UpdateDeviceTelemetryData(string(td)) + if telemetryData == nil { + continue + } + bytes, _ := json.Marshal(telemetryData) + data.Datas = string(bytes) + // 创建tdengine的设备遥测表 + netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData) + // 子设备发送到队列里 + s.HookService.Queue.Queue(data) + } + if in.Message.Topic == ConnectGatewayTopic { + if val, ok := value.(string); ok { + if val == "online" { + data = netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth) + } + if val == "offline" { + data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth) + } + // 子设备发送到队列里 + s.HookService.Queue.Queue(data) + } } } + return } - res.Value = &exhook2.ValuedResponse_Message{Message: in.Message} - return res, nil - } - switch eventType { - case message.RowMes: - data.Type = message.RowMes - data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas) - case message.AttributesMes: - attributesData := netbase.UpdateDeviceAttributesData(data.Datas) - if attributesData == nil { - return res, nil + switch eventType { + case message.RowMes: + data.Type = message.RowMes + data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas) + case message.AttributesMes: + attributesData := netbase.UpdateDeviceAttributesData(data.Datas) + if attributesData == nil { + global.Log.Error("属性数据格式错误,解析失败") + return + } + bytes, _ := json.Marshal(attributesData) + data.Datas = string(bytes) + case message.TelemetryMes: + // 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化 + telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas) + if telemetryData == nil { + global.Log.Error("遥测数据格式错误,解析失败") + return + } + bytes, _ := json.Marshal(telemetryData) + data.Datas = string(bytes) + case message.RpcRequestFromDevice: + // 获取请求id + id := netbase.GetRequestIdFromTopic(RpcReq, in.Message.Topic) + data.RequestId = id } - bytes, _ := json.Marshal(attributesData) - data.Datas = string(bytes) - case message.TelemetryMes: - // 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化 - telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas) - if telemetryData == nil { - return res, nil - } - bytes, _ := json.Marshal(telemetryData) - data.Datas = string(bytes) - case message.RpcRequestFromDevice: - // 获取请求id - id := netbase.GetRequestIdFromTopic(RpcReq, in.Message.Topic) - data.RequestId = id - } - - //将数据放到队列中 - s.HookService.Queue.Queue(data) + //将数据放到队列中 + s.HookService.Queue.Queue(data) + }() res.Value = &exhook2.ValuedResponse_Message{Message: in.Message} return res, nil }