规则引擎

This commit is contained in:
PandaGoAdmin
2023-02-28 14:56:21 +08:00
parent e048fa53a5
commit 8e854ce527
13 changed files with 667 additions and 215 deletions

View File

@@ -0,0 +1,89 @@
package rule_engine
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/manifest"
"pandax/pkg/rule_engine/message"
"pandax/pkg/rule_engine/nodes"
)
// ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain,
type ruleChainInstance struct {
firstRuleNodeId string
nodes map[string]nodes.Node
}
func newRuleChainInstance(data []byte) (*ruleChainInstance, []error) {
errors := []error{}
manifest, err := manifest.New(data)
if err != nil {
errors = append(errors, err)
logrus.WithError(err).Errorf("invalidi manifest file")
return nil, errors
}
return newInstanceWithManifest(manifest)
}
// newWithManifest create rule chain by user's manifest file
func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) {
errs := []error{}
r := &ruleChainInstance{
firstRuleNodeId: m.FirstRuleNodeId,
nodes: make(map[string]nodes.Node),
}
// Create All nodes
for _, n := range m.Nodes {
metadata := nodes.NewMetadataWithValues(n.Properties)
node, err := nodes.NewNode(n.Type, n.Id, metadata)
if err != nil {
errs = append(errs, err)
continue
}
if _, found := r.nodes[n.Id]; found {
err := fmt.Errorf("node '%s' already exist in rulechain", n.Id)
errs = append(errs, err)
continue
}
r.nodes[n.Id] = node
}
for _, edge := range m.Edges {
originalNode, found := r.nodes[edge.SourceNodeId]
if !found {
err := fmt.Errorf("original node '%s' no exist in", originalNode.Name())
errs = append(errs, err)
continue
}
targetNode, found := r.nodes[edge.TargetNodeId]
if !found {
err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name())
errs = append(errs, err)
continue
}
originalNode.AddLinkedNode(edge.Type, targetNode)
}
for name, node := range r.nodes {
targetNodes := node.GetLinkedNodes()
mustLabels := node.MustLabels()
for _, label := range mustLabels {
if _, found := targetNodes[label]; !found {
err := fmt.Errorf("the label '%s' in node '%s' no exist'", label, name)
errs = append(errs, err)
continue
}
}
}
return r, errs
}
// StartRuleChain TODO 是否需要添加context
func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error {
if node, found := c.nodes[c.firstRuleNodeId]; found {
go node.Handle(message)
}
return nil
}

View File

@@ -0,0 +1,58 @@
package manifest
import (
"encoding/json"
"github.com/sirupsen/logrus"
)
type Node struct {
Type string `json:"type" yaml:"type"`
Id string `json:"id" yaml:"id"`
Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode
}
type Edge struct {
SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"`
TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"`
Type string `json:"type" yaml:"type"` //success or fail
}
type Manifest struct {
FirstRuleNodeId string `json:"firstRuleNodeId" yaml:"firstRuleNodeId"`
Nodes []Node `json:"nodes" yaml:"nodes"`
Edges []Edge `json:"edges" yaml:"edges"`
}
func New(data []byte) (*Manifest, error) {
firstRuleNodeId := ""
manifest := make(map[string]interface{})
if err := json.Unmarshal(data, manifest); err != nil {
logrus.WithError(err).Errorf("invalid node chain manifest file")
return nil, err
}
nodes := make([]Node, 0)
for _, node := range manifest["nodes"].([]map[string]interface{}) {
if node["type"].(string) == "InputNode" {
firstRuleNodeId = node["id"].(string)
}
nodes = append(nodes, Node{
Id: node["id"].(string),
Type: node["type"].(string),
Properties: node["properties"].(map[string]interface{}),
})
}
edges := make([]Edge, 0)
for _, edge := range manifest["edges"].([]map[string]interface{}) {
edges = append(edges, Edge{
Type: edge["type"].(string),
SourceNodeId: edge["sourceNodeId"].(string),
TargetNodeId: edge["targetNodeId"].(string),
})
}
m := &Manifest{
FirstRuleNodeId: firstRuleNodeId,
Nodes: nodes,
Edges: edges,
}
return m, nil
}

