规则链添加RPC请求,及响应处理

This commit is contained in:
XM-GO
2023-08-25 09:56:26 +08:00
parent 453deea9da
commit a7ff948cfe
7 changed files with 816 additions and 17 deletions

View File

@@ -2,6 +2,7 @@ package mqtt
import (
"encoding/json"
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"math/rand"
@@ -22,34 +23,47 @@ type RpcRequest struct {
type RpcPayload struct {
Method string `json:"method"`
Params string `json:"params"`
Params any `json:"params"`
}
func (rpc RpcRequest) RequestCmd(rpcPayload RpcPayload) error {
func (rpc RpcRequest) RequestCmd(rpcPayload RpcPayload) (respPayload string, err error) {
topic := fmt.Sprintf(RpcReqTopic, rpc.RequestId)
payload, err := json.Marshal(rpcPayload)
if err != nil {
return err
return "", err
}
err = rpc.Client.Pub(topic, 0, string(payload))
if err != nil {
return err
return "", err
}
if rpc.Mode == "single" {
return nil
return "", nil
}
// 双向才会执行
repsChan := make(chan string)
respTopic := fmt.Sprintf(RpcRespTopic, rpc.RequestId)
// 订阅回调
mqMessagePubHandler := func(client mqtt.Client, msg mqtt.Message) {
if repsChan != nil {
repsChan <- string(msg.Payload())
}
}
rpc.Client.Sub(respTopic, 0, mqMessagePubHandler)
if rpc.Timeout == 0 {
rpc.Timeout = 30
}
go func() {
defer func() {
close(repsChan)
rpc.Client.UnSub(respTopic)
}()
for {
select {
case <-time.After(time.Duration(rpc.Timeout) * time.Second):
rpc.Client.UnSub(respTopic)
return "", errors.New("设备指令响应超时")
case resp := <-repsChan:
return resp, nil
}
}()
return nil
}
}
func (rpc RpcRequest) RequestAttributes(rpcPayload RpcPayload) error {
@@ -65,19 +79,20 @@ func (rpc RpcRequest) RequestAttributes(rpcPayload RpcPayload) error {
}
// 响应数据处理
var mqMessagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
/*var mqMessagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//log.Println(fmt.Sprintf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()))
}
}*/
// RespondTpc 处理设备端请求服务端方法
func (rpc RpcRequest) RespondTpc(reqPayload RpcPayload) error {
topic := fmt.Sprintf(RpcRespTopic, rpc.RequestId)
// 此处处理设备的请求参数逻辑
if reqPayload.Params == "getCurrentTime" {
unix := time.Now().Unix()
msg := fmt.Sprintf("%d", unix)
return rpc.Client.Pub(topic, 0, msg)
}
// 获取属性 ...
return nil
}