mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 10:58:35 +08:00
94 lines
2.5 KiB
Go
94 lines
2.5 KiB
Go
package rule_engine
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/sirupsen/logrus"
|
|
"pandax/pkg/rule_engine/manifest"
|
|
"pandax/pkg/rule_engine/message"
|
|
"pandax/pkg/rule_engine/nodes"
|
|
"strings"
|
|
)
|
|
|
|
// ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain,
|
|
type ruleChainInstance struct {
|
|
firstRuleNodeId string
|
|
nodes map[string]nodes.Node
|
|
}
|
|
|
|
func NewRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
|
|
errors := make([]error, 0)
|
|
|
|
manifest, err := manifest.New(data)
|
|
if err != nil {
|
|
errors = append(errors, err)
|
|
logrus.WithError(err).Errorf("invalidi manifest file")
|
|
return nil, errors
|
|
}
|
|
return newInstanceWithManifest(manifest)
|
|
}
|
|
|
|
// newWithManifest create rule chain by user's manifest file
|
|
func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) {
|
|
errs := make([]error, 0)
|
|
r := &ruleChainInstance{
|
|
firstRuleNodeId: m.FirstRuleNodeId,
|
|
nodes: make(map[string]nodes.Node),
|
|
}
|
|
// Create All nodes
|
|
for _, n := range m.Nodes {
|
|
metadata := nodes.NewMetadataWithValues(n.Properties)
|
|
node, err := nodes.NewNode(n.Type, n.Id, metadata)
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
if _, found := r.nodes[n.Id]; found {
|
|
err := fmt.Errorf("node '%s' already exist in rulechain", n.Id)
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
r.nodes[n.Id] = node
|
|
}
|
|
for _, edge := range m.Edges {
|
|
originalNode, found := r.nodes[edge.SourceNodeId]
|
|
if !found {
|
|
err := fmt.Errorf("original node '%s' no exist in", originalNode.Name())
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
targetNode, found := r.nodes[edge.TargetNodeId]
|
|
if !found {
|
|
err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name())
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
//可以有多个类型
|
|
split := strings.Split(edge.Properties["type"].(string), "/")
|
|
for _, ty := range split {
|
|
originalNode.AddLinkedNode(ty, targetNode)
|
|
}
|
|
}
|
|
for name, node := range r.nodes {
|
|
targetNodes := node.GetLinkedNodes()
|
|
mustLabels := node.MustLabels()
|
|
for _, label := range mustLabels {
|
|
if _, found := targetNodes[label]; !found {
|
|
err := fmt.Errorf("the label '%s' in node '%s' no exist'", label, name)
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
return r, errs
|
|
}
|
|
|
|
// StartRuleChain
|
|
func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error {
|
|
if node, found := c.nodes[c.firstRuleNodeId]; found {
|
|
go node.Handle(message)
|
|
}
|
|
return nil
|
|
}
|