View File

@@ -0,0 +1,125 @@
{
"nodes": [
{
"id": "3b6b8df4-445f-4e70-9674-f7aa486b3d81",
"type": "InputNode",
"x": 280,
"y": 280,
"properties": {
"icon": "/src/assets/icon_module/svg/start.svg",
"debugMode": false,
"status": false
},
"zIndex": 1002,
"text": {
"x": 290,
"y": 280,
"value": "输入"
}
},
{
"id": "45afb241-1977-4cbf-8af3-c12844d5666b",
"type": "DelayNode",
"x": 600,
"y": 160,
"properties": {
"icon": "/src/assets/icon_module/svg/function.svg",
"debugMode": false,
"status": false
},
"zIndex": 1004,
"text": {
"x": 610,
"y": 160,
"value": "延迟"
}
},
{
"id": "95047b03-e966-4685-b625-3cea27415706",
"type": "SwitchNode",
"x": 600,
"y": 460,
"properties": {
"icon": "/src/assets/icon_module/svg/switch.svg",
"debugMode": false,
"status": false
},
"zIndex": 1006,
"text": {
"x": 610,
"y": 460,
"value": "分流"
}
}
],
"edges": [
{
"id": "fde7f2de-cc0f-467d-a614-4505f058dc2a",
"type": "bezier-link",
"sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81",
"targetNodeId": "45afb241-1977-4cbf-8af3-c12844d5666b",
"startPoint": {
"x": 340,
"y": 280
},
"endPoint": {
"x": 540,
"y": 160
},
"properties": {},
"zIndex": 1007,
"pointsList": [
{
"x": 340,
"y": 280
},
{
"x": 440,
"y": 280
},
{
"x": 440,
"y": 160
},
{
"x": 540,
"y": 160
}
]
},
{
"id": "986878cc-a9be-42b5-afc0-6c0a169c4e4d",
"type": "bezier-link",
"sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81",
"targetNodeId": "95047b03-e966-4685-b625-3cea27415706",
"startPoint": {
"x": 340,
"y": 280
},
"endPoint": {
"x": 540,
"y": 460
},
"properties": {},
"zIndex": 1008,
"pointsList": [
{
"x": 340,
"y": 280
},
{
"x": 440,
"y": 280
},
{
"x": 440,
"y": 460
},
{
"x": 540,
"y": 460
}
]
}
]
}

View File

@@ -0,0 +1,55 @@
package message
// Message ...
type Message interface {
GetOriginator() string
GetType() string
GetPayload() []byte
SetType(string)
SetPayload([]byte)
SetOriginator(string)
MarshalBinary() ([]byte, error)
UnmarshalBinary(b []byte) error
}
// Predefined message types
const (
MessageTypePostAttributesRequest = "Post attributes"
MessageTypePostTelemetryRequest = "Post telemetry"
MessageTypeActivityEvent = "Activity event"
MessageTypeInactivityEvent = "Inactivity event"
MessageTypeConnectEvent = "Connect event"
MessageTypeDisconnectEvent = "Disconnect event"
)
// NewMessage ...
func NewMessage() Message {
return &defaultMessage{
payload: []byte{},
}
}
type defaultMessage struct {
originator string //数据发布者
messageType string //数据类型,数据来源
payload []byte //二进制数据
}
// NewMessageWithDetail ...
func NewMessageWithDetail(originator string, messageType string, payload []byte) Message {
return &defaultMessage{
originator: originator,
messageType: messageType,
payload: payload,
}
}
func (t *defaultMessage) GetOriginator() string { return t.originator }
func (t *defaultMessage) GetType() string { return t.messageType }
func (t *defaultMessage) GetPayload() []byte { return t.payload }
func (t *defaultMessage) SetType(messageType string) { t.messageType = messageType }
func (t *defaultMessage) SetPayload(payload []byte) { t.payload = payload }
func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator }
func (t *defaultMessage) MarshalBinary() ([]byte, error) { return nil, nil }
func (t *defaultMessage) UnmarshalBinary(b []byte) error { return nil }

