[优化]mqtt客户端采用emqx接口,根据设备订阅的me topic进行返回

This commit is contained in:
PandaX
2023-10-25 10:14:08 +08:00
parent 51b95d4b3f
commit 2bf058afb0
8 changed files with 902 additions and 708 deletions

View File

@@ -52,8 +52,9 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error {
if msg.Metadata.GetValue("deviceProtocol") != nil && msg.Metadata.GetValue("deviceProtocol").(string) != "" {
deviceProtocol = msg.Metadata.GetValue("deviceProtocol").(string)
}
deviceId := msg.Metadata.GetValue("deviceId").(string)
if deviceProtocol == global.MQTTProtocol {
rpc := &mqttclient.RpcRequest{Client: mqttclient.MqttClient}
rpc := &mqttclient.RpcRequest{}
RequestId := n.RequestId
if RequestId == 0 {
if msg.Metadata.GetValue("requestId") == nil {
@@ -64,10 +65,9 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error {
} else {
rpc.RequestId = RequestId
}
err = rpc.Pub(result)
err = rpc.Pub(deviceId, result)
}
if deviceProtocol == global.TCPProtocol {
deviceId := msg.Metadata.GetValue("deviceId").(string)
err = tcpclient.Send(deviceId, result)
}
if err != nil {

View File

@@ -50,13 +50,13 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error {
deviceProtocol = msg.Metadata.GetValue("deviceProtocol").(string)
}
var err error
deviceId := msg.Metadata.GetValue("deviceId").(string)
if deviceProtocol == global.MQTTProtocol {
var rpc = &mqttclient.RpcRequest{Client: mqttclient.MqttClient, Mode: mode, Timeout: n.Timeout}
var rpc = &mqttclient.RpcRequest{Mode: mode, Timeout: n.Timeout}
rpc.GetRequestId()
_, err = rpc.RequestCmd(string(payload))
err = rpc.RequestCmd(deviceId, string(payload))
}
if deviceProtocol == global.TCPProtocol {
deviceId := msg.Metadata.GetValue("deviceId").(string)
err = tcpclient.Send(deviceId, string(payload))
}
if err != nil {