Files
PandaX/iothub/hook_message_work.go
2023-09-01 16:47:22 +08:00

134 lines
3.6 KiB
Go

package iothub
import (
"context"
"encoding/json"
"fmt"
"pandax/apps/device/services"
ruleEntity "pandax/apps/rule/entity"
"pandax/pkg/global"
"pandax/pkg/rule_engine"
"pandax/pkg/rule_engine/message"
"pandax/pkg/tool"
"pandax/pkg/websocket"
)
// MessageWork is the message-processing loop: it receives device events from
// s.messageCh and passes each one to handleOne.
//
// The loop ends when s.messageCh is closed and drained. A receive on a closed
// channel yields the zero value, so ranging over the channel avoids feeding
// nil events to handleOne in that situation.
func (s *HookService) MessageWork() {
	for msg := range s.messageCh {
		s.handleOne(msg) // process the event
	}
}
// handleOne processes a single device event on its own goroutine.
//
// When s.ch is non-nil it acts as a concurrency limiter: a slot is taken
// before the goroutine is started and handed back when the goroutine ends.
// s.wg is incremented for the lifetime of the goroutine.
//
// The device's auth record is loaded through global.RedisDb using
// msg.DeviceId; if that fails the event is logged and dropped. Otherwise the
// event is handled according to msg.Type:
//   - RowMes, AttributesMes, TelemetryMes: telemetry is additionally forwarded
//     through SendZtWebsocket, then the product's rule chain is run.
//   - RpcRequestMes: the product's rule chain is run.
//   - ConnectMes, DisConnectMes: the device status is updated and the event
//     payload is stored through global.TdDb.InsertEvent.
//
// A nil msg is ignored.
func (s *HookService) handleOne(msg *DeviceEventInfo) {
	if msg == nil {
		return
	}
	sem := s.ch
	if sem != nil { // concurrency limit: blocks while all slots are in use
		sem <- struct{}{}
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		if sem != nil {
			// Hand the slot back, otherwise the limiter fills up and
			// every later event blocks forever on the send above.
			defer func() { <-sem }()
		}

		etoken := &tool.DeviceAuth{}
		if err := global.RedisDb.Get(msg.DeviceId, etoken); err != nil {
			global.Log.Error("设备认证信息获取失败", msg.DeviceId, err)
			return
		}

		switch msg.Type {
		case message.RowMes, message.AttributesMes, message.TelemetryMes:
			// Forward telemetry over websocket to the cloud configuration views.
			if msg.Type == message.TelemetryMes {
				go SendZtWebsocket(msg.DeviceId, msg.Datas)
			}
			// Run the business logic defined by the rule chain.
			runDeviceRuleChain(etoken, msg)
		case message.RpcRequestMes:
			// RPC request
			runDeviceRuleChain(etoken, msg)
		case message.DisConnectMes, message.ConnectMes:
			// Update the device's online status.
			if msg.Type == message.ConnectMes {
				services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.ONLINE)
			} else {
				services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.OFFLINE)
			}
			// Record the connection event in the device history.
			data := make(map[string]any)
			if err := json.Unmarshal([]byte(msg.Datas), &data); err != nil {
				global.Log.Error("设备连接数据格式解析错误", err)
				return
			}
			if err := global.TdDb.InsertEvent(data); err != nil {
				global.Log.Error("连接事件数据添加错误", err)
			}
		}
	}()
}

// runDeviceRuleChain fetches the rule chain for the device's product, builds a
// rule chain instance from its data code and starts it with a message built
// from the event. Each failure is logged and stops processing of this event.
func runDeviceRuleChain(etoken *tool.DeviceAuth, msg *DeviceEventInfo) {
	// getRuleChain returns nil when the lookup fails; guard before dereferencing.
	chain := getRuleChain(etoken)
	if chain == nil {
		global.Log.Error("规则链获取失败", etoken.ProductId)
		return
	}
	code, err := json.Marshal(chain.LfData.DataCode)
	if err != nil {
		global.Log.Error("规则链数据序列化失败", err)
		return
	}
	// Create the rule chain instance.
	instance, errs := rule_engine.NewRuleChainInstance(code)
	if len(errs) > 0 {
		global.Log.Error("规则链初始化失败", errs[0])
		return
	}
	ruleMessage := buildRuleMessage(etoken, msg, msg.Type)
	if err := instance.StartRuleChain(context.Background(), ruleMessage); err != nil {
		global.Log.Error("规则链执行失败", err)
	}
}
// getRuleChain reads the rule chain data through global.RedisDb using
// etoken.ProductId as the key. It returns nil when the lookup reports an
// error, so callers must check the result before using it.
func getRuleChain(etoken *tool.DeviceAuth) *ruleEntity.RuleDataJson {
	chain := new(ruleEntity.RuleDataJson)
	if err := global.RedisDb.Get(etoken.ProductId, chain); err != nil {
		return nil
	}
	return chain
}
// buildRuleMessage assembles the rule engine message for a device event.
//
// The message values are decoded from the JSON text in dei.Datas; the metadata
// carries the device id, name, type and product id taken from etoken. The
// message is created for etoken.User with the given msgType.
func buildRuleMessage(etoken *tool.DeviceAuth, dei *DeviceEventInfo, msgType string) *message.Message {
	payload := map[string]any{}
	// The decode error is discarded, so the message is built even when the
	// body cannot be parsed.
	// NOTE(review): confirm that non-JSON bodies are expected here.
	_ = json.Unmarshal([]byte(dei.Datas), &payload)

	metadata := map[string]any{
		"deviceId":   etoken.DeviceId,
		"deviceName": etoken.Name,
		"deviceType": etoken.DeviceType,
		"productId":  etoken.ProductId,
	}
	return message.NewMessage(etoken.User, msgType, payload, metadata)
}
func SendZtWebsocket(deviceId, message string) {
msgVals := make(map[string]interface{})
if err := json.Unmarshal([]byte(message), &msgVals); err != nil {
return
}
twinData := map[string]interface{}{
"twinId": deviceId,
"attrs": msgVals,
}
data, _ := json.Marshal(twinData)
for stageid, _ := range websocket.Wsp {
CJNR := fmt.Sprintf(`{"MESSAGETYPE":"01","MESSAGECONTENT": %s}`, string(data))
websocket.SendMessage(CJNR, stageid)
}
}