View File

@@ -0,0 +1,76 @@
package nodes
import (
"fmt"
"pandax/pkg/rule_engine/message"
"sync"
"time"
"github.com/sirupsen/logrus"
)
const DelayNodeName = "DelayNode"
type delayNode struct {
bareNode
PeriodTs int `json:"periodTs" yaml:"periodTs" jpath:"periodTs"`
MaxPendingMessages int `json:"maxPendingMessages" yaml:"maxPendingMessages" jpath:"maxPendingMessages"`
messageQueue []message.Message `jpath:"-"`
delayTimer *time.Timer `jpath:"-"`
lock sync.Mutex `jpath:"-"`
}
type delayNodeFactory struct{}
func (f delayNodeFactory) Name() string { return DelayNodeName }
func (f delayNodeFactory) Category() string { return NODE_CATEGORY_ACTION }
func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{"Success", "Failure"}
node := &delayNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
lock: sync.Mutex{},
}
_, err := decodePath(meta, node)
node.messageQueue = make([]message.Message, node.MaxPendingMessages)
return node, err
}
func (n *delayNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
successLabelNode := n.GetLinkedNode("Success")
failureLabelNode := n.GetLinkedNode("Failure")
if successLabelNode == nil || failureLabelNode == nil {
return fmt.Errorf("no valid label linked node in %s", n.Name())
}
// check wethere the time had already been started, queue message if started
if n.delayTimer == nil {
n.messageQueue = append(n.messageQueue, msg)
n.delayTimer = time.NewTimer(time.Duration(n.PeriodTs) * time.Second)
// start timecallback goroutine
go func(n *delayNode) error {
defer n.delayTimer.Stop()
for {
<-n.delayTimer.C
n.lock.Lock()
defer n.lock.Unlock()
if len(n.messageQueue) > 0 {
msg := n.messageQueue[0]
n.messageQueue = n.messageQueue[0:]
return successLabelNode.Handle(msg)
}
}
}(n)
return nil
}
// the delay timer had already been created, just queue message
n.lock.Lock()
defer n.lock.Unlock()
if len(n.messageQueue) == n.MaxPendingMessages {
return failureLabelNode.Handle(msg)
}
n.messageQueue = append(n.messageQueue, msg)
return nil
}

View File

@@ -0,0 +1,53 @@
package nodes
import (
"fmt"
)
const (
NODE_CATEGORY_FILTER = "filter"
NODE_CATEGORY_ACTION = "action"
NODE_CATEGORY_ENRICHMENT = "enrichment"
NODE_CATEGORY_TRANSFORM = "transform"
NODE_CATEGORY_EXTERNAL = "external"
NODE_CATEGORY_OTHERS = "others"
)
// Factory is node's factory to create node based on metadata
// factory also manage node's metadta description which can be used by other
// service to present node in web
type Factory interface {
Name() string
Category() string
Create(id string, meta Metadata) (Node, error)
}
var (
// allNodeFactories hold all node's factory
allNodeFactories map[string]Factory = make(map[string]Factory)
// allNodeCategories hold node's metadata by category
allNodeCategories map[string][]string = make(map[string][]string)
)
// RegisterFactory add a new node factory and classify its category for
// metadata description
func RegisterFactory(f Factory) {
allNodeFactories[f.Name()] = f
if allNodeCategories[f.Category()] == nil {
allNodeCategories[f.Category()] = []string{}
}
allNodeCategories[f.Category()] = append(allNodeCategories[f.Category()], f.Name())
}
// NewNode is the only way to create a new node
func NewNode(nodeType string, id string, meta Metadata) (Node, error) {
if f, found := allNodeFactories[nodeType]; found {
return f.Create(id, meta)
}
return nil, fmt.Errorf("invalid node type '%s'", nodeType)
}
// GetCategoryNodes return specified category's all nodes
func GetCategoryNodes() map[string][]string { return allNodeCategories }

