Files
PandaX/pkg/mqtt/rpc.go
2023-08-22 15:17:14 +08:00

89 lines
2.1 KiB
Go

package mqtt
import (
"encoding/json"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"math/rand"
"time"
)
// MQTT topic templates for device RPC traffic. Each template contains a single
// %d verb that the methods in this file fill with RpcRequest.RequestId via
// fmt.Sprintf.
const (
	// RpcRespTopic is the template of the topic that carries RPC responses.
	RpcRespTopic = "v1/devices/me/rpc/response/%d"
	// RpcReqTopic is the template of the topic that carries RPC requests.
	RpcReqTopic = "v1/devices/me/rpc/request/%d"
)
// RpcRequest bundles the MQTT client and the per-request settings used by the
// RPC helper methods declared in this file.
type RpcRequest struct {
// Client is the MQTT client used to publish, subscribe and unsubscribe.
Client *IothubMqttClient
// RequestId is formatted into the RpcReqTopic / RpcRespTopic templates.
RequestId int
Mode string // One-way or two-way: one-way only sends and does not wait for a response, two-way waits for the response. RequestCmd treats the value "single" as one-way.
Timeout int // Wait timeout, in seconds, used in two-way mode; RequestCmd substitutes 30 when it is 0.
}
// RpcPayload is the message body that the RPC helpers marshal to JSON before
// publishing.
type RpcPayload struct {
// Method is encoded under the JSON key "method"; RequestAttributes fills in
// "setAttributes" when it is empty.
Method string `json:"method"`
// Params is encoded under the JSON key "params" as a plain string.
Params string `json:"params"`
}
// RequestCmd publishes rpcPayload, encoded as JSON, on the request topic
// derived from rpc.RequestId.
//
// When rpc.Mode is "single" the call is fire-and-forget: the payload is
// published and nothing else happens. For any other mode the response topic is
// subscribed with mqMessagePubHandler and automatically unsubscribed again
// after rpc.Timeout seconds (30 when Timeout is zero or negative).
//
// It returns the JSON encoding error or the publish error, if any. The method
// does not block waiting for the response.
func (rpc RpcRequest) RequestCmd(rpcPayload RpcPayload) error {
	payload, err := json.Marshal(rpcPayload)
	if err != nil {
		return err
	}
	reqTopic := fmt.Sprintf(RpcReqTopic, rpc.RequestId)

	if rpc.Mode == "single" {
		return rpc.Client.Pub(reqTopic, 0, string(payload))
	}

	// Subscribe before publishing: a response that arrives between the publish
	// and a later subscribe would otherwise never reach the handler.
	respTopic := fmt.Sprintf(RpcRespTopic, rpc.RequestId)
	rpc.Client.Sub(respTopic, 0, mqMessagePubHandler)

	if err := rpc.Client.Pub(reqTopic, 0, string(payload)); err != nil {
		// No request went out, so no response will come; drop the subscription
		// instead of leaving it behind.
		rpc.Client.UnSub(respTopic)
		return err
	}

	timeout := rpc.Timeout
	if timeout <= 0 {
		// A non-positive timeout would unsubscribe immediately; use the default.
		timeout = 30
	}
	client := rpc.Client
	// AfterFunc runs the clean-up on its own goroutine once the timer fires,
	// which replaces a goroutine parked on a single-case select.
	time.AfterFunc(time.Duration(timeout)*time.Second, func() {
		client.UnSub(respTopic)
	})
	return nil
}
// RequestAttributes publishes rpcPayload, encoded as JSON, on the request
// topic derived from rpc.RequestId. An empty Method is replaced with
// "setAttributes" before encoding. The JSON encoding error or the publish
// error is returned; no response is awaited.
func (rpc RpcRequest) RequestAttributes(rpcPayload RpcPayload) error {
	if len(rpcPayload.Method) == 0 {
		rpcPayload.Method = "setAttributes"
	}

	body, marshalErr := json.Marshal(rpcPayload)
	if marshalErr != nil {
		return marshalErr
	}

	reqTopic := fmt.Sprintf(RpcReqTopic, rpc.RequestId)
	return rpc.Client.Pub(reqTopic, 0, string(body))
}
// mqMessagePubHandler handles response data: it is the callback that
// RequestCmd registers on the RPC response topic. It is currently a no-op;
// the logging of the received payload and topic below is disabled.
var mqMessagePubHandler = mqtt.MessageHandler(func(client mqtt.Client, msg mqtt.Message) {
	// log.Println(fmt.Sprintf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()))
})
// RespondTpc handles a device-side request for a server-side method.
//
// When reqPayload.Params equals "getCurrentTime" the current Unix time in
// seconds is published, as a decimal string, on the response topic derived
// from rpc.RequestId, and the publish error is returned. Every other request
// is ignored and nil is returned.
//
// NOTE(review): the method name is matched against Params, not Method —
// confirm against the callers that this is intended.
func (rpc RpcRequest) RespondTpc(reqPayload RpcPayload) error {
	if reqPayload.Params != "getCurrentTime" {
		return nil
	}

	respTopic := fmt.Sprintf(RpcRespTopic, rpc.RequestId)
	nowSeconds := time.Now().Unix()
	return rpc.Client.Pub(respTopic, 0, fmt.Sprintf("%d", nowSeconds))
}
// init seeds the shared math/rand generator once at start-up. Seeding here,
// rather than on every GetRequestId call, keeps request IDs different between
// process runs without repeatedly resetting the generator that the rest of the
// process shares, and without handing out identical IDs to calls that fall in
// the same clock tick.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// GetRequestId stores a pseudo-random request ID in the range [1, 10000] in
// rpc.RequestId. Despite its name it returns nothing; read the ID from the
// field afterwards. IDs are not guaranteed to be unique.
func (rpc *RpcRequest) GetRequestId() {
	// Intn(10000) yields 0..9999; the +1 shifts that to 1..10000 so the ID is
	// never zero.
	rpc.RequestId = rand.Intn(10000) + 1
}