diff --git a/iothub/hook_message_work/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go index 45ada5a..0fb0790 100644 --- a/iothub/hook_message_work/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -54,23 +54,15 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { if msg.Type == message.TelemetryMes { go SendZtWebsocket(msg.DeviceId, msg.Datas) } - // 获取规则链代码 - chain := getRuleChain(msg.DeviceAuth) - if chain == nil { - return - } - dataCode := chain.LfData.DataCode - code, err := json.Marshal(dataCode) - //新建规则链实体 - instance, errs := rule_engine.NewRuleChainInstance(chain.Id, code) - if len(errs) > 0 { - global.Log.Error("规则链初始化失败", errs[0]) + // 获取规则链代码实体 + instance := getRuleChainInstance(msg.DeviceAuth) + if instance == nil { return } ruleMessage := buildRuleMessage(msg.DeviceAuth, msgVals, msg.Type) err = instance.StartRuleChain(context.Background(), ruleMessage) if err != nil { - global.Log.Error("规则链执行失败", errs) + global.Log.Error("规则链执行失败", err) } // 保存设备影子 if msg.Type != message.RpcRequestFromDevice { @@ -105,15 +97,16 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { }() } -// 根据产品Id从缓存中获取规则链,没有就查询 -func getRuleChain(etoken *global_model.DeviceAuth) *ruleEntity.RuleDataJson { +// 获取规则实体 +func getRuleChainInstance(etoken *global_model.DeviceAuth) *rule_engine.RuleChainInstance { defer func() { if err := recover(); err != nil { global.Log.Error(err) } }() + key := etoken.ProductId - get, err := cache.ComputeIfAbsentProductRule(key, func(k any) (any, error) { + instance, err := cache.ComputeIfAbsentProductRule(key, func(k any) (any, error) { one := services.ProductModelDao.FindOne(k.(string)) rule := ruleService.RuleChainModelDao.FindOne(one.RuleChainId) var lfData ruleEntity.RuleDataJson @@ -121,14 +114,19 @@ func getRuleChain(etoken *global_model.DeviceAuth) *ruleEntity.RuleDataJson { if err != nil { return nil, err } - lfData.Id = rule.Id - return lfData, nil + code, err := json.Marshal(lfData.LfData.DataCode) + //新建规则链实体 + instance, errs := rule_engine.NewRuleChainInstance(rule.Id, code) + if len(errs) > 0 { + global.Log.Error("规则链初始化失败", errs[0]) + return nil, errs[0] + } + return instance, nil }) biz.ErrIsNil(err, "缓存读取规则链失败") - if ruleData, ok := get.(ruleEntity.RuleDataJson); ok { - return &ruleData + if ruleData, ok := instance.(*rule_engine.RuleChainInstance); ok { + return ruleData } - biz.ErrIsNil(err, "规则链数据转化失败") return nil } diff --git a/pkg/initialize/event.go b/pkg/initialize/event.go index 11f7a4d..513725a 100644 --- a/pkg/initialize/event.go +++ b/pkg/initialize/event.go @@ -1,12 +1,14 @@ package initialize import ( + "encoding/json" "pandax/apps/device/entity" "pandax/apps/device/services" ruleEntity "pandax/apps/rule/entity" "pandax/pkg/cache" "pandax/pkg/events" "pandax/pkg/global" + "pandax/pkg/rule_engine" "pandax/pkg/tool" ) @@ -20,10 +22,24 @@ func InitEvents() { }) if list != nil { var lfData ruleEntity.RuleDataJson - tool.StringToStruct(codeData, &lfData) - lfData.Id = ruleId + err := tool.StringToStruct(codeData, &lfData) + if err != nil { + global.Log.Error("规则链序列化失败", err) + return + } + code, err := json.Marshal(lfData.LfData.DataCode) + if err != nil { + global.Log.Error("规则链序列化失败", err) + return + } + //新建规则链实体 + instance, errs := rule_engine.NewRuleChainInstance(ruleId, code) + if len(errs) > 0 { + global.Log.Error("规则链初始化失败", errs[0]) + return + } for _, product := range *list { - cache.PutProductRule(product.Id, lfData) + cache.PutProductRule(product.Id, instance) } } }) diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index e93211b..4d7b034 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -11,13 +11,13 @@ import ( var ruleChainDebugData = message.NewRuleChainDebugData(100) -type ruleChainInstance struct { +type RuleChainInstance struct { ruleId string firstRuleNodeId string nodes map[string]nodes.Node } -func NewRuleChainInstance(ruleId string, data []byte) (*ruleChainInstance, []error) { +func NewRuleChainInstance(ruleId string, data []byte) (*RuleChainInstance, []error) { errors := make([]error, 0) manifest, err := manifest.New(data) @@ -35,14 +35,14 @@ func NewRuleChainInstance(ruleId string, data []byte) (*ruleChainInstance, []err } // newWithManifest create rule chain by user's manifest file -func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) { +func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, []error) { errs := make([]error, 0) nodes, err := nodes.GetNodes(m) if err != nil { errs = append(errs, err) return nil, errs } - r := &ruleChainInstance{ + r := &RuleChainInstance{ firstRuleNodeId: m.FirstRuleNodeId, nodes: nodes, } @@ -50,7 +50,7 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) } // StartRuleChain TODO 是否需要添加context -func (c *ruleChainInstance) StartRuleChain(context context.Context, message *message.Message) error { +func (c *RuleChainInstance) StartRuleChain(context context.Context, message *message.Message) error { // 处理debug的通道消息 go func() { for {