View File

@@ -0,0 +1,43 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
type switchFilterNode struct {
bareNode
Scripts string `json:"scripts" yaml:"scripts"`
}
type switchFilterNodeFactory struct{}
func (f switchFilterNodeFactory) Name() string { return "SwitchNode" }
func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER }
func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{}
node := &switchFilterNode{
bareNode: newBareNode(f.Name(), id, meta, labels),
}
return decodePath(meta, node)
}
func (n *switchFilterNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
/*scriptEngine := NewScriptEngine()
SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts)
if err != nil {
return nil
}
nodes := n.GetLinkedNodes()
for label, node := range nodes {
for _, switchresult := range SwitchResults {
if label == switchresult {
return node.Handle(msg)
}
}
}*/
return nil
}

View File

@@ -0,0 +1,6 @@
package nodes
// init register all node's factory
func init() {
RegisterFactory(inputNodeFactory{})
}

View File

@@ -0,0 +1,35 @@
package nodes
import (
"github.com/sirupsen/logrus"
"pandax/pkg/rule_engine/message"
)
const InputNodeName = "InputNode"
type inputNode struct {
bareNode
}
type inputNodeFactory struct{}
func (f inputNodeFactory) Name() string { return "InputNode" }
func (f inputNodeFactory) Category() string { return NODE_CATEGORY_OTHERS }
func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) {
labels := []string{}
node := &inputNode{
bareNode: newBareNode(InputNodeName, id, meta, labels),
}
return node, nil
}
func (n *inputNode) Handle(msg message.Message) error {
logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType())
nodes := n.GetLinkedNodes()
for _, node := range nodes {
return node.Handle(msg)
}
return nil
}

View File

@@ -0,0 +1,62 @@
package nodes
import (
"fmt"
"github.com/XM-GO/PandaKit/utils"
)
const (
NODE_CONFIG_MESSAGE_TYPE_KEY = "messageTypeKey"
NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey"
)
type Metadata interface {
Keys() []string
With(key string, val interface{}) Metadata
Value(key string) (interface{}, error)
DecodePath(rawVal interface{}) error
}
type nodeMetadata struct {
keypairs map[string]interface{}
}
func NewMetadata() Metadata {
return &nodeMetadata{
keypairs: make(map[string]interface{}),
}
}
func NewMetadataWithString(vals string) Metadata {
return &nodeMetadata{}
}
func NewMetadataWithValues(vals map[string]interface{}) Metadata {
return &nodeMetadata{
keypairs: vals,
}
}
func (c *nodeMetadata) Keys() []string {
keys := []string{}
for key, _ := range c.keypairs {
keys = append(keys, key)
}
return keys
}
func (c *nodeMetadata) Value(key string) (interface{}, error) {
if val, found := c.keypairs[key]; found {
return val, nil
}
return nil, fmt.Errorf("key '%s' not found", key)
}
func (c *nodeMetadata) With(key string, val interface{}) Metadata {
c.keypairs[key] = val
return c
}
func (c *nodeMetadata) DecodePath(rawVal interface{}) error {
return utils.Map2Struct(c.keypairs, rawVal)
}

View File

