[优化]大改动,指令下发采用规则链rpc请求

This commit is contained in:
PandaX
2023-10-14 10:00:05 +08:00
parent 42be3b23e4
commit 7c8001a687
54 changed files with 1256 additions and 294 deletions

View File

@@ -6,6 +6,7 @@ import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"math/rand"
"pandax/pkg/global_model"
"time"
)
@@ -26,19 +27,10 @@ type RpcRequest struct {
Timeout int // 设置双向时,等待的超时时间
}
type RpcPayload struct {
Method string `json:"method"`
Params any `json:"params"`
}
// RequestCmd 下发指令
func (rpc RpcRequest) RequestCmd(rpcPayload RpcPayload) (respPayload string, err error) {
func (rpc RpcRequest) RequestCmd(rpcPayload string) (respPayload string, err error) {
topic := fmt.Sprintf(RpcReqTopic, rpc.RequestId)
payload, err := json.Marshal(rpcPayload)
if err != nil {
return "", err
}
err = rpc.Client.Pub(topic, 0, string(payload))
err = rpc.Client.Pub(topic, 0, rpcPayload)
if err != nil {
return "", err
}
@@ -73,7 +65,7 @@ func (rpc RpcRequest) RequestCmd(rpcPayload RpcPayload) (respPayload string, err
}
// RequestAttributes rpc 下发属性
func (rpc RpcRequest) RequestAttributes(rpcPayload RpcPayload) error {
func (rpc RpcRequest) RequestAttributes(rpcPayload global_model.RpcPayload) error {
topic := fmt.Sprintf(RpcReqTopic, rpc.RequestId)
if rpcPayload.Method == "" {
rpcPayload.Method = "setAttributes"
@@ -85,18 +77,9 @@ func (rpc RpcRequest) RequestAttributes(rpcPayload RpcPayload) error {
return rpc.Client.Pub(topic, 0, string(payload))
}
// RespondTpc 处理设备端请求服务端方法
func (rpc RpcRequest) RespondTpc(reqPayload RpcPayload) error {
func (rpc RpcRequest) Pub(reqPayload string) error {
topic := fmt.Sprintf(RpcRespTopic, rpc.RequestId)
//TODO 此处处理设备的请求参数逻辑
//自己定义请求逻辑
if reqPayload.Params == "getCurrentTime" {
unix := time.Now().Unix()
msg := fmt.Sprintf("%d", unix)
return rpc.Client.Pub(topic, 0, msg)
}
// 获取属性 ...
return nil
return rpc.Client.Pub(topic, 0, reqPayload)
}
func (rpc *RpcRequest) GetRequestId() {

View File

@@ -12,6 +12,7 @@ import (
ruleService "pandax/apps/rule/services"
"pandax/iothub/netbase"
"pandax/pkg/global"
"pandax/pkg/global_model"
"pandax/pkg/rule_engine"
"pandax/pkg/rule_engine/message"
"pandax/pkg/shadow"
@@ -44,7 +45,7 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
}
}()
switch msg.Type {
case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestMes:
case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestFromDevice:
msgVals := make(map[string]interface{})
err := json.Unmarshal([]byte(msg.Datas), &msgVals)
if err != nil {
@@ -74,7 +75,7 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
global.Log.Error("规则链执行失败", errs)
}
// 保存设备影子
if msg.Type != message.RpcRequestMes {
if msg.Type != message.RpcRequestFromDevice {
SetDeviceShadow(msg.DeviceAuth, ruleMessage.Msg, msg.Type)
}
case message.DisConnectMes, message.ConnectMes:
@@ -87,7 +88,8 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
}
// 更改设备在线状态
if msg.Type == message.ConnectMes {
services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.ONLINE)
err := services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.ONLINE)
global.Log.Error(err)
} else {
services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.OFFLINE)
}
@@ -106,7 +108,7 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
}()
}
func getRuleChain(etoken *tool.DeviceAuth) *ruleEntity.RuleDataJson {
func getRuleChain(etoken *global_model.DeviceAuth) *ruleEntity.RuleDataJson {
defer func() {
if err := recover(); err != nil {
global.Log.Error(err)
@@ -118,21 +120,22 @@ func getRuleChain(etoken *tool.DeviceAuth) *ruleEntity.RuleDataJson {
rule := ruleService.RuleChainModelDao.FindOne(one.RuleChainId)
return rule.RuleDataJson, nil
})
ruleData := ruleEntity.RuleDataJson{}
biz.ErrIsNil(err, "缓存读取规则链失败")
ruleData := ruleEntity.RuleDataJson{}
err = tool.StringToStruct(get.(string), &ruleData)
biz.ErrIsNil(err, "规则链数据转化失败")
return &ruleData
}
func buildRuleMessage(etoken *tool.DeviceAuth, msgVals map[string]interface{}, msgType string) *message.Message {
func buildRuleMessage(etoken *global_model.DeviceAuth, msgVals map[string]interface{}, msgType string) *message.Message {
metadataVals := map[string]interface{}{
"deviceId": etoken.DeviceId,
"deviceName": etoken.Name,
"deviceType": etoken.DeviceType,
"productId": etoken.ProductId,
"orgId": etoken.OrgId,
"owner": etoken.Owner,
"deviceId": etoken.DeviceId,
"deviceName": etoken.Name,
"deviceType": etoken.DeviceType,
"deviceProtocol": etoken.DeviceProtocol,
"productId": etoken.ProductId,
"orgId": etoken.OrgId,
"owner": etoken.Owner,
}
return message.NewMessage(etoken.Owner, msgType, msgVals, metadataVals)
}
@@ -154,7 +157,7 @@ func SendZtWebsocket(deviceId, message string) {
}
// SetDeviceShadow 设置设备点
func SetDeviceShadow(etoken *tool.DeviceAuth, msgVals map[string]interface{}, msgType string) {
func SetDeviceShadow(etoken *global_model.DeviceAuth, msgVals map[string]interface{}, msgType string) {
defer func() {
if err := recover(); &err != nil {
global.Log.Error(err)

View File

@@ -5,8 +5,8 @@ import (
"pandax/apps/device/services"
"pandax/iothub/server/emqxserver/protobuf"
"pandax/pkg/global"
"pandax/pkg/global_model"
"pandax/pkg/tdengine"
"pandax/pkg/tool"
"regexp"
"strings"
"time"
@@ -17,7 +17,7 @@ func Auth(authToken string) bool {
if authToken == "pandax" {
return true
}
etoken := &tool.DeviceAuth{}
etoken := &global_model.DeviceAuth{}
// redis 中有就查询,没有就添加
exists, err := global.RedisDb.Exists(global.RedisDb.Context(), authToken).Result()
if exists == 1 {
@@ -28,7 +28,9 @@ func Auth(authToken string) bool {
global.Log.Infof("设备token %s 不存在", authToken)
return false
}
etoken, err = services.GetDeviceToken(device)
etoken = services.GetDeviceToken(&device.Device)
etoken.DeviceProtocol = device.Product.ProtocolName
err = global.RedisDb.Set(authToken, etoken.GetMarshal(), time.Hour*24*365)
if err != nil {
global.Log.Infof("设备TOKEN %s添加缓存失败", authToken)
return false
@@ -46,8 +48,8 @@ func Auth(authToken string) bool {
return true
}
func SubAuth(name string) (*tool.DeviceAuth, bool) {
etoken := &tool.DeviceAuth{}
func SubAuth(name string) (*global_model.DeviceAuth, bool) {
etoken := &global_model.DeviceAuth{}
// redis 中有就查询,没有就添加
exists, err := global.RedisDb.Exists(global.RedisDb.Context(), name).Result()
if exists == 1 {
@@ -59,7 +61,10 @@ func SubAuth(name string) (*tool.DeviceAuth, bool) {
global.Log.Infof("设备标识 %s 不存在", name)
return nil, false
}
etoken, err = services.GetDeviceToken(device)
etoken = services.GetDeviceToken(&device.Device)
etoken.DeviceProtocol = device.Product.ProtocolName
// todo 子设备没有token
err = global.RedisDb.Set(device.Token, etoken.GetMarshal(), time.Hour*24*365)
if err != nil {
global.Log.Infof("设备标识 %s添加缓存失败", name)
return nil, false
@@ -148,7 +153,7 @@ func GetRequestIdFromTopic(reg, topic string) (requestId string) {
return ""
}
func CreateConnectionInfo(msgType, protocol, clientID, peerHost string, deviceAuth *tool.DeviceAuth) *DeviceEventInfo {
func CreateConnectionInfo(msgType, protocol, clientID, peerHost string, deviceAuth *global_model.DeviceAuth) *DeviceEventInfo {
ts := time.Now().Format("2006-01-02 15:04:05.000")
ci := &tdengine.ConnectInfo{
ClientID: clientID,

View File

@@ -1,11 +1,13 @@
package netbase
import "pandax/pkg/tool"
import (
"pandax/pkg/global_model"
)
type DeviceEventInfo struct {
DeviceId string `json:"deviceId"`
DeviceAuth *tool.DeviceAuth `json:"deviceAuth"`
Datas string `json:"datas"`
Type string `json:"type"`
RequestId string `json:"requestId"`
DeviceId string `json:"deviceId"`
DeviceAuth *global_model.DeviceAuth `json:"deviceAuth"`
Datas string `json:"datas"`
Type string `json:"type"`
RequestId string `json:"requestId"`
}

View File

@@ -16,7 +16,7 @@ const (
TelemetryGatewayTopic = "v1/gateway/telemetry"
ConnectGatewayTopic = "v1/gateway/connect"
RpcReqReg = `v1/devices/me/rpc/request/(.*?)$`
RpcReq = `v1/devices/me/rpc/request/(.*?)$`
)
var IotHubTopic = NewIotHubTopic()
@@ -40,7 +40,7 @@ func (iht TopicMeg) GetMessageType(topic string) string {
return meg
}
if strings.Contains(topic, "v1/devices/me/rpc/request") {
return message.RpcRequestMes
return message.RpcRequestFromDevice
}
return ""
}

View File

@@ -9,8 +9,8 @@ import (
"pandax/iothub/netbase"
exhook2 "pandax/iothub/server/emqxserver/protobuf"
"pandax/pkg/global"
"pandax/pkg/global_model"
"pandax/pkg/rule_engine/message"
"pandax/pkg/tool"
"time"
)
@@ -82,12 +82,11 @@ func (s *HookGrpcService) OnClientConnack(ctx context.Context, in *exhook2.Clien
func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.ClientConnectedRequest) (*exhook2.EmptySuccess, error) {
global.Log.Info(fmt.Sprintf("Client %s Connected ", in.Clientinfo.GetNode()))
if in.Clientinfo.Clientid == mqttclient.DefaultDownStreamClientId {
return &exhook2.EmptySuccess{}, nil
}
token := netbase.GetUserName(in.Clientinfo)
etoken := &tool.DeviceAuth{}
etoken := &global_model.DeviceAuth{}
etoken.GetDeviceToken(token)
data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
s.HookService.MessageCh <- data
@@ -100,7 +99,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.
if in.Clientinfo.Clientid == mqttclient.DefaultDownStreamClientId {
return &exhook2.EmptySuccess{}, nil
}
etoken := &tool.DeviceAuth{}
etoken := &global_model.DeviceAuth{}
err := etoken.GetDeviceToken(token)
if err != nil {
return nil, err
@@ -182,7 +181,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true}
return res, nil
}
etoken := &tool.DeviceAuth{}
etoken := &global_model.DeviceAuth{}
etoken.GetDeviceToken(in.Message.Headers["username"])
// 获取topic类型
ts := time.Now().Format("2006-01-02 15:04:05.000")
@@ -273,9 +272,9 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
}
bytes, _ := json.Marshal(telemetryData)
data.Datas = string(bytes)
case message.RpcRequestMes:
case message.RpcRequestFromDevice:
// 获取请求id
id := netbase.GetRequestIdFromTopic(RpcReqReg, in.Message.Topic)
id := netbase.GetRequestIdFromTopic(RpcReq, in.Message.Topic)
data.RequestId = id
}

View File

@@ -11,8 +11,8 @@ import (
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
"pandax/pkg/global"
"pandax/pkg/global_model"
"pandax/pkg/rule_engine/message"
"pandax/pkg/tool"
"sync"
"time"
)
@@ -42,7 +42,7 @@ func InitHttpHook(addr string, hs *hook_message_work.HookService) {
case http.StateHijacked, http.StateClosed:
etoken, _ := activeConnections.Load(conn.RemoteAddr().String())
if etoken != nil {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken.(*tool.DeviceAuth))
data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken.(*global_model.DeviceAuth))
activeConnections.Delete(conn.RemoteAddr().String())
service.HookService.MessageCh <- data
}
@@ -80,7 +80,7 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
resp.Write([]byte("解析上报参数失败"))
return
}
etoken := &tool.DeviceAuth{}
etoken := &global_model.DeviceAuth{}
etoken.GetDeviceToken(token)
_, ok := activeConnections.Load(req.Request.RemoteAddr)
// 是否需要添加设备上线通知

View File

@@ -9,8 +9,8 @@ import (
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
"pandax/pkg/global"
"pandax/pkg/global_model"
"pandax/pkg/rule_engine/message"
"pandax/pkg/tool"
"time"
)
@@ -48,7 +48,7 @@ func InitTcpHook(addr string, hs *hook_message_work.HookService) {
func (hhs *HookTcpService) hook() {
isAuth := false
etoken := &tool.DeviceAuth{}
etoken := &global_model.DeviceAuth{}
for {
buf := make([]byte, 128)
n := 0