diff --git a/apps/device/util/device_rpc.go b/apps/device/util/device_rpc.go index 582317a..646790f 100644 --- a/apps/device/util/device_rpc.go +++ b/apps/device/util/device_rpc.go @@ -32,8 +32,8 @@ func BuildRunDeviceRpc(deviceId, mode string, msgData map[string]interface{}) er code, _ := json.Marshal(dataCode) //新建规则链实体 instance, errs := rule_engine.NewRuleChainInstance(findOne.Id, code) - if len(errs) > 0 { - return errs[0] + if err != nil { + return errs } metadataVals := map[string]interface{}{ "deviceId": device.Id, diff --git a/iothub/hook_message_work/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go index 48b67db..87f56c3 100644 --- a/iothub/hook_message_work/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -4,11 +4,11 @@ import ( "context" "encoding/json" "fmt" - "github.com/PandaXGO/PandaKit/biz" "pandax/apps/device/services" ruleEntity "pandax/apps/rule/entity" ruleService "pandax/apps/rule/services" "pandax/iothub/netbase" + "github.com/PandaXGO/PandaKit/biz" "pandax/pkg/cache" "pandax/pkg/global" "pandax/pkg/global/model" @@ -21,11 +21,8 @@ import ( // 消息处理模块 func (s *HookService) MessageWork() { - for { - select { - case msg := <-s.MessageCh: - s.handleOne(msg) // 处理消息 - } + for msg := range s.MessageCh { + s.handleOne(msg) // 处理消息 } } @@ -118,12 +115,12 @@ func getRuleChainInstance(etoken *model.DeviceAuth) *rule_engine.RuleChainInstan if err != nil { return nil, err } - code, err := json.Marshal(lfData.LfData.DataCode) + code, _ := json.Marshal(lfData.LfData.DataCode) //新建规则链实体 instance, errs := rule_engine.NewRuleChainInstance(rule.Id, code) - if len(errs) > 0 { - global.Log.Error("规则链初始化失败", errs[0]) - return nil, errs[0] + if errs != nil { + global.Log.Error("规则链初始化失败", errs) + return nil, errs } return instance, nil }) @@ -158,7 +155,7 @@ func SendZtWebsocket(deviceId, message string) { "attrs": msgVals, } data, _ := json.Marshal(twinData) - for stageid, _ := range websocket.Wsp { + for stageid := range websocket.Wsp { CJNR := fmt.Sprintf(`{"MESSAGETYPE":"01","MESSAGECONTENT": %s}`, string(data)) websocket.SendMessage(CJNR, stageid) } diff --git a/pkg/initialize/event.go b/pkg/initialize/event.go index ed05c6f..dacffa3 100644 --- a/pkg/initialize/event.go +++ b/pkg/initialize/event.go @@ -33,9 +33,9 @@ func InitEvents() { return } //新建规则链实体 - instance, errs := rule_engine.NewRuleChainInstance(ruleId, code) - if len(errs) > 0 { - global.Log.Error("规则链初始化失败", errs[0]) + instance, err := rule_engine.NewRuleChainInstance(ruleId, code) + if err != nil { + global.Log.Error("规则链初始化失败", err) return } for _, product := range *list { diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index 4d7b034..7fa9373 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -2,73 +2,70 @@ package rule_engine import ( "context" - "github.com/sirupsen/logrus" - "pandax/pkg/global" + "errors" "pandax/pkg/rule_engine/manifest" "pandax/pkg/rule_engine/message" "pandax/pkg/rule_engine/nodes" + + "github.com/sirupsen/logrus" ) var ruleChainDebugData = message.NewRuleChainDebugData(100) type RuleChainInstance struct { - ruleId string - firstRuleNodeId string + ruleID string + firstRuleNodeID string nodes map[string]nodes.Node } -func NewRuleChainInstance(ruleId string, data []byte) (*RuleChainInstance, []error) { - errors := make([]error, 0) - +func NewRuleChainInstance(ruleID string, data []byte) (*RuleChainInstance, error) { manifest, err := manifest.New(data) if err != nil { - errors = append(errors, err) - logrus.WithError(err).Errorf("invalidi manifest file") - return nil, errors + logrus.WithError(err).Errorf("invalid manifest file") + return nil, err } - withManifest, errs := newInstanceWithManifest(manifest) - if len(errs) > 0 { - return nil, errs + withManifest, err := newInstanceWithManifest(manifest) + if err != nil { + return nil, err } - withManifest.ruleId = ruleId + withManifest.ruleID = ruleID return withManifest, nil } -// newWithManifest create rule chain by user's manifest file -func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, []error) { - errs := make([]error, 0) +func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, error) { nodes, err := nodes.GetNodes(m) if err != nil { - errs = append(errs, err) - return nil, errs + return nil, err } r := &RuleChainInstance{ - firstRuleNodeId: m.FirstRuleNodeId, + firstRuleNodeID: m.FirstRuleNodeID, nodes: nodes, } - return r, errs + return r, nil } -// StartRuleChain TODO 是否需要添加context -func (c *RuleChainInstance) StartRuleChain(context context.Context, message *message.Message) error { - // 处理debug的通道消息 +func (c *RuleChainInstance) StartRuleChain(ctx context.Context, msg *message.Message) error { + debugChan := make(chan *message.DebugMessage, 100) + endDebugChan := make(chan struct{}) + go func() { for { select { - case debugMsg := <-message.DeBugChan: - // 保存到tdengine时序数据库中 - ruleChainDebugData.Add(c.ruleId, debugMsg.NodeId, debugMsg) - case <-message.EndDeBugChan: - global.Log.Debug("规则链%s,执行结束", message.Id) + case debugMsg := <-debugChan: + ruleChainDebugData.Add(c.ruleID, debugMsg.NodeID, debugMsg) + case <-endDebugChan: + logrus.Debugf("规则链%s,执行结束", msg.ID) return } } - }() - if node, found := c.nodes[c.firstRuleNodeId]; found { - err := node.Handle(message) - message.EndDeBugChan <- struct{}{} - return err + + node, found := c.nodes[c.firstRuleNodeID] + if !found { + return errors.New("first rule node not found") } - return nil -} + + err := node.Handle(msg) + endDebugChan <- struct{}{} + return err +} \ No newline at end of file diff --git a/pkg/rule_engine/instance_test.go b/pkg/rule_engine/instance_test.go index 46f6438..1f55aa0 100644 --- a/pkg/rule_engine/instance_test.go +++ b/pkg/rule_engine/instance_test.go @@ -2,20 +2,20 @@ package rule_engine import ( "context" - "io/ioutil" + "os" "pandax/pkg/rule_engine/message" "pandax/pkg/rule_engine/nodes" "testing" ) func TestNewRuleChainInstance(t *testing.T) { - buf, err := ioutil.ReadFile("./manifest/manifest_sample.json") + buf, err := os.ReadFile("./manifest/manifest_sample.json") if err != nil { t.Error(err) } - instance, errs := NewRuleChainInstance("11", buf) - if len(errs) > 0 { - t.Error(errs[0]) + instance, err := NewRuleChainInstance("11", buf) + if err != nil { + t.Error(err) } metadata := message.Metadata{