@@ -0,0 +1,65 @@
package nodes
import (
"errors"
"pandax/pkg/rule_engine/message"
"github.com/sirupsen/logrus"
)
type Node interface {
Name() string
Id() string
Metadata() Metadata
MustLabels() []string
Handle(message.Message) error
AddLinkedNode(label string, node Node)
GetLinkedNode(label string) Node
GetLinkedNodes() map[string]Node
}
type bareNode struct {
name string
id string
nodes map[string]Node
meta Metadata
labels []string
}
func newBareNode(name string, id string, meta Metadata, labels []string) bareNode {
return bareNode{
name: name,
id: id,
nodes: make(map[string]Node),
meta: meta,
labels: labels,
}
}
func (n *bareNode) Name() string { return n.name }
func (n *bareNode) WithId(id string) { n.id = id }
func (n *bareNode) Id() string { return n.id }
func (n *bareNode) MustLabels() []string { return n.labels }
func (n *bareNode) AddLinkedNode(label string, node Node) { n.nodes[label] = node }
func (n *bareNode) GetLinkedNode(label string) Node {
if node, found := n.nodes[label]; found {
return node
}
logrus.Errorf("no label '%s' in node '%s'", label, n.name)
return nil
}
func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes }
func (n *bareNode) Metadata() Metadata { return n.meta }
func (n *bareNode) Handle(message.Message) error { return errors.New("not implemented") }
func decodePath(meta Metadata, n Node) (Node, error) {
if err := meta.DecodePath(n); err != nil {
return n, err
}
return n, nil
}

View File

@@ -1,76 +0,0 @@
package rule_engine
import (
"context"
"time"
)
const (
UPDATE_RULE_STATUS_START = "start"
UPDATE_RULE_STATUS_STOP = "stop"
)
//RuleEngine RuleEngine
type RuleEngine struct {
Name string
ID string
Description string
DebugMode bool
Status string
Payload []byte
Root bool
Channel string
SubTopic string
CreateAt time.Time
LastUpdateAt time.Time
}
type PageMetadata struct {
Total uint64
Offset uint64
Limit uint64
Name string
}
type RuleEnginePage struct {
PageMetadata
RuleChains []RuleEngine
}
// Validate returns an error if representtation is invalid
func (r RuleEngine) Validate() error {
if r.ID == "" {
return ErrMalformedEntity
}
return nil
}
//RuleEngineRepository specifies realm persistence API
type RuleEngineRepository interface {
//Save save the RuleEngine
Save(context.Context, RuleEngine) error
//Update the RuleEngine
Update(context.Context, RuleEngine) (RuleEngine, error)
//Retrieve return RuleEngine by RuleEngine id
Retrieve(context.Context, string) (RuleEngine, error)
//Revoke remove RuleEngine by RuleEngine id
Revoke(context.Context, string) error
//List return all RuleEngines
List(context.Context, uint64, uint64) (RuleEnginePage, error)
}
// RuleEngineCache contains thing caching interface.
type RuleEngineCache interface {
// Save stores pair thing key, thing id.
Save(context.Context, string, string) error
// ID returns thing ID for given key.
ID(context.Context, string) (string, error)
// Remove thing from cache.
Remove(context.Context, string) error
}

View File

