mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 02:48:34 +08:00
180 lines
5.0 KiB
Go
180 lines
5.0 KiB
Go
package hook_message_work
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"pandax/apps/device/services"
|
|
ruleEntity "pandax/apps/rule/entity"
|
|
ruleService "pandax/apps/rule/services"
|
|
"pandax/iothub/netbase"
|
|
"pandax/pkg/global"
|
|
"pandax/pkg/global/model"
|
|
"pandax/pkg/rule_engine"
|
|
"pandax/pkg/rule_engine/message"
|
|
"pandax/pkg/shadow"
|
|
"pandax/pkg/tdengine"
|
|
"pandax/pkg/tool"
|
|
"pandax/pkg/websocket"
|
|
"time"
|
|
|
|
"github.com/kakuilan/kgo"
|
|
)
|
|
|
|
// 消息处理模块
|
|
func (s *HookService) MessageWork() {
|
|
for msg := range s.MessageCh {
|
|
s.handleOne(msg) // 处理消息
|
|
}
|
|
}
|
|
|
|
func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
|
|
if s.Ch != nil { // 用于并发限制
|
|
s.Ch <- struct{}{}
|
|
}
|
|
// 用于做优雅关闭, 主要作用是,程序关闭,将队列中的消息处理完成后在关闭,
|
|
s.Wg.Add(1)
|
|
go func() {
|
|
defer func() {
|
|
s.Wg.Done()
|
|
if s.Ch != nil { // 用于并发限制
|
|
<-s.Ch
|
|
}
|
|
}()
|
|
switch msg.Type {
|
|
case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestFromDevice, message.UpEventMes:
|
|
msgVals := make(map[string]interface{})
|
|
err := json.Unmarshal([]byte(msg.Datas), &msgVals)
|
|
if err != nil {
|
|
global.Log.Error("数据结构解析错误", err)
|
|
return
|
|
}
|
|
// 发送websocket到云组态
|
|
if msg.Type == message.TelemetryMes {
|
|
go SendZtWebsocket(msg.DeviceId, msg.Datas)
|
|
}
|
|
// 获取规则链代码实体
|
|
instance, err := getRuleChainInstance(msg.DeviceAuth)
|
|
if err != nil {
|
|
global.Log.Error("获取设备实体失败", err)
|
|
return
|
|
}
|
|
ruleMessage := buildRuleMessage(msg.DeviceAuth, msgVals, msg.Type)
|
|
err = rule_engine.RuleEngine.StartRuleInstance(instance, ruleMessage)
|
|
if err != nil {
|
|
global.Log.Error("规则链执行失败", err)
|
|
return
|
|
}
|
|
// 保存事件信息
|
|
if msg.Type == message.UpEventMes {
|
|
tsl, err := services.ProductTemplateModelDao.FindOneByKey(msg.DeviceId, msg.Identifier)
|
|
if err != nil {
|
|
return
|
|
}
|
|
ci := &tdengine.Events{
|
|
DeviceId: msg.DeviceId,
|
|
Name: msg.Identifier,
|
|
Type: tsl.Type,
|
|
Content: msg.Datas,
|
|
Ts: time.Now().Format("2006-01-02 15:04:05.000"),
|
|
}
|
|
data, err := kgo.KConv.Struct2Map(ci, "")
|
|
if err != nil {
|
|
global.Log.Error("事件格式转化错误")
|
|
return
|
|
}
|
|
err = global.TdDb.InsertEvent(data)
|
|
if err != nil {
|
|
global.Log.Error("事件添加错误", err)
|
|
}
|
|
}
|
|
// 刷新设备状态
|
|
shadow.DeviceShadowInstance.RefreshDeviceStatus(msg.DeviceAuth.Name)
|
|
case message.DisConnectMes, message.ConnectMes:
|
|
// 更改设备在线状态
|
|
isHas := shadow.DeviceShadowInstance.HasDevice(msg.DeviceAuth.Name)
|
|
if !isHas {
|
|
shadow.InitDeviceShadow(msg.DeviceAuth.Name)
|
|
}
|
|
if msg.Type == message.ConnectMes {
|
|
shadow.DeviceShadowInstance.SetOnline(msg.DeviceAuth.Name)
|
|
|
|
} else {
|
|
shadow.DeviceShadowInstance.SetOnline(msg.DeviceAuth.Name)
|
|
}
|
|
// 添加设备连接历史
|
|
data := make(map[string]any)
|
|
err := json.Unmarshal([]byte(msg.Datas), &data)
|
|
if err != nil {
|
|
global.Log.Error("设备连接数据格式解析错误")
|
|
return
|
|
}
|
|
err = global.TdDb.InsertEvent(data)
|
|
if err != nil {
|
|
global.Log.Error("连接事件数据添加错误", err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// 获取规则实体
|
|
func getRuleChainInstance(etoken *model.DeviceAuth) (*rule_engine.RuleChainInstance, error) {
|
|
key := etoken.ProductId
|
|
instance := rule_engine.RuleEngine.GetRuleInstance(key)
|
|
if instance == nil {
|
|
one, err := services.ProductModelDao.FindOne(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rule, err := ruleService.RuleChainModelDao.FindOne(one.RuleChainId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var lfData ruleEntity.RuleDataJson
|
|
err = tool.StringToStruct(rule.RuleDataJson, &lfData)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
code, _ := json.Marshal(lfData.DataCode)
|
|
//新建规则链实体
|
|
instance, err = rule_engine.NewRuleChainInstance(rule.Id, code)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = rule_engine.RuleEngine.SaveRuleInstance(key, instance)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return instance, nil
|
|
}
|
|
|
|
// 构建规则链执行的消息
|
|
func buildRuleMessage(etoken *model.DeviceAuth, msgVals map[string]interface{}, msgType string) *message.Message {
|
|
metadataVals := map[string]interface{}{
|
|
"deviceId": etoken.DeviceId,
|
|
"deviceName": etoken.Name,
|
|
"deviceType": etoken.DeviceType,
|
|
"deviceProtocol": etoken.Protocol,
|
|
"productId": etoken.ProductId,
|
|
"orgId": etoken.OrgId,
|
|
"owner": etoken.Owner,
|
|
}
|
|
return message.NewMessage(etoken.Owner, msgType, msgVals, metadataVals)
|
|
}
|
|
|
|
func SendZtWebsocket(deviceId, message string) {
|
|
msgVals := make(map[string]interface{})
|
|
if err := json.Unmarshal([]byte(message), &msgVals); err != nil {
|
|
return
|
|
}
|
|
twinData := map[string]interface{}{
|
|
"twinId": deviceId,
|
|
"attrs": msgVals,
|
|
}
|
|
data, _ := json.Marshal(twinData)
|
|
for stageid := range websocket.Wsp {
|
|
CJNR := fmt.Sprintf(`{"MESSAGETYPE":"01","MESSAGECONTENT": %s}`, string(data))
|
|
websocket.SendMessage(CJNR, stageid)
|
|
}
|
|
}
|