【fix】子设备上报方式修复

This commit is contained in:
XM-GO
2023-09-28 10:38:02 +08:00
parent 0dc58c0d18
commit 0b568d16e1
13 changed files with 378 additions and 64 deletions

View File

@@ -47,7 +47,7 @@ return {msg: msg, metadata: metadata, msgType: msgType};
```
## 网关子设备属性上报格式
devA 为设备ID
devA 为设备标识
```json
{
"devA": {
@@ -62,18 +62,25 @@ devA 为设备ID
```
## 网关子设备遥测上报格式
devA 为设备ID
devA 为设备标识
```json
{
"devA": [
{
"devA": {
"ts": 1689837909000,
"values": {
"telemetry1": "value1",
"telemetry2": 0
}
}
]
}
```
或者
```json
{
"devA": {
"telemetry1": "value1",
"telemetry2": 0
}
}
```
@@ -84,13 +91,11 @@ devA 为设备ID
"devB": "offline"
}
```
## 命令下发格式
下发的ID和相应的ID相同
## 命令下发,设备请求格式,
```json
{
"id": "2343",
"cmd": "restart",
"content": {
"method": "restart",
"params": {
"firmware_address": "http://xxx.yyy.com",
"version": "latest",
"secret": "****",
@@ -98,14 +103,13 @@ devA 为设备ID
}
}
```
属性下发 method: 'setAttributes'
## 命令响应的格式
success 返回结果必传 content代表输出参数可选
```json
{
"id": "2343",
"success": true,
"content": {
"method": "2343",
"params": {
"aa": "2"
}
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/PandaXGO/PandaKit/biz"
"log"
"pandax/apps/device/entity"
"pandax/apps/device/services"
ruleEntity "pandax/apps/rule/entity"
@@ -67,6 +68,7 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
global.Log.Error("规则链执行失败", errs)
}
// 保存设备影子
log.Println(ruleMessage.Msg, msg.Type, msg.DeviceAuth)
if msg.Type != message.RpcRequestMes {
SetDeviceShadow(msg.DeviceAuth, ruleMessage.Msg, msg.Type)
}
@@ -106,7 +108,7 @@ func getRuleChain(etoken *tool.DeviceAuth) *ruleEntity.RuleDataJson {
}
}()
key := etoken.ProductId
get, err := global.Cache.ComputeIfAbsent(key, func(k any) (any, error) {
get, err := global.ProductCache.ComputeIfAbsent(key, func(k any) (any, error) {
one := services.ProductModelDao.FindOne(k.(string))
rule := ruleService.RuleChainModelDao.FindOne(one.RuleChainId)
return rule.RuleDataJson, nil

View File

@@ -2,7 +2,6 @@ package netbase
import (
"encoding/json"
"log"
"pandax/apps/device/services"
"pandax/iothub/server/emqxserver/protobuf"
"pandax/pkg/global"
@@ -25,7 +24,6 @@ func Auth(authToken string) bool {
err = global.RedisDb.Get(authToken, etoken)
} else {
device, err := services.DeviceModelDao.FindOneByToken(authToken)
log.Println(err)
if err != nil {
global.Log.Infof("设备token %s 不存在", authToken)
return false
@@ -48,6 +46,32 @@ func Auth(authToken string) bool {
return true
}
func SubAuth(name string) (*tool.DeviceAuth, bool) {
etoken := &tool.DeviceAuth{}
// redis 中有就查询,没有就添加
exists, err := global.RedisDb.Exists(global.RedisDb.Context(), name).Result()
if exists == 1 {
err = etoken.GetDeviceToken(name)
} else {
device, err := services.DeviceModelDao.FindOneByName(name)
// 没有设备就要创建设备
if err != nil {
global.Log.Infof("设备标识 %s 不存在", name)
return nil, false
}
etoken, err = services.GetDeviceToken(device)
if err != nil {
global.Log.Infof("设备标识 %s添加缓存失败", name)
return nil, false
}
}
if err != nil {
global.Log.Infof("无效设备标识 %s", name)
return nil, false
}
return etoken, true
}
// 解析遥测数据类型 返回标准带时间戳格式
func UpdateDeviceTelemetryData(data string) map[string]interface{} {
tel := make(map[string]interface{})

View File

@@ -15,7 +15,6 @@ const (
AttributesGatewayTopic = "v1/gateway/attributes"
TelemetryGatewayTopic = "v1/gateway/telemetry"
ConnectGatewayTopic = "v1/gateway/connect"
DisconnectGatewayTopic = "v1/gateway/disconnect"
RpcReqReg = `v1/devices/me/rpc/request/(.*?)$`
)
@@ -33,7 +32,6 @@ func NewIotHubTopic() TopicMeg {
AttributesGatewayTopic: message.GATEWAY,
TelemetryGatewayTopic: message.GATEWAY,
ConnectGatewayTopic: message.GATEWAY,
DisconnectGatewayTopic: message.GATEWAY,
}
}

View File

@@ -203,15 +203,15 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false}
return res, nil
}
// key就是deviceId
// key就是device name
for key, value := range subData {
etoken := &tool.DeviceAuth{}
err = global.RedisDb.Get(key, etoken)
if err != nil {
global.Log.Infof("%s设备不存在", key)
auth, isSub := netbase.SubAuth(key)
if !isSub {
global.Log.Infof("子设备%s 标识不存在,请先创建设备标识", key)
continue
}
data.DeviceId = key
data.DeviceAuth = auth
data.DeviceId = auth.DeviceId
if in.Message.Topic == AttributesGatewayTopic {
data.Type = message.AttributesMes
marshal, _ := json.Marshal(value)
@@ -227,36 +227,28 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
if in.Message.Topic == TelemetryGatewayTopic {
data.Type = message.TelemetryMes
// 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化
telData := make([]map[string]interface{}, 0)
// 解析子设备遥测数据结构
marshal, _ := json.Marshal(value)
err := json.Unmarshal(marshal, &telData)
if err != nil {
global.Log.Infof("%s子设备遥测数据结构错误", key)
td, _ := json.Marshal(value)
telemetryData := netbase.UpdateDeviceTelemetryData(string(td))
if telemetryData == nil {
continue
}
for _, da := range telData {
td, _ := json.Marshal(da)
telemetryData := netbase.UpdateDeviceTelemetryData(string(td))
if telemetryData == nil {
continue
bytes, _ := json.Marshal(telemetryData)
data.Datas = string(bytes)
// 子设备发送到队列里
s.HookService.MessageCh <- data
}
if in.Message.Topic == ConnectGatewayTopic {
if val, ok := value.(string); ok {
if val == "online" {
data = netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
}
if val == "offline" {
data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
}
bytes, _ := json.Marshal(telemetryData)
data.Datas = string(bytes)
// 子设备发送到队列里
s.HookService.MessageCh <- data
}
}
if in.Message.Topic == ConnectGatewayTopic {
data = netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], etoken)
// 子设备发送到队列里
s.HookService.MessageCh <- data
}
if in.Message.Topic == DisconnectGatewayTopic {
data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], etoken)
// 子设备发送到队列里
s.HookService.MessageCh <- data
}
}
res.Value = &exhook2.ValuedResponse_Message{Message: in.Message}
return res, nil

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"github.com/emicklei/go-restful/v3"
"io"
"log"
"net"
"net/http"
"pandax/iothub/hook_message_work"
@@ -41,11 +40,10 @@ func InitHttpHook(addr string, hs *hook_message_work.HookService) {
// 断开连接
switch state {
case http.StateHijacked, http.StateClosed:
etoken, _ := activeConnections.Load(conn.RemoteAddr())
log.Println("关闭http连接")
etoken, _ := activeConnections.Load(conn.RemoteAddr().String())
data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken.(*tool.DeviceAuth))
activeConnections.Delete(conn.RemoteAddr().String())
service.HookService.MessageCh <- data
activeConnections.Delete(conn.RemoteAddr())
}
}
err := server.Start(context.TODO())