@@ -1,139 +0,0 @@
package rule_engine
import (
"context"
"github.com/pkg/errors"
)
const (
RULE_STATUS_CREATED = "created"
RULE_STATUS_STARTED = "started"
RULE_STATUS_STOPPED = "stopped"
RULE_STATUS_UNKNOWN = "unknown"
)
var (
// ErrConflict indicates usage of the existing email during account
// registration.
ErrConflict = errors.New("email already taken")
// ErrMalformedEntity indicates malformed entity specification
// (e.g. invalid realmname or password).
ErrMalformedEntity = errors.New("malformed entity specification")
// ErrUnauthorizedAccess indicates missing or invalid credentials provided
// when accessing a protected resource.
ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided")
// ErrNotFound indicates a non-existent entity request.
ErrNotFound = errors.New("non-existent entity")
// ErrruleEngineNotFound indicates a non-existent realm request.
ErrruleEngineNotFound = errors.New("non-existent ruleEngine")
// ErrScanMetadata indicates problem with metadata in db.
ErrScanMetadata = errors.New("Failed to scan metadata")
// ErrMissingEmail indicates missing email for password reset request.
ErrMissingEmail = errors.New("missing email for password reset")
// ErrUnauthorizedPrincipal indicate the pricipal can not be recognized
ErrUnauthorizedPrincipal = errors.New("unauthorized principal")
)
//Service service
type Service interface {
AddNewruleEngine(context.Context, RuleEngine) error
GetruleEngineInfo(context.Context, string) (RuleEngine, error)
UpdateruleEngine(context.Context, RuleEngine) (RuleEngine, error)
RevokeruleEngine(context.Context, string) error
ListruleEngine(context.Context, uint64, uint64) (RuleEnginePage, error)
UpdateruleEngineStatus(context.Context, string, string) error
}
/*
var _ Service = (*ruleEngineService)(nil)
type ruleEngineService struct {
ruleEngines RuleEngineRepository
mutex sync.RWMutex
instanceManager instanceManager
ruleEnginesCache RuleEngineCache
}
//New new
func New(ruleEngines RuleEngineRepository, instancemanager instanceManager, ruleEngineCache RuleEngineCache) Service {
return &ruleEngineService{
ruleEngines: ruleEngines,
instanceManager: instancemanager,
ruleEnginesCache: ruleEngineCache,
}
}
func (svc ruleEngineService) AddNewruleEngine(ctx context.Context, ruleEngine RuleEngine) error {
return svc.ruleEngines.Save(ctx, ruleEngine)
}
func (svc ruleEngineService) GetruleEngineInfo(ctx context.Context, ruleEngineID string) (RuleEngine, error) {
ruleEngine, err := svc.ruleEngines.Retrieve(ctx, ruleEngineID)
if err != nil {
return RuleEngine{}, errors.Wrap(ErrruleEngineNotFound, err.Error())
}
return ruleEngine, nil
}
func (svc ruleEngineService) UpdateruleEngine(ctx context.Context, ruleEngine RuleEngine) (RuleEngine, error) {
old_ruleEngine, err := svc.ruleEngines.Retrieve(ctx, ruleEngine.ID)
if err != nil {
return RuleEngine{}, errors.Wrap(ErrruleEngineNotFound, err.Error())
}
if old_ruleEngine.Status == RULE_STATUS_STARTED {
return RuleEngine{}, status.Error(codes.FailedPrecondition, "")
}
return svc.ruleEngines.Update(ctx, ruleEngine)
}
func (svc ruleEngineService) RevokeruleEngine(ctx context.Context, ruleEngineID string) error {
ruleEngine, err := svc.ruleEngines.Retrieve(ctx, ruleEngineID)
if err != nil {
return errors.Wrap(ErrruleEngineNotFound, err.Error())
}
if ruleEngine.Status == RULE_STATUS_STARTED {
return status.Error(codes.FailedPrecondition, "")
}
return svc.ruleEngines.Revoke(ctx, ruleEngineID)
}
func (svc ruleEngineService) ListruleEngine(ctx context.Context, offset uint64, limit uint64) (RuleEnginePage, error) {
return svc.ruleEngines.List(ctx, offset, limit)
}
func (svc ruleEngineService) UpdateruleEngineStatus(ctx context.Context, ruleEngineID string, updatestatus string) error {
ruleEngine, err := svc.ruleEngines.Retrieve(ctx, ruleEngineID)
if err != nil {
return errors.Wrap(ErrruleEngineNotFound, err.Error())
}
switch updatestatus {
case UPDATE_RULE_STATUS_START:
if ruleEngine.Status != RULE_STATUS_CREATED && ruleEngine.Status != RULE_STATUS_STOPPED {
return status.Error(codes.FailedPrecondition, "")
}
return svc.instanceManager.startRuleEngine(&ruleEngine)
case UPDATE_RULE_STATUS_STOP:
if ruleEngine.Status != RULE_STATUS_STARTED {
return status.Error(codes.FailedPrecondition, "")
}
return svc.instanceManager.stopRuleEngine(&ruleEngine)
}
return nil
}
*/