This commit is contained in:
tfl
2024-08-21 17:35:50 +08:00
parent 34ea7472b7
commit dd5b38b4e3
24 changed files with 328 additions and 133 deletions

View File

@@ -91,7 +91,7 @@ devA 为设备标识
"devB": "offline"
}
```
## 命令下发,设备请求格式,
## 服务端命令下发,设备请求格式,
```json
{
"method": "restart",
@@ -103,14 +103,28 @@ devA 为设备标识
}
}
```
属性下发 method: 'setAttributes'
## 命令响应的格式
## 服务端属性下发 method: 'setAttributes'
```json
{
"method": "2343",
"method": "setAttributes",
"params": {
"aa": "2"
}
}
```
## 设备端 请求的格式
{
"method": "getCurrentTime",
"params": {
"aa": "2"
}
}
## 设备端 响应的格式
{
"method": "cmdResp",
"params": {
"aa": "2"
}
}

View File

@@ -3,13 +3,11 @@ package mqttclient
import (
"errors"
"fmt"
"math/rand"
"time"
)
const (
RpcRespTopic = `v1/devices/me/rpc/response/%d`
RpcReqTopic = `v1/devices/me/rpc/request/%d`
RpcRespTopic = `v1/devices/me/rpc/response/%s`
RpcReqTopic = `v1/devices/me/rpc/request/%s`
)
const (
@@ -18,7 +16,7 @@ const (
)
type RpcRequest struct {
RequestId int
RequestId string
Mode string //单向、双向 单项只发送不等待响应 双向需要等到响应
Timeout int // 设置双向时,等待的超时时间
}
@@ -41,9 +39,3 @@ func (rpc RpcRequest) Pub(deviceId, reqPayload string) error {
}
return Publish(topic, value.(string), reqPayload)
}
func (rpc *RpcRequest) GetRequestId() {
rand.Seed(time.Now().UnixNano())
// 生成随机整数
rpc.RequestId = rand.Intn(10000) + 1 // 生成0到99之间的随机整数
}

View File

@@ -0,0 +1,47 @@
package udpclient
import (
"encoding/hex"
"net"
"pandax/pkg/global"
"sync"
)
type UdpClientT struct {
Conn *net.UDPConn
Addr *net.UDPAddr
}
var UdpClient sync.Map
func Send(deviceId, msg string) error {
if conn, ok := UdpClient.Load(deviceId); ok {
conn := conn.(*UdpClientT)
global.Log.Infof("设备%s, 发送指令%s", deviceId, msg)
_, err := conn.Conn.WriteToUDP([]byte(msg), conn.Addr)
if err != nil {
return err
}
} else {
global.Log.Infof("设备%s TCP连接不存在, 发送指令失败", deviceId)
}
return nil
}
func SendHex(deviceId, msg string) error {
if conn, ok := UdpClient.Load(deviceId); ok {
conn := conn.(*UdpClientT)
global.Log.Infof("设备%s, 发送指令%s", deviceId, msg)
b, err := hex.DecodeString(msg)
if err != nil {
return err
}
_, err = conn.Conn.WriteToUDP(b, conn.Addr)
if err != nil {
return err
}
} else {
global.Log.Infof("设备%s TCP连接不存在, 发送指令失败", deviceId)
}
return nil
}

View File

@@ -14,8 +14,12 @@ import (
"pandax/pkg/global/model"
"pandax/pkg/rule_engine"
"pandax/pkg/rule_engine/message"
"pandax/pkg/tdengine"
"pandax/pkg/tool"
"pandax/pkg/websocket"
"time"
"github.com/kakuilan/kgo"
)
// 消息处理模块
@@ -38,11 +42,8 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
<-s.Ch
}
}()
// 去除上传数据的非法空字符
// msg.Datas = strings.ReplaceAll(msg.Datas, "\\u0000", "")
switch msg.Type {
case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestFromDevice:
case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestFromDevice, message.UpEventMes:
msgVals := make(map[string]interface{})
err := json.Unmarshal([]byte(msg.Datas), &msgVals)
if err != nil {
@@ -65,6 +66,29 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
global.Log.Error("规则链执行失败", err)
return
}
// 保存事件信息
if msg.Type == message.UpEventMes {
tsl, err := services.ProductTemplateModelDao.FindOneByKey(msg.DeviceId, msg.Identifier)
if err != nil {
return
}
ci := &tdengine.Events{
DeviceId: msg.DeviceId,
Name: msg.Identifier,
Type: tsl.Type,
Content: msg.Datas,
Ts: time.Now().Format("2006-01-02 15:04:05.000"),
}
data, err := kgo.KConv.Struct2Map(ci, "")
if err != nil {
global.Log.Error("事件格式转化错误")
return
}
err = global.TdDb.InsertEvent(data)
if err != nil {
global.Log.Error("事件添加错误", err)
}
}
case message.DisConnectMes, message.ConnectMes:
// 更改设备在线状态
if msg.Type == message.ConnectMes {

View File

@@ -2,7 +2,6 @@ package netbase
import (
"encoding/json"
"fmt"
"pandax/apps/device/entity"
"pandax/apps/device/services"
exhook "pandax/iothub/server/emqxserver/protobuf"
@@ -182,6 +181,15 @@ func SplitLwm2mClientID(lwm2mClientID string, index int) string {
}
func GetRequestIdFromTopic(reg, topic string) (requestId string) {
re := regexp.MustCompile(reg)
res := re.FindStringSubmatch(topic)
if len(res) > 2 {
return res[2]
}
return ""
}
func GetEventFromTopic(reg, topic string) (identifier string) {
re := regexp.MustCompile(reg)
res := re.FindStringSubmatch(topic)
if len(res) > 1 {
@@ -190,13 +198,14 @@ func GetRequestIdFromTopic(reg, topic string) (requestId string) {
return ""
}
func CreateConnectionInfo(msgType, protocol, clientID, peerHost string, deviceAuth *model.DeviceAuth) *DeviceEventInfo {
// eventType 事件类型 info alarm
func CreateEvent(msgType, eventType, content string, deviceAuth *model.DeviceAuth) *DeviceEventInfo {
ts := time.Now().Format("2006-01-02 15:04:05.000")
ci := &tdengine.Events{
DeviceId: deviceAuth.DeviceId,
Name: msgType,
Type: "info",
Content: fmt.Sprintf("设备%s, %s 事件", deviceAuth.Name, msgType),
Type: eventType,
Content: content,
Ts: ts,
}
v, err := json.Marshal(*ci)

View File

@@ -10,7 +10,8 @@ type DeviceEventInfo struct {
DeviceAuth *model.DeviceAuth `json:"deviceAuth"`
Datas string `json:"datas"`
Type string `json:"type"`
RequestId string `json:"requestId"`
RequestId string `json:"requestId"` // rpc 请求ID
Identifier string `json:"identifier"` //事件标识
}
func (j *DeviceEventInfo) Bytes() []byte {

View File

@@ -16,7 +16,9 @@ const (
TelemetryGatewayTopic = "v1/gateway/telemetry"
ConnectGatewayTopic = "v1/gateway/connect"
RpcReq = `v1/devices/me/rpc/request/(.*?)$`
RpcReq = `v1/devices/me/rpc/(.*?)/(.*?)$`
EventReq = `v1/devices/event/(.*?)$`
)
var IotHubTopic = NewIotHubTopic()
@@ -39,8 +41,11 @@ func (iht TopicMeg) GetMessageType(topic string) string {
if meg, ok := iht[topic]; ok {
return meg
}
if strings.Contains(topic, "v1/devices/me/rpc/request") {
if strings.Contains(topic, "v1/devices/me/rpc/request") || strings.Contains(topic, "v1/devices/me/rpc/response") {
return message.RpcRequestFromDevice
}
if strings.Contains(topic, "v1/devices/event") {
return message.UpEventMes
}
return ""
}

View File

@@ -88,7 +88,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli
}
//添加连接ID
mqttclient.Session.Store(etoken.DeviceId, in.Clientinfo.Clientid)
data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
data := netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("设备%s通过MQTT协议连接", etoken.Name), etoken)
go s.HookService.Queue.Queue(data)
return &exhook2.EmptySuccess{}, nil
}
@@ -103,7 +103,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.
}
//删除连接ID
mqttclient.Session.Delete(etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
data := netbase.CreateEvent(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken)
go s.HookService.Queue.Queue(data)
return &exhook2.EmptySuccess{}, nil
}
@@ -243,10 +243,10 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
if in.Message.Topic == ConnectGatewayTopic {
if val, ok := value.(string); ok {
if val == "online" {
data = netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
data = netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("子设备%s通过网关连接", etoken.Name), auth)
}
if val == "offline" {
data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth)
data = netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("子设备设备%s通过网关连接", etoken.Name), auth)
}
// 子设备发送到队列里
go s.HookService.Queue.Queue(data)
@@ -281,6 +281,10 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
// 获取请求id
id := netbase.GetRequestIdFromTopic(RpcReq, in.Message.Topic)
data.RequestId = id
case message.UpEventMes:
data.Type = message.UpEventMes
identifier := netbase.GetEventFromTopic(EventReq, in.Message.Topic)
data.Identifier = identifier
}
//将数据放到队列中
go s.HookService.Queue.Queue(data)

View File

@@ -153,12 +153,12 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
func (cm *ConnectionManager) AddConnection(addr string, etoken *model.DeviceAuth, service *hook_message_work.HookService) {
cm.activeConnections.Store(addr, etoken)
data := netbase.CreateConnectionInfo(message.ConnectMes, "http", addr, addr, etoken)
data := netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("设备%s通过HTTP协议连接", etoken.Name), etoken)
go service.Queue.Queue(data)
}
func (cm *ConnectionManager) RemoveConnection(addr string, etoken *model.DeviceAuth, service *hook_message_work.HookService) {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", addr, addr, etoken)
data := netbase.CreateEvent(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken)
cm.activeConnections.Delete(addr)
go service.Queue.Queue(data)
}

View File

@@ -47,7 +47,7 @@ func handleConnection(conn *net.TCPConn, hs *hook_message_work.HookService) {
defer func() {
_ = conn.Close()
if isAuth {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken)
data := netbase.CreateEvent(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken)
go hs.Queue.Queue(data)
}
tcpclient.TcpClient.Delete(etoken.DeviceId)
@@ -67,7 +67,7 @@ func handleConnection(conn *net.TCPConn, hs *hook_message_work.HookService) {
auth := netbase.Auth(token)
if auth {
global.Log.Infof("TCP协议 设备%s,认证成功", etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken)
data := netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("设备%s通过TCP协议连接", etoken.Name), etoken)
go hs.Queue.Queue(data)
isAuth = true
tcpclient.TcpClient.Store(etoken.DeviceId, conn)

View File

@@ -7,7 +7,7 @@ import (
"net"
"time"
udpclient "pandax/iothub/client/updclient"
udpclient "pandax/iothub/client/udpclient"
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
"pandax/pkg/global"
@@ -44,7 +44,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
_ = server.listener.Close()
if isAuth, ok := authMap[client.AddrPort().String()]; ok && isAuth {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
data := netbase.CreateEvent(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken)
go hhs.HookService.Queue.Queue(data)
}
udpclient.UdpClient.Delete(etoken.DeviceId)
@@ -72,7 +72,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
if auth {
global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
data := netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("设备%s通过UDP协议连接", etoken.Name), etoken)
go hhs.HookService.Queue.Queue(data)
authMap[client.AddrPort().String()] = true
@@ -112,4 +112,4 @@ func (hhs *HookUdpService) SendBytes(addr *net.UDPAddr, msg []byte) error {
hhs.HookService.MessageCh <- data
}
return err
}
}