【feat】添加tcp指令下发

This commit is contained in:
XM-GO
2023-09-27 17:30:11 +08:00
parent 9eff841b81
commit 0dc58c0d18
19 changed files with 481 additions and 83 deletions

View File

@@ -0,0 +1,79 @@
package mqttclient
import (
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/sirupsen/logrus"
"time"
)
const DefaultDownStreamClientId = `@panda.iothub.internal.clientId`
var MqttClient *IothubMqttClient
type IothubMqttClient struct {
Client mqtt.Client
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
logrus.Infof("Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
logrus.Infof("Connect lost: %v", err)
}
func InitMqtt(broker, username, password string) {
server := fmt.Sprintf("tcp://%s", broker)
client := GetMqttClinent(server, username, password)
MqttClient = &IothubMqttClient{
Client: client,
}
}
func GetMqttClinent(server, username, password string) mqtt.Client {
opts := mqtt.NewClientOptions().AddBroker(server)
time.Now().Unix()
// 默认下行ID iothub会过滤掉改Id
opts.SetClientID(DefaultDownStreamClientId)
if username != "" {
opts.SetUsername(username)
}
if password != "" {
opts.SetPassword(password)
}
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
return client
}
// 订阅
func (imc *IothubMqttClient) Sub(topic string, qos byte, handler mqtt.MessageHandler) {
if token := imc.Client.Subscribe(topic, qos, handler); token.Wait() && token.Error() != nil {
logrus.Infof(token.Error().Error())
}
}
// 取消订阅
func (imc *IothubMqttClient) UnSub(topic string) {
if token := imc.Client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
logrus.Infof(token.Error().Error())
}
}
// 发布
func (imc *IothubMqttClient) Pub(topic string, qos byte, payload string) error {
token := imc.Client.Publish(topic, qos, false, payload)
if token.WaitTimeout(2*time.Second) == false {
return errors.New("推送消息超时")
}
if token.Error() != nil {
return token.Error()
}
return nil
}

View File

@@ -0,0 +1,106 @@
package mqttclient
import (
"encoding/json"
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"math/rand"
"time"
)
const (
RpcRespTopic = `v1/devices/me/rpc/response/%d`
RpcReqTopic = `v1/devices/me/rpc/request/%d`
)
type RpcRequest struct {
Client *IothubMqttClient
RequestId int
Mode string //单向、双向 单项只发送不等待响应 双向需要等到响应
Timeout int // 设置双向时,等待的超时时间
}
type RpcPayload struct {
Method string `json:"method"`
Params any `json:"params"`
}
// RequestCmd 下发指令
func (rpc RpcRequest) RequestCmd(rpcPayload RpcPayload) (respPayload string, err error) {
topic := fmt.Sprintf(RpcReqTopic, rpc.RequestId)
payload, err := json.Marshal(rpcPayload)
if err != nil {
return "", err
}
err = rpc.Client.Pub(topic, 0, string(payload))
if err != nil {
return "", err
}
if rpc.Mode == "single" {
return "", nil
}
// 双向才会执行
repsChan := make(chan string)
respTopic := fmt.Sprintf(RpcRespTopic, rpc.RequestId)
// 订阅回调
mqMessagePubHandler := func(client mqtt.Client, msg mqtt.Message) {
if repsChan != nil {
repsChan <- string(msg.Payload())
}
}
rpc.Client.Sub(respTopic, 0, mqMessagePubHandler)
if rpc.Timeout == 0 {
rpc.Timeout = 30
}
defer func() {
close(repsChan)
rpc.Client.UnSub(respTopic)
}()
for {
select {
case <-time.After(time.Duration(rpc.Timeout) * time.Second):
return "", errors.New("设备指令响应超时")
case resp := <-repsChan:
return resp, nil
}
}
}
// RequestAttributes rpc 下发属性
func (rpc RpcRequest) RequestAttributes(rpcPayload RpcPayload) error {
topic := fmt.Sprintf(RpcReqTopic, rpc.RequestId)
if rpcPayload.Method == "" {
rpcPayload.Method = "setAttributes"
}
payload, err := json.Marshal(rpcPayload)
if err != nil {
return err
}
return rpc.Client.Pub(topic, 0, string(payload))
}
// 响应数据处理
/*var mqMessagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//log.Println(fmt.Sprintf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()))
}*/
// RespondTpc 处理设备端请求服务端方法
func (rpc RpcRequest) RespondTpc(reqPayload RpcPayload) error {
topic := fmt.Sprintf(RpcRespTopic, rpc.RequestId)
//TODO 此处处理设备的请求参数逻辑
//自己定义请求逻辑
if reqPayload.Params == "getCurrentTime" {
unix := time.Now().Unix()
msg := fmt.Sprintf("%d", unix)
return rpc.Client.Pub(topic, 0, msg)
}
// 获取属性 ...
return nil
}
func (rpc *RpcRequest) GetRequestId() {
rand.Seed(time.Now().UnixNano())
// 生成随机整数
rpc.RequestId = rand.Intn(10000) + 1 // 生成0到99之间的随机整数
}

View File

