diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go new file mode 100644 index 0000000..cb7e1f1 --- /dev/null +++ b/pkg/rule_engine/instance.go @@ -0,0 +1,89 @@ +package rule_engine + +import ( + "context" + "fmt" + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/manifest" + "pandax/pkg/rule_engine/message" + "pandax/pkg/rule_engine/nodes" +) + +// ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain, +type ruleChainInstance struct { + firstRuleNodeId string + nodes map[string]nodes.Node +} + +func newRuleChainInstance(data []byte) (*ruleChainInstance, []error) { + errors := []error{} + + manifest, err := manifest.New(data) + if err != nil { + errors = append(errors, err) + logrus.WithError(err).Errorf("invalidi manifest file") + return nil, errors + } + return newInstanceWithManifest(manifest) +} + +// newWithManifest create rule chain by user's manifest file +func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) { + errs := []error{} + r := &ruleChainInstance{ + firstRuleNodeId: m.FirstRuleNodeId, + nodes: make(map[string]nodes.Node), + } + // Create All nodes + for _, n := range m.Nodes { + metadata := nodes.NewMetadataWithValues(n.Properties) + node, err := nodes.NewNode(n.Type, n.Id, metadata) + if err != nil { + errs = append(errs, err) + continue + } + if _, found := r.nodes[n.Id]; found { + err := fmt.Errorf("node '%s' already exist in rulechain", n.Id) + errs = append(errs, err) + continue + } + r.nodes[n.Id] = node + } + + for _, edge := range m.Edges { + originalNode, found := r.nodes[edge.SourceNodeId] + if !found { + err := fmt.Errorf("original node '%s' no exist in", originalNode.Name()) + errs = append(errs, err) + continue + } + targetNode, found := r.nodes[edge.TargetNodeId] + if !found { + err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name()) + errs = append(errs, err) + continue + } + originalNode.AddLinkedNode(edge.Type, targetNode) + } + for name, node := range r.nodes { + targetNodes := node.GetLinkedNodes() + mustLabels := node.MustLabels() + for _, label := range mustLabels { + if _, found := targetNodes[label]; !found { + err := fmt.Errorf("the label '%s' in node '%s' no exist'", label, name) + errs = append(errs, err) + continue + } + } + } + + return r, errs +} + +// StartRuleChain TODO 是否需要添加context +func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error { + if node, found := c.nodes[c.firstRuleNodeId]; found { + go node.Handle(message) + } + return nil +} diff --git a/pkg/rule_engine/manifest/manifest.go b/pkg/rule_engine/manifest/manifest.go new file mode 100644 index 0000000..fa32a41 --- /dev/null +++ b/pkg/rule_engine/manifest/manifest.go @@ -0,0 +1,58 @@ +package manifest + +import ( + "encoding/json" + "github.com/sirupsen/logrus" +) + +type Node struct { + Type string `json:"type" yaml:"type"` + Id string `json:"id" yaml:"id"` + Properties map[string]interface{} `json:"properties" yaml:"properties"` //debugMode +} + +type Edge struct { + SourceNodeId string `json:"sourceNodeId" yaml:"sourceNodeId"` + TargetNodeId string `json:"targetNodeId" yaml:"targetNodeId"` + Type string `json:"type" yaml:"type"` //success or fail +} + +type Manifest struct { + FirstRuleNodeId string `json:"firstRuleNodeId" yaml:"firstRuleNodeId"` + Nodes []Node `json:"nodes" yaml:"nodes"` + Edges []Edge `json:"edges" yaml:"edges"` +} + +func New(data []byte) (*Manifest, error) { + firstRuleNodeId := "" + manifest := make(map[string]interface{}) + if err := json.Unmarshal(data, manifest); err != nil { + logrus.WithError(err).Errorf("invalid node chain manifest file") + return nil, err + } + nodes := make([]Node, 0) + for _, node := range manifest["nodes"].([]map[string]interface{}) { + if node["type"].(string) == "InputNode" { + firstRuleNodeId = node["id"].(string) + } + nodes = append(nodes, Node{ + Id: node["id"].(string), + Type: node["type"].(string), + Properties: node["properties"].(map[string]interface{}), + }) + } + edges := make([]Edge, 0) + for _, edge := range manifest["edges"].([]map[string]interface{}) { + edges = append(edges, Edge{ + Type: edge["type"].(string), + SourceNodeId: edge["sourceNodeId"].(string), + TargetNodeId: edge["targetNodeId"].(string), + }) + } + m := &Manifest{ + FirstRuleNodeId: firstRuleNodeId, + Nodes: nodes, + Edges: edges, + } + return m, nil +} diff --git a/pkg/rule_engine/manifest/manifest_sample.json b/pkg/rule_engine/manifest/manifest_sample.json new file mode 100644 index 0000000..a6dcf80 --- /dev/null +++ b/pkg/rule_engine/manifest/manifest_sample.json @@ -0,0 +1,125 @@ +{ + "nodes": [ + { + "id": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", + "type": "InputNode", + "x": 280, + "y": 280, + "properties": { + "icon": "/src/assets/icon_module/svg/start.svg", + "debugMode": false, + "status": false + }, + "zIndex": 1002, + "text": { + "x": 290, + "y": 280, + "value": "输入" + } + }, + { + "id": "45afb241-1977-4cbf-8af3-c12844d5666b", + "type": "DelayNode", + "x": 600, + "y": 160, + "properties": { + "icon": "/src/assets/icon_module/svg/function.svg", + "debugMode": false, + "status": false + }, + "zIndex": 1004, + "text": { + "x": 610, + "y": 160, + "value": "延迟" + } + }, + { + "id": "95047b03-e966-4685-b625-3cea27415706", + "type": "SwitchNode", + "x": 600, + "y": 460, + "properties": { + "icon": "/src/assets/icon_module/svg/switch.svg", + "debugMode": false, + "status": false + }, + "zIndex": 1006, + "text": { + "x": 610, + "y": 460, + "value": "分流" + } + } + ], + "edges": [ + { + "id": "fde7f2de-cc0f-467d-a614-4505f058dc2a", + "type": "bezier-link", + "sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", + "targetNodeId": "45afb241-1977-4cbf-8af3-c12844d5666b", + "startPoint": { + "x": 340, + "y": 280 + }, + "endPoint": { + "x": 540, + "y": 160 + }, + "properties": {}, + "zIndex": 1007, + "pointsList": [ + { + "x": 340, + "y": 280 + }, + { + "x": 440, + "y": 280 + }, + { + "x": 440, + "y": 160 + }, + { + "x": 540, + "y": 160 + } + ] + }, + { + "id": "986878cc-a9be-42b5-afc0-6c0a169c4e4d", + "type": "bezier-link", + "sourceNodeId": "3b6b8df4-445f-4e70-9674-f7aa486b3d81", + "targetNodeId": "95047b03-e966-4685-b625-3cea27415706", + "startPoint": { + "x": 340, + "y": 280 + }, + "endPoint": { + "x": 540, + "y": 460 + }, + "properties": {}, + "zIndex": 1008, + "pointsList": [ + { + "x": 340, + "y": 280 + }, + { + "x": 440, + "y": 280 + }, + { + "x": 440, + "y": 460 + }, + { + "x": 540, + "y": 460 + } + ] + } + ] +} \ No newline at end of file diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go new file mode 100644 index 0000000..493b931 --- /dev/null +++ b/pkg/rule_engine/message/message.go @@ -0,0 +1,55 @@ +package message + +// Message ... +type Message interface { + GetOriginator() string + GetType() string + GetPayload() []byte + SetType(string) + SetPayload([]byte) + SetOriginator(string) + MarshalBinary() ([]byte, error) + UnmarshalBinary(b []byte) error +} + +// Predefined message types +const ( + MessageTypePostAttributesRequest = "Post attributes" + MessageTypePostTelemetryRequest = "Post telemetry" + MessageTypeActivityEvent = "Activity event" + MessageTypeInactivityEvent = "Inactivity event" + MessageTypeConnectEvent = "Connect event" + MessageTypeDisconnectEvent = "Disconnect event" +) + +// NewMessage ... +func NewMessage() Message { + return &defaultMessage{ + payload: []byte{}, + } +} + +type defaultMessage struct { + originator string //数据发布者 + messageType string //数据类型,数据来源 + payload []byte //二进制数据 +} + +// NewMessageWithDetail ... +func NewMessageWithDetail(originator string, messageType string, payload []byte) Message { + return &defaultMessage{ + originator: originator, + messageType: messageType, + payload: payload, + } +} + +func (t *defaultMessage) GetOriginator() string { return t.originator } +func (t *defaultMessage) GetType() string { return t.messageType } +func (t *defaultMessage) GetPayload() []byte { return t.payload } +func (t *defaultMessage) SetType(messageType string) { t.messageType = messageType } +func (t *defaultMessage) SetPayload(payload []byte) { t.payload = payload } +func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator } + +func (t *defaultMessage) MarshalBinary() ([]byte, error) { return nil, nil } +func (t *defaultMessage) UnmarshalBinary(b []byte) error { return nil } diff --git a/pkg/rule_engine/nodes/action_delay_node.go b/pkg/rule_engine/nodes/action_delay_node.go new file mode 100644 index 0000000..802d104 --- /dev/null +++ b/pkg/rule_engine/nodes/action_delay_node.go @@ -0,0 +1,76 @@ +package nodes + +import ( + "fmt" + "pandax/pkg/rule_engine/message" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +const DelayNodeName = "DelayNode" + +type delayNode struct { + bareNode + PeriodTs int `json:"periodTs" yaml:"periodTs" jpath:"periodTs"` + MaxPendingMessages int `json:"maxPendingMessages" yaml:"maxPendingMessages" jpath:"maxPendingMessages"` + messageQueue []message.Message `jpath:"-"` + delayTimer *time.Timer `jpath:"-"` + lock sync.Mutex `jpath:"-"` +} + +type delayNodeFactory struct{} + +func (f delayNodeFactory) Name() string { return DelayNodeName } +func (f delayNodeFactory) Category() string { return NODE_CATEGORY_ACTION } + +func (f delayNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{"Success", "Failure"} + node := &delayNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + lock: sync.Mutex{}, + } + _, err := decodePath(meta, node) + node.messageQueue = make([]message.Message, node.MaxPendingMessages) + return node, err +} + +func (n *delayNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + if successLabelNode == nil || failureLabelNode == nil { + return fmt.Errorf("no valid label linked node in %s", n.Name()) + } + + // check wethere the time had already been started, queue message if started + if n.delayTimer == nil { + n.messageQueue = append(n.messageQueue, msg) + n.delayTimer = time.NewTimer(time.Duration(n.PeriodTs) * time.Second) + // start timecallback goroutine + go func(n *delayNode) error { + defer n.delayTimer.Stop() + for { + <-n.delayTimer.C + n.lock.Lock() + defer n.lock.Unlock() + if len(n.messageQueue) > 0 { + msg := n.messageQueue[0] + n.messageQueue = n.messageQueue[0:] + return successLabelNode.Handle(msg) + } + } + }(n) + return nil + } + // the delay timer had already been created, just queue message + n.lock.Lock() + defer n.lock.Unlock() + if len(n.messageQueue) == n.MaxPendingMessages { + return failureLabelNode.Handle(msg) + } + n.messageQueue = append(n.messageQueue, msg) + return nil +} diff --git a/pkg/rule_engine/nodes/factory.go b/pkg/rule_engine/nodes/factory.go new file mode 100644 index 0000000..b607f7c --- /dev/null +++ b/pkg/rule_engine/nodes/factory.go @@ -0,0 +1,53 @@ +package nodes + +import ( + "fmt" +) + +const ( + NODE_CATEGORY_FILTER = "filter" + NODE_CATEGORY_ACTION = "action" + NODE_CATEGORY_ENRICHMENT = "enrichment" + NODE_CATEGORY_TRANSFORM = "transform" + NODE_CATEGORY_EXTERNAL = "external" + NODE_CATEGORY_OTHERS = "others" +) + +// Factory is node's factory to create node based on metadata +// factory also manage node's metadta description which can be used by other +// service to present node in web +type Factory interface { + Name() string + Category() string + Create(id string, meta Metadata) (Node, error) +} + +var ( + // allNodeFactories hold all node's factory + allNodeFactories map[string]Factory = make(map[string]Factory) + + // allNodeCategories hold node's metadata by category + allNodeCategories map[string][]string = make(map[string][]string) +) + +// RegisterFactory add a new node factory and classify its category for +// metadata description +func RegisterFactory(f Factory) { + allNodeFactories[f.Name()] = f + + if allNodeCategories[f.Category()] == nil { + allNodeCategories[f.Category()] = []string{} + } + allNodeCategories[f.Category()] = append(allNodeCategories[f.Category()], f.Name()) +} + +// NewNode is the only way to create a new node +func NewNode(nodeType string, id string, meta Metadata) (Node, error) { + if f, found := allNodeFactories[nodeType]; found { + return f.Create(id, meta) + } + return nil, fmt.Errorf("invalid node type '%s'", nodeType) +} + +// GetCategoryNodes return specified category's all nodes +func GetCategoryNodes() map[string][]string { return allNodeCategories } diff --git a/pkg/rule_engine/nodes/filter_switch_node.go b/pkg/rule_engine/nodes/filter_switch_node.go new file mode 100644 index 0000000..722e76c --- /dev/null +++ b/pkg/rule_engine/nodes/filter_switch_node.go @@ -0,0 +1,43 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +type switchFilterNode struct { + bareNode + Scripts string `json:"scripts" yaml:"scripts"` +} + +type switchFilterNodeFactory struct{} + +func (f switchFilterNodeFactory) Name() string { return "SwitchNode" } +func (f switchFilterNodeFactory) Category() string { return NODE_CATEGORY_FILTER } + +func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{} + node := &switchFilterNode{ + bareNode: newBareNode(f.Name(), id, meta, labels), + } + return decodePath(meta, node) +} + +func (n *switchFilterNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + /*scriptEngine := NewScriptEngine() + SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Scripts) + if err != nil { + return nil + } + nodes := n.GetLinkedNodes() + for label, node := range nodes { + for _, switchresult := range SwitchResults { + if label == switchresult { + return node.Handle(msg) + } + } + }*/ + return nil +} diff --git a/pkg/rule_engine/nodes/init.go b/pkg/rule_engine/nodes/init.go new file mode 100644 index 0000000..d8d4d7b --- /dev/null +++ b/pkg/rule_engine/nodes/init.go @@ -0,0 +1,6 @@ +package nodes + +// init register all node's factory +func init() { + RegisterFactory(inputNodeFactory{}) +} diff --git a/pkg/rule_engine/nodes/input_node.go b/pkg/rule_engine/nodes/input_node.go new file mode 100644 index 0000000..5da4162 --- /dev/null +++ b/pkg/rule_engine/nodes/input_node.go @@ -0,0 +1,35 @@ +package nodes + +import ( + "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" +) + +const InputNodeName = "InputNode" + +type inputNode struct { + bareNode +} + +type inputNodeFactory struct{} + +func (f inputNodeFactory) Name() string { return "InputNode" } +func (f inputNodeFactory) Category() string { return NODE_CATEGORY_OTHERS } + +func (f inputNodeFactory) Create(id string, meta Metadata) (Node, error) { + labels := []string{} + node := &inputNode{ + bareNode: newBareNode(InputNodeName, id, meta, labels), + } + return node, nil +} + +func (n *inputNode) Handle(msg message.Message) error { + logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + + nodes := n.GetLinkedNodes() + for _, node := range nodes { + return node.Handle(msg) + } + return nil +} diff --git a/pkg/rule_engine/nodes/metadata.go b/pkg/rule_engine/nodes/metadata.go new file mode 100644 index 0000000..1af89c4 --- /dev/null +++ b/pkg/rule_engine/nodes/metadata.go @@ -0,0 +1,62 @@ +package nodes + +import ( + "fmt" + "github.com/XM-GO/PandaKit/utils" +) + +const ( + NODE_CONFIG_MESSAGE_TYPE_KEY = "messageTypeKey" + NODE_CONFIG_ORIGINATOR_TYPE_KEY = "originatorTypeKey" +) + +type Metadata interface { + Keys() []string + With(key string, val interface{}) Metadata + Value(key string) (interface{}, error) + DecodePath(rawVal interface{}) error +} + +type nodeMetadata struct { + keypairs map[string]interface{} +} + +func NewMetadata() Metadata { + return &nodeMetadata{ + keypairs: make(map[string]interface{}), + } +} + +func NewMetadataWithString(vals string) Metadata { + return &nodeMetadata{} +} + +func NewMetadataWithValues(vals map[string]interface{}) Metadata { + return &nodeMetadata{ + keypairs: vals, + } +} + +func (c *nodeMetadata) Keys() []string { + keys := []string{} + for key, _ := range c.keypairs { + keys = append(keys, key) + } + return keys +} + +func (c *nodeMetadata) Value(key string) (interface{}, error) { + if val, found := c.keypairs[key]; found { + return val, nil + } + return nil, fmt.Errorf("key '%s' not found", key) +} + +func (c *nodeMetadata) With(key string, val interface{}) Metadata { + c.keypairs[key] = val + return c +} + +func (c *nodeMetadata) DecodePath(rawVal interface{}) error { + return utils.Map2Struct(c.keypairs, rawVal) +} diff --git a/pkg/rule_engine/nodes/node.go b/pkg/rule_engine/nodes/node.go new file mode 100644 index 0000000..7a875d2 --- /dev/null +++ b/pkg/rule_engine/nodes/node.go @@ -0,0 +1,65 @@ +package nodes + +import ( + "errors" + "pandax/pkg/rule_engine/message" + + "github.com/sirupsen/logrus" +) + +type Node interface { + Name() string + Id() string + Metadata() Metadata + MustLabels() []string + Handle(message.Message) error + + AddLinkedNode(label string, node Node) + GetLinkedNode(label string) Node + GetLinkedNodes() map[string]Node +} + +type bareNode struct { + name string + id string + nodes map[string]Node + meta Metadata + labels []string +} + +func newBareNode(name string, id string, meta Metadata, labels []string) bareNode { + return bareNode{ + name: name, + id: id, + nodes: make(map[string]Node), + meta: meta, + labels: labels, + } +} + +func (n *bareNode) Name() string { return n.name } +func (n *bareNode) WithId(id string) { n.id = id } +func (n *bareNode) Id() string { return n.id } +func (n *bareNode) MustLabels() []string { return n.labels } +func (n *bareNode) AddLinkedNode(label string, node Node) { n.nodes[label] = node } + +func (n *bareNode) GetLinkedNode(label string) Node { + if node, found := n.nodes[label]; found { + return node + } + logrus.Errorf("no label '%s' in node '%s'", label, n.name) + return nil +} + +func (n *bareNode) GetLinkedNodes() map[string]Node { return n.nodes } + +func (n *bareNode) Metadata() Metadata { return n.meta } + +func (n *bareNode) Handle(message.Message) error { return errors.New("not implemented") } + +func decodePath(meta Metadata, n Node) (Node, error) { + if err := meta.DecodePath(n); err != nil { + return n, err + } + return n, nil +} diff --git a/pkg/rule_engine/rule_engine.go b/pkg/rule_engine/rule_engine.go deleted file mode 100644 index c5fdad3..0000000 --- a/pkg/rule_engine/rule_engine.go +++ /dev/null @@ -1,76 +0,0 @@ -package rule_engine - -import ( - "context" - "time" -) - -const ( - UPDATE_RULE_STATUS_START = "start" - UPDATE_RULE_STATUS_STOP = "stop" -) - -//RuleEngine RuleEngine -type RuleEngine struct { - Name string - ID string - Description string - DebugMode bool - Status string - Payload []byte - Root bool - Channel string - SubTopic string - CreateAt time.Time - LastUpdateAt time.Time -} - -type PageMetadata struct { - Total uint64 - Offset uint64 - Limit uint64 - Name string -} - -type RuleEnginePage struct { - PageMetadata - RuleChains []RuleEngine -} - -// Validate returns an error if representtation is invalid -func (r RuleEngine) Validate() error { - if r.ID == "" { - return ErrMalformedEntity - } - return nil -} - -//RuleEngineRepository specifies realm persistence API -type RuleEngineRepository interface { - //Save save the RuleEngine - Save(context.Context, RuleEngine) error - - //Update the RuleEngine - Update(context.Context, RuleEngine) (RuleEngine, error) - - //Retrieve return RuleEngine by RuleEngine id - Retrieve(context.Context, string) (RuleEngine, error) - - //Revoke remove RuleEngine by RuleEngine id - Revoke(context.Context, string) error - - //List return all RuleEngines - List(context.Context, uint64, uint64) (RuleEnginePage, error) -} - -// RuleEngineCache contains thing caching interface. -type RuleEngineCache interface { - // Save stores pair thing key, thing id. - Save(context.Context, string, string) error - - // ID returns thing ID for given key. - ID(context.Context, string) (string, error) - - // Remove thing from cache. - Remove(context.Context, string) error -} diff --git a/pkg/rule_engine/rule_service.go b/pkg/rule_engine/rule_service.go deleted file mode 100644 index 203458f..0000000 --- a/pkg/rule_engine/rule_service.go +++ /dev/null @@ -1,139 +0,0 @@ -package rule_engine - -import ( - "context" - "github.com/pkg/errors" -) - -const ( - RULE_STATUS_CREATED = "created" - RULE_STATUS_STARTED = "started" - RULE_STATUS_STOPPED = "stopped" - RULE_STATUS_UNKNOWN = "unknown" -) - -var ( - // ErrConflict indicates usage of the existing email during account - // registration. - ErrConflict = errors.New("email already taken") - - // ErrMalformedEntity indicates malformed entity specification - // (e.g. invalid realmname or password). - ErrMalformedEntity = errors.New("malformed entity specification") - - // ErrUnauthorizedAccess indicates missing or invalid credentials provided - // when accessing a protected resource. - ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided") - - // ErrNotFound indicates a non-existent entity request. - ErrNotFound = errors.New("non-existent entity") - - // ErrruleEngineNotFound indicates a non-existent realm request. - ErrruleEngineNotFound = errors.New("non-existent ruleEngine") - - // ErrScanMetadata indicates problem with metadata in db. - ErrScanMetadata = errors.New("Failed to scan metadata") - - // ErrMissingEmail indicates missing email for password reset request. - ErrMissingEmail = errors.New("missing email for password reset") - - // ErrUnauthorizedPrincipal indicate the pricipal can not be recognized - ErrUnauthorizedPrincipal = errors.New("unauthorized principal") -) - -//Service service -type Service interface { - AddNewruleEngine(context.Context, RuleEngine) error - GetruleEngineInfo(context.Context, string) (RuleEngine, error) - UpdateruleEngine(context.Context, RuleEngine) (RuleEngine, error) - RevokeruleEngine(context.Context, string) error - ListruleEngine(context.Context, uint64, uint64) (RuleEnginePage, error) - UpdateruleEngineStatus(context.Context, string, string) error -} - -/* -var _ Service = (*ruleEngineService)(nil) - -type ruleEngineService struct { - ruleEngines RuleEngineRepository - mutex sync.RWMutex - instanceManager instanceManager - ruleEnginesCache RuleEngineCache -} - -//New new -func New(ruleEngines RuleEngineRepository, instancemanager instanceManager, ruleEngineCache RuleEngineCache) Service { - return &ruleEngineService{ - ruleEngines: ruleEngines, - instanceManager: instancemanager, - ruleEnginesCache: ruleEngineCache, - } -} - -func (svc ruleEngineService) AddNewruleEngine(ctx context.Context, ruleEngine RuleEngine) error { - return svc.ruleEngines.Save(ctx, ruleEngine) -} - -func (svc ruleEngineService) GetruleEngineInfo(ctx context.Context, ruleEngineID string) (RuleEngine, error) { - ruleEngine, err := svc.ruleEngines.Retrieve(ctx, ruleEngineID) - if err != nil { - return RuleEngine{}, errors.Wrap(ErrruleEngineNotFound, err.Error()) - } - - return ruleEngine, nil -} - -func (svc ruleEngineService) UpdateruleEngine(ctx context.Context, ruleEngine RuleEngine) (RuleEngine, error) { - - old_ruleEngine, err := svc.ruleEngines.Retrieve(ctx, ruleEngine.ID) - if err != nil { - return RuleEngine{}, errors.Wrap(ErrruleEngineNotFound, err.Error()) - } - if old_ruleEngine.Status == RULE_STATUS_STARTED { - return RuleEngine{}, status.Error(codes.FailedPrecondition, "") - } - - return svc.ruleEngines.Update(ctx, ruleEngine) -} - -func (svc ruleEngineService) RevokeruleEngine(ctx context.Context, ruleEngineID string) error { - - ruleEngine, err := svc.ruleEngines.Retrieve(ctx, ruleEngineID) - if err != nil { - return errors.Wrap(ErrruleEngineNotFound, err.Error()) - } - if ruleEngine.Status == RULE_STATUS_STARTED { - return status.Error(codes.FailedPrecondition, "") - } - - return svc.ruleEngines.Revoke(ctx, ruleEngineID) -} - -func (svc ruleEngineService) ListruleEngine(ctx context.Context, offset uint64, limit uint64) (RuleEnginePage, error) { - return svc.ruleEngines.List(ctx, offset, limit) -} - -func (svc ruleEngineService) UpdateruleEngineStatus(ctx context.Context, ruleEngineID string, updatestatus string) error { - - ruleEngine, err := svc.ruleEngines.Retrieve(ctx, ruleEngineID) - if err != nil { - return errors.Wrap(ErrruleEngineNotFound, err.Error()) - } - - switch updatestatus { - case UPDATE_RULE_STATUS_START: - if ruleEngine.Status != RULE_STATUS_CREATED && ruleEngine.Status != RULE_STATUS_STOPPED { - return status.Error(codes.FailedPrecondition, "") - } - - return svc.instanceManager.startRuleEngine(&ruleEngine) - case UPDATE_RULE_STATUS_STOP: - if ruleEngine.Status != RULE_STATUS_STARTED { - return status.Error(codes.FailedPrecondition, "") - } - - return svc.instanceManager.stopRuleEngine(&ruleEngine) - } - return nil -} -*/