diff --git a/pkg/rule_engine/nodes/action_rpc_request_node.go b/pkg/rule_engine/nodes/action_rpc_request_node.go index 2a30264..a1f2875 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_node.go @@ -1,14 +1,14 @@ package nodes import ( + "errors" "pandax/iothub/client/mqttclient" "pandax/pkg/rule_engine/message" ) type rpcRequestNode struct { bareNode - Timeout int `json:"timeout"` - Payload mqttclient.RpcPayload `json:"payload"` + RequestId int `json:"requestId"` } type rpcRequestNodeFactory struct{} @@ -26,10 +26,20 @@ func (f rpcRequestNodeFactory) Create(id string, meta Metadata) (Node, error) { func (n *rpcRequestNode) Handle(msg *message.Message) error { successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") - - var rpc = &mqttclient.RpcRequest{Client: mqttclient.MqttClient, Mode: "double", Timeout: n.Timeout} + RequestId := n.RequestId + if RequestId == 0 { + RequestId = int(msg.Metadata.GetValue("requestId").(float64)) + } + if msg.Msg.GetValue("method") == nil || msg.Msg.GetValue("params") == nil { + return errors.New("请求响应格式不正确") + } + var datas = mqttclient.RpcPayload{ + Method: msg.Msg.GetValue("method").(string), + Params: msg.Msg.GetValue("params"), + } + var rpc = &mqttclient.RpcRequest{Client: mqttclient.MqttClient, Mode: "single"} rpc.GetRequestId() - err := rpc.RespondTpc(n.Payload) + err := rpc.RespondTpc(datas) if err != nil { if failureLableNode != nil { return failureLableNode.Handle(msg)