@@ -0,0 +1,39 @@
package tcpclient
import (
"encoding/hex"
"net"
"pandax/pkg/global"
)
var TcpClient = make(map[string]*net.TCPConn)
func Send(deviceId, msg string) error {
if conn, ok := TcpClient[deviceId]; ok {
global.Log.Infof("设备%s, 发送指令%s", deviceId, msg)
_, err := conn.Write([]byte(msg))
if err != nil {
return err
}
} else {
global.Log.Infof("设备%s TCP连接不存在, 发送指令失败", deviceId)
}
return nil
}
func SendHex(deviceId, msg string) error {
if conn, ok := TcpClient[deviceId]; ok {
global.Log.Infof("设备%s, 发送指令%s", deviceId, msg)
b, err := hex.DecodeString(msg)
if err != nil {
return err
}
_, err = conn.Write(b)
if err != nil {
return err
}
} else {
global.Log.Infof("设备%s TCP连接不存在, 发送指令失败", deviceId)
}
return nil
}

View File

@@ -4,11 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"pandax/iothub/client/mqttclient"
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
exhook2 "pandax/iothub/server/emqxserver/protobuf"
"pandax/pkg/global"
"pandax/pkg/mqtt"
"pandax/pkg/rule_engine/message"
"pandax/pkg/tool"
"time"
@@ -33,7 +33,7 @@ func InitEmqxHook(addr string, hs *hook_message_work.HookService) {
global.Log.Infof("IOTHUB HOOK Start SUCCESS,Grpc Server listen: %s", addr)
}
// 初始化 MQTT客户端
global.MqttClient = mqtt.InitMqtt(global.Conf.Mqtt.Broker, global.Conf.Mqtt.Username, global.Conf.Mqtt.Password)
mqttclient.InitMqtt(global.Conf.Mqtt.Broker, global.Conf.Mqtt.Username, global.Conf.Mqtt.Password)
}
func NewHookGrpcService(hs *hook_message_work.HookService) *HookGrpcService {
@@ -83,7 +83,7 @@ func (s *HookGrpcService) OnClientConnack(ctx context.Context, in *exhook2.Clien
func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.ClientConnectedRequest) (*exhook2.EmptySuccess, error) {
global.Log.Info(fmt.Sprintf("Client %s Connected ", in.Clientinfo.GetNode()))
if in.Clientinfo.Clientid == mqtt.DefaultDownStreamClientId {
if in.Clientinfo.Clientid == mqttclient.DefaultDownStreamClientId {
return &exhook2.EmptySuccess{}, nil
}
token := netbase.GetUserName(in.Clientinfo)
@@ -97,7 +97,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli
func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.ClientDisconnectedRequest) (*exhook2.EmptySuccess, error) {
global.Log.Info(fmt.Sprintf("%s断开连接", in.Clientinfo.Username))
token := netbase.GetUserName(in.Clientinfo)
if in.Clientinfo.Clientid == mqtt.DefaultDownStreamClientId {
if in.Clientinfo.Clientid == mqttclient.DefaultDownStreamClientId {
return &exhook2.EmptySuccess{}, nil
}
etoken := &tool.DeviceAuth{}
@@ -178,7 +178,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
res.Type = exhook2.ValuedResponse_STOP_AND_RETURN
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false}
if in.Message.From == mqtt.DefaultDownStreamClientId {
if in.Message.From == mqttclient.DefaultDownStreamClientId {
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true}
return res, nil
}

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/emicklei/go-restful/v3"
"io"
"log"
"net"
"net/http"
"pandax/iothub/hook_message_work"
@@ -41,6 +42,7 @@ func InitHttpHook(addr string, hs *hook_message_work.HookService) {
switch state {
case http.StateHijacked, http.StateClosed:
etoken, _ := activeConnections.Load(conn.RemoteAddr())
log.Println("关闭http连接")
data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken.(*tool.DeviceAuth))
service.HookService.MessageCh <- data
activeConnections.Delete(conn.RemoteAddr())

View File

@@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"net"
"pandax/iothub/client/tcpclient"
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
"pandax/pkg/global"
@@ -34,7 +35,7 @@ func InitTcpHook(addr string, hs *hook_message_work.HookService) {
global.Log.Error("Error accepting connection:", err)
continue
}
conn.SetReadDeadline(time.Now().Add(2 * time.Minute))
//conn.SetReadDeadline(time.Now().Add(2 * time.Minute))
hhs := &HookTcpService{
HookService: hs,
conn: conn,
@@ -59,6 +60,7 @@ func (hhs *HookTcpService) hook() {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken)
hhs.HookService.MessageCh <- data
}
delete(tcpclient.TcpClient, etoken.DeviceId)
isAuth = false
return
}
@@ -68,15 +70,18 @@ func (hhs *HookTcpService) hook() {
auth := netbase.Auth(token)
// 认证成功,创建连接记录
if auth {
global.Log.Infof("TCP协议 设备%s,认证成功", etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", hhs.conn.RemoteAddr().String(), hhs.conn.RemoteAddr().String(), etoken)
hhs.HookService.MessageCh <- data
isAuth = true
tcpclient.TcpClient[etoken.DeviceId] = hhs.conn
hhs.Send("success")
} else {
hhs.Send("fail")
}
} else {
hexData := hex.EncodeToString(buf[:n])
global.Log.Infof("TCP协议 设备%s, 接受消息%s", etoken.DeviceId, hexData)
ts := time.Now().Format("2006-01-02 15:04:05.000")
data := &netbase.DeviceEventInfo{
DeviceId: etoken.DeviceId,