mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 02:48:34 +08:00
[优化] 规则实体影响的处理时间
This commit is contained in:
@@ -54,23 +54,15 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
|
||||
if msg.Type == message.TelemetryMes {
|
||||
go SendZtWebsocket(msg.DeviceId, msg.Datas)
|
||||
}
|
||||
// 获取规则链代码
|
||||
chain := getRuleChain(msg.DeviceAuth)
|
||||
if chain == nil {
|
||||
return
|
||||
}
|
||||
dataCode := chain.LfData.DataCode
|
||||
code, err := json.Marshal(dataCode)
|
||||
//新建规则链实体
|
||||
instance, errs := rule_engine.NewRuleChainInstance(chain.Id, code)
|
||||
if len(errs) > 0 {
|
||||
global.Log.Error("规则链初始化失败", errs[0])
|
||||
// 获取规则链代码实体
|
||||
instance := getRuleChainInstance(msg.DeviceAuth)
|
||||
if instance == nil {
|
||||
return
|
||||
}
|
||||
ruleMessage := buildRuleMessage(msg.DeviceAuth, msgVals, msg.Type)
|
||||
err = instance.StartRuleChain(context.Background(), ruleMessage)
|
||||
if err != nil {
|
||||
global.Log.Error("规则链执行失败", errs)
|
||||
global.Log.Error("规则链执行失败", err)
|
||||
}
|
||||
// 保存设备影子
|
||||
if msg.Type != message.RpcRequestFromDevice {
|
||||
@@ -105,15 +97,16 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
|
||||
}()
|
||||
}
|
||||
|
||||
// 根据产品Id从缓存中获取规则链,没有就查询
|
||||
func getRuleChain(etoken *global_model.DeviceAuth) *ruleEntity.RuleDataJson {
|
||||
// 获取规则实体
|
||||
func getRuleChainInstance(etoken *global_model.DeviceAuth) *rule_engine.RuleChainInstance {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
global.Log.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
key := etoken.ProductId
|
||||
get, err := cache.ComputeIfAbsentProductRule(key, func(k any) (any, error) {
|
||||
instance, err := cache.ComputeIfAbsentProductRule(key, func(k any) (any, error) {
|
||||
one := services.ProductModelDao.FindOne(k.(string))
|
||||
rule := ruleService.RuleChainModelDao.FindOne(one.RuleChainId)
|
||||
var lfData ruleEntity.RuleDataJson
|
||||
@@ -121,14 +114,19 @@ func getRuleChain(etoken *global_model.DeviceAuth) *ruleEntity.RuleDataJson {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lfData.Id = rule.Id
|
||||
return lfData, nil
|
||||
code, err := json.Marshal(lfData.LfData.DataCode)
|
||||
//新建规则链实体
|
||||
instance, errs := rule_engine.NewRuleChainInstance(rule.Id, code)
|
||||
if len(errs) > 0 {
|
||||
global.Log.Error("规则链初始化失败", errs[0])
|
||||
return nil, errs[0]
|
||||
}
|
||||
return instance, nil
|
||||
})
|
||||
biz.ErrIsNil(err, "缓存读取规则链失败")
|
||||
if ruleData, ok := get.(ruleEntity.RuleDataJson); ok {
|
||||
return &ruleData
|
||||
if ruleData, ok := instance.(*rule_engine.RuleChainInstance); ok {
|
||||
return ruleData
|
||||
}
|
||||
biz.ErrIsNil(err, "规则链数据转化失败")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package initialize
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"pandax/apps/device/entity"
|
||||
"pandax/apps/device/services"
|
||||
ruleEntity "pandax/apps/rule/entity"
|
||||
"pandax/pkg/cache"
|
||||
"pandax/pkg/events"
|
||||
"pandax/pkg/global"
|
||||
"pandax/pkg/rule_engine"
|
||||
"pandax/pkg/tool"
|
||||
)
|
||||
|
||||
@@ -20,10 +22,24 @@ func InitEvents() {
|
||||
})
|
||||
if list != nil {
|
||||
var lfData ruleEntity.RuleDataJson
|
||||
tool.StringToStruct(codeData, &lfData)
|
||||
lfData.Id = ruleId
|
||||
err := tool.StringToStruct(codeData, &lfData)
|
||||
if err != nil {
|
||||
global.Log.Error("规则链序列化失败", err)
|
||||
return
|
||||
}
|
||||
code, err := json.Marshal(lfData.LfData.DataCode)
|
||||
if err != nil {
|
||||
global.Log.Error("规则链序列化失败", err)
|
||||
return
|
||||
}
|
||||
//新建规则链实体
|
||||
instance, errs := rule_engine.NewRuleChainInstance(ruleId, code)
|
||||
if len(errs) > 0 {
|
||||
global.Log.Error("规则链初始化失败", errs[0])
|
||||
return
|
||||
}
|
||||
for _, product := range *list {
|
||||
cache.PutProductRule(product.Id, lfData)
|
||||
cache.PutProductRule(product.Id, instance)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
@@ -11,13 +11,13 @@ import (
|
||||
|
||||
var ruleChainDebugData = message.NewRuleChainDebugData(100)
|
||||
|
||||
type ruleChainInstance struct {
|
||||
type RuleChainInstance struct {
|
||||
ruleId string
|
||||
firstRuleNodeId string
|
||||
nodes map[string]nodes.Node
|
||||
}
|
||||
|
||||
func NewRuleChainInstance(ruleId string, data []byte) (*ruleChainInstance, []error) {
|
||||
func NewRuleChainInstance(ruleId string, data []byte) (*RuleChainInstance, []error) {
|
||||
errors := make([]error, 0)
|
||||
|
||||
manifest, err := manifest.New(data)
|
||||
@@ -35,14 +35,14 @@ func NewRuleChainInstance(ruleId string, data []byte) (*ruleChainInstance, []err
|
||||
}
|
||||
|
||||
// newWithManifest create rule chain by user's manifest file
|
||||
func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) {
|
||||
func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, []error) {
|
||||
errs := make([]error, 0)
|
||||
nodes, err := nodes.GetNodes(m)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
return nil, errs
|
||||
}
|
||||
r := &ruleChainInstance{
|
||||
r := &RuleChainInstance{
|
||||
firstRuleNodeId: m.FirstRuleNodeId,
|
||||
nodes: nodes,
|
||||
}
|
||||
@@ -50,7 +50,7 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error)
|
||||
}
|
||||
|
||||
// StartRuleChain TODO 是否需要添加context
|
||||
func (c *ruleChainInstance) StartRuleChain(context context.Context, message *message.Message) error {
|
||||
func (c *RuleChainInstance) StartRuleChain(context context.Context, message *message.Message) error {
|
||||
// 处理debug的通道消息
|
||||
go func() {
|
||||
for {
|
||||
|
||||
Reference in New Issue
Block a user