[feat]udp协议数据上传解析

This commit is contained in:
PandaX
2023-10-26 09:00:38 +08:00
parent b129a354bd
commit 47fa50d59f
5 changed files with 93 additions and 53 deletions

View File

@@ -176,7 +176,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
res.Type = exhook2.ValuedResponse_STOP_AND_RETURN
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false}
//服务端的HTTP请求放行
if in.GetMessage().From == "http_api" {
if in.Message.From == "http_api" {
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true}
return res, nil
}
@@ -193,7 +193,6 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
DeviceId: etoken.DeviceId,
DeviceAuth: etoken,
}
// 如果是网关子设备单独处理
if eventType == message.GATEWAY {
subData := make(map[string]interface{})

View File

@@ -3,17 +3,20 @@ package updserver
import (
"context"
"encoding/hex"
"fmt"
"net"
udpclient "pandax/iothub/client/updclient"
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
"pandax/pkg/global"
"pandax/pkg/global_model"
"pandax/pkg/rule_engine/message"
"time"
)
type HookUdpService struct {
HookService *hook_message_work.HookService
conn *net.UDPConn
addr *net.UDPAddr
}
func InitUdpHook(addr string, hs *hook_message_work.HookService) {
@@ -26,40 +29,76 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
global.Log.Infof("UDP IOTHUB HOOK Start SUCCESS, Server listen: %s", addr)
}
buffer := make([]byte, 1024)
isAuth := false
etoken := &global_model.DeviceAuth{}
hhs := &HookUdpService{
HookService: hs,
conn: server.listener,
}
for {
n, client, err := server.listener.ReadFromUDP(buffer)
if err != nil {
global.Log.Error("Error accepting connection:", err)
_ = server.listener.Close()
//设置断开连接
if isAuth {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
hhs.HookService.MessageCh <- data
}
delete(udpclient.UdpClient, etoken.DeviceId)
isAuth = false
continue
}
hhs := &HookUdpService{
HookService: hs,
conn: server.listener,
addr: client,
}
go hhs.hook(buffer[:n])
if !isAuth {
token := string(buffer[:n])
etoken.GetDeviceToken(token)
auth := netbase.Auth(token)
// 认证成功,创建连接记录
if auth {
global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken)
hhs.HookService.MessageCh <- data
isAuth = true
udpclient.UdpClient[etoken.DeviceId] = &udpclient.UdpClientT{
Conn: server.listener,
Addr: client,
}
hhs.Send(client, "success")
} else {
hhs.Send(client, "fail")
}
} else {
data := &netbase.DeviceEventInfo{
DeviceId: etoken.DeviceId,
DeviceAuth: etoken,
}
hexData := hex.EncodeToString(buffer[:n])
global.Log.Infof("UDP协议 设备%s, 接受消息%s", etoken.DeviceId, hexData)
ts := time.Now().Format("2006-01-02 15:04:05.000")
data.Type = message.RowMes
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, hexData)
// etoken中添加设备标识
hhs.HookService.MessageCh <- data
}
}
}
func (hhs *HookUdpService) hook(data []byte) {
hhs.Send("success")
func (hhs *HookUdpService) Send(addr *net.UDPAddr, message string) error {
return hhs.SendBytes(addr, []byte(message))
}
func (hhs *HookUdpService) Send(message string) error {
return hhs.SendBytes([]byte(message))
}
func (hhs *HookUdpService) SendHex(msg string) error {
func (hhs *HookUdpService) SendHex(addr *net.UDPAddr, msg string) error {
b, err := hex.DecodeString(msg)
if err != nil {
return err
}
return hhs.SendBytes(b)
return hhs.SendBytes(addr, b)
}
func (hhs *HookUdpService) SendBytes(msg []byte) error {
_, err := hhs.conn.WriteToUDP(msg, hhs.addr)
func (hhs *HookUdpService) SendBytes(addr *net.UDPAddr, msg []byte) error {
_, err := hhs.conn.WriteToUDP(msg, addr)
if err != nil {
hhs.conn.Close()
data := &netbase.DeviceEventInfo{