消息处理模块重构

Signed-off-by: lixxxww <941403820@qq.com>
This commit is contained in:
lixxxww
2024-01-22 07:26:49 +00:00
committed by Gitee
parent 6a9e1b5587
commit 4502bf3ff4
5 changed files with 51 additions and 57 deletions

View File

@@ -2,73 +2,70 @@ package rule_engine
import (
"context"
"github.com/sirupsen/logrus"
"pandax/pkg/global"
"errors"
"pandax/pkg/rule_engine/manifest"
"pandax/pkg/rule_engine/message"
"pandax/pkg/rule_engine/nodes"
"github.com/sirupsen/logrus"
)
var ruleChainDebugData = message.NewRuleChainDebugData(100)
type RuleChainInstance struct {
ruleId string
firstRuleNodeId string
ruleID string
firstRuleNodeID string
nodes map[string]nodes.Node
}
func NewRuleChainInstance(ruleId string, data []byte) (*RuleChainInstance, []error) {
errors := make([]error, 0)
func NewRuleChainInstance(ruleID string, data []byte) (*RuleChainInstance, error) {
manifest, err := manifest.New(data)
if err != nil {
errors = append(errors, err)
logrus.WithError(err).Errorf("invalidi manifest file")
return nil, errors
logrus.WithError(err).Errorf("invalid manifest file")
return nil, err
}
withManifest, errs := newInstanceWithManifest(manifest)
if len(errs) > 0 {
return nil, errs
withManifest, err := newInstanceWithManifest(manifest)
if err != nil {
return nil, err
}
withManifest.ruleId = ruleId
withManifest.ruleID = ruleID
return withManifest, nil
}
// newWithManifest create rule chain by user's manifest file
func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, []error) {
errs := make([]error, 0)
func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, error) {
nodes, err := nodes.GetNodes(m)
if err != nil {
errs = append(errs, err)
return nil, errs
return nil, err
}
r := &RuleChainInstance{
firstRuleNodeId: m.FirstRuleNodeId,
firstRuleNodeID: m.FirstRuleNodeID,
nodes: nodes,
}
return r, errs
return r, nil
}
// StartRuleChain TODO 是否需要添加context
func (c *RuleChainInstance) StartRuleChain(context context.Context, message *message.Message) error {
// 处理debug的通道消息
func (c *RuleChainInstance) StartRuleChain(ctx context.Context, msg *message.Message) error {
debugChan := make(chan *message.DebugMessage, 100)
endDebugChan := make(chan struct{})
go func() {
for {
select {
case debugMsg := <-message.DeBugChan:
// 保存到tdengine时序数据库中
ruleChainDebugData.Add(c.ruleId, debugMsg.NodeId, debugMsg)
case <-message.EndDeBugChan:
global.Log.Debug("规则链%s,执行结束", message.Id)
case debugMsg := <-debugChan:
ruleChainDebugData.Add(c.ruleID, debugMsg.NodeID, debugMsg)
case <-endDebugChan:
logrus.Debugf("规则链%s,执行结束", msg.ID)
return
}
}
}()
if node, found := c.nodes[c.firstRuleNodeId]; found {
err := node.Handle(message)
message.EndDeBugChan <- struct{}{}
return err
node, found := c.nodes[c.firstRuleNodeID]
if !found {
return errors.New("first rule node not found")
}
return nil
}
err := node.Handle(msg)
endDebugChan <- struct{}{}
return err
}

View File

@@ -2,20 +2,20 @@ package rule_engine
import (
"context"
"io/ioutil"
"os"
"pandax/pkg/rule_engine/message"
"pandax/pkg/rule_engine/nodes"
"testing"
)
func TestNewRuleChainInstance(t *testing.T) {
buf, err := ioutil.ReadFile("./manifest/manifest_sample.json")
buf, err := os.ReadFile("./manifest/manifest_sample.json")
if err != nil {
t.Error(err)
}
instance, errs := NewRuleChainInstance("11", buf)
if len(errs) > 0 {
t.Error(errs[0])
instance, err := NewRuleChainInstance("11", buf)
if err != nil {
t.Error(err)
}
metadata := message.Metadata{