【优化】规则链rpc请求逻辑

This commit is contained in:
PandaX
2023-10-12 20:35:43 +08:00
parent 36ad817074
commit d2e4781938
9 changed files with 394 additions and 1039 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -8,14 +8,15 @@ import (
// 消息类型
const (
ConnectMes = "Connect"
DisConnectMes = "Disconnect"
RpcRequestMes = "RpcRequest"
UpEventMes = "Event"
AlarmMes = "Alarm"
RowMes = "Row"
TelemetryMes = "Telemetry"
AttributesMes = "Attributes"
ConnectMes = "Connect"
DisConnectMes = "Disconnect"
RpcRequestMes = "RpcRequestFromDevice"
RpcRequestServerMes = "RpcRequestFromServer"
UpEventMes = "Event"
AlarmMes = "Alarm"
RowMes = "Row"
TelemetryMes = "Telemetry"
AttributesMes = "Attributes"
)
// 数据类型Originator

View File

@@ -1,14 +1,14 @@
package nodes
import (
"errors"
"pandax/iothub/client/mqttclient"
"pandax/pkg/rule_engine/message"
)
type rpcRequestNode struct {
bareNode
RequestId int `json:"requestId"`
Timeout int `json:"timeout"`
Payload mqttclient.RpcPayload `json:"payload"`
}
type rpcRequestNodeFactory struct{}
@@ -26,20 +26,10 @@ func (f rpcRequestNodeFactory) Create(id string, meta Metadata) (Node, error) {
func (n *rpcRequestNode) Handle(msg *message.Message) error {
successLableNode := n.GetLinkedNode("Success")
failureLableNode := n.GetLinkedNode("Failure")
RequestId := n.RequestId
if RequestId == 0 {
RequestId = int(msg.Metadata.GetValue("requestId").(float64))
}
if msg.Msg.GetValue("method") == nil || msg.Msg.GetValue("params") == nil {
return errors.New("请求响应格式不正确")
}
var datas = mqttclient.RpcPayload{
Method: msg.Msg.GetValue("method").(string),
Params: msg.Msg.GetValue("params"),
}
var rpc = &mqttclient.RpcRequest{Client: mqttclient.MqttClient, Mode: "single"}
var rpc = &mqttclient.RpcRequest{Client: mqttclient.MqttClient, Mode: "single", Timeout: n.Timeout}
rpc.GetRequestId()
err := rpc.RespondTpc(datas)
respPayload, err := rpc.RequestCmd(n.Payload)
if err != nil {
if failureLableNode != nil {
return failureLableNode.Handle(msg)
@@ -47,6 +37,9 @@ func (n *rpcRequestNode) Handle(msg *message.Message) error {
return err
}
}
msgM := msg.Msg
msgM["payload"] = respPayload
msg.Msg = msgM
if successLableNode != nil {
return successLableNode.Handle(msg)
}

View File

@@ -0,0 +1,49 @@
package nodes
import (
"pandax/iothub/client/mqttclient"
"pandax/pkg/rule_engine/message"
)
type rpcRespondNode struct {
bareNode
RequestId int `json:"requestId"`
}
type rpcRespondFactory struct{}
func (f rpcRespondFactory) Name() string { return "RpcRespondNode" }
func (f rpcRespondFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f rpcRespondFactory) Labels() []string { return []string{"Success", "Failure"} }
func (f rpcRespondFactory) Create(id string, meta Metadata) (Node, error) {
node := &rpcRespondNode{
bareNode: newBareNode(f.Name(), id, meta, f.Labels()),
}
return decodePath(meta, node)
}
func (n *rpcRespondNode) Handle(msg *message.Message) error {
successLableNode := n.GetLinkedNode("Success")
failureLableNode := n.GetLinkedNode("Failure")
RequestId := n.RequestId
if RequestId == 0 {
RequestId = int(msg.Metadata.GetValue("requestId").(float64))
}
var datas = mqttclient.RpcPayload{
Method: msg.Msg.GetValue("method").(string),
Params: msg.Msg.GetValue("params"),
}
rpc := &mqttclient.RpcRequest{Client: mqttclient.MqttClient, RequestId: RequestId}
err := rpc.RespondTpc(datas)
if err != nil {
if failureLableNode != nil {
return failureLableNode.Handle(msg)
} else {
return err
}
}
if successLableNode != nil {
return successLableNode.Handle(msg)
}
return nil
}

View File

@@ -18,6 +18,7 @@ func (f messageTypeSwitchNodeFactory) Labels() []string {
message.AttributesMes,
message.TelemetryMes,
message.RpcRequestMes,
message.RpcRequestServerMes,
message.AlarmMes,
message.UpEventMes,
message.ConnectMes,

View File

@@ -21,6 +21,7 @@ func (f switchFilterNodeFactory) Labels() []string {
message.AttributesMes,
message.TelemetryMes,
message.RpcRequestMes,
message.RpcRequestServerMes,
message.AlarmMes,
message.UpEventMes,
message.ConnectMes,

View File

@@ -28,5 +28,6 @@ func init() {
RegisterFactory(externalSendEmailNodeFactory{})
RegisterFactory(externalSendSmsNodeFactory{})
RegisterFactory(externalRuleChainNodeFactory{})
RegisterFactory(rpcRespondFactory{})
RegisterFactory(rpcRequestNodeFactory{})
}

View File

@@ -1 +1 @@
taskkill /pid 32324 -t -f
taskkill /pid 24048 -t -f

Binary file not shown.

After

Width:  |  Height:  |  Size: 170 KiB