diff --git a/iothub/hook_message_work.go b/iothub/hook_message_work.go index dc5c8b8..d80e80c 100644 --- a/iothub/hook_message_work.go +++ b/iothub/hook_message_work.go @@ -7,12 +7,10 @@ import ( "pandax/apps/device/services" ruleEntity "pandax/apps/rule/entity" "pandax/pkg/global" - "pandax/pkg/mqtt" "pandax/pkg/rule_engine" "pandax/pkg/rule_engine/message" "pandax/pkg/tool" "pandax/pkg/websocket" - "strconv" ) // 消息处理模块 @@ -40,7 +38,9 @@ func (s *HookService) handleOne(msg *DeviceEventInfo) { switch msg.Type { case message.RowMes, message.AttributesMes, message.TelemetryMes: // 发送websocket到云组态 - go SendZtWebsocket(msg.DeviceId, msg.Datas) + if msg.Type == message.TelemetryMes { + go SendZtWebsocket(msg.DeviceId, msg.Datas) + } // 业务逻辑执行 // 获取规则链代码 chain := getRuleChain(etoken) @@ -59,24 +59,20 @@ func (s *HookService) handleOne(msg *DeviceEventInfo) { } // Rpc请求 case message.RpcRequestMes: - var datas = mqtt.RpcPayload{} - err := json.Unmarshal([]byte(msg.Datas), &datas) - if err != nil { - global.Log.Error("RPC请求数据解析错误,请检查数据格式") + chain := getRuleChain(etoken) + dataCode := chain.LfData.DataCode + code, err := json.Marshal(dataCode) + //新建规则链实体 + instance, errs := rule_engine.NewRuleChainInstance(code) + if len(errs) > 0 { + global.Log.Error("规则链初始化失败", errs[0]) return } - RequestId, err := strconv.Atoi(msg.RequestId) + ruleMessage := buildRuleMessage(etoken, msg, msg.Type) + err = instance.StartRuleChain(context.Background(), ruleMessage) if err != nil { - global.Log.Error("RPC请求请求iD非整型") - return + global.Log.Error("规则链执行失败", errs) } - var rpc = &mqtt.RpcRequest{Client: global.MqttClient, RequestId: RequestId} - err = rpc.RespondTpc(datas) - if err != nil { - global.Log.Error("处理响应失败") - return - } - //services.DeviceCmdLogModelDao.UpdateResp(id, content, state) case message.DisConnectMes, message.ConnectMes: // 更改设备在线状态 if msg.Type == message.ConnectMes {