From 5dfa34108368fa94f745aa93ec55795da8d2da2a Mon Sep 17 00:00:00 2001 From: XM-GO <93296511+XM-GO@users.noreply.github.com> Date: Tue, 18 Apr 2023 16:29:26 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=84=E5=88=99=E9=93=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/PandaX.iml | 9 ++ .idea/modules.xml | 8 ++ .idea/vcs.xml | 6 + apps/visual/entity/rulechain.go | 14 ++ apps/visual/services/rulechain_log.go | 61 ++++++++ go.mod | 23 ++- go.sum | 54 ++++++- pkg/rule_engine/instance.go | 31 +--- pkg/rule_engine/instance_test.go | 29 ++-- pkg/rule_engine/manifest/manifest_sample.json | 136 +++++++++--------- pkg/rule_engine/message/message.go | 120 +++++++++------- .../nodes/action_clear_alarm_node.go | 25 ++-- .../nodes/action_create_alarm_node.go | 30 ++-- pkg/rule_engine/nodes/action_delay_node.go | 7 +- .../nodes/action_generator_node.go | 46 ++++-- pkg/rule_engine/nodes/action_log_node.go | 38 +++-- pkg/rule_engine/nodes/action_my_node.go | 29 ---- .../nodes/action_save_attributes_node.go | 33 ++++- .../nodes/action_save_timeseries_node.go | 21 ++- pkg/rule_engine/nodes/external_ding_node.go | 53 ++++++- pkg/rule_engine/nodes/external_kafka_node.go | 62 +++++++- pkg/rule_engine/nodes/external_mqtt_node.go | 35 +++-- pkg/rule_engine/nodes/external_nats_node.go | 19 ++- .../nodes/external_restapi_node.go | 74 +++++++++- .../nodes/external_rule_chain_node.go | 32 ++++- .../nodes/external_send_email_node.go | 85 ++++++++--- .../nodes/external_send_sms_node.go | 6 + pkg/rule_engine/nodes/external_wechat_node.go | 28 +++- .../nodes/filter_device_type_switch_node.go | 29 ++-- .../nodes/filter_message_type_switch_node.go | 2 - pkg/rule_engine/nodes/filter_script_node.go | 7 +- pkg/rule_engine/nodes/filter_switch_node.go | 10 +- pkg/rule_engine/nodes/init.go | 2 - pkg/rule_engine/nodes/metadata.go | 2 +- pkg/rule_engine/nodes/node.go | 2 +- pkg/rule_engine/nodes/script_engine.go | 102 ++++++++++--- pkg/rule_engine/nodes/template_engine.go | 19 +++ .../nodes/transform_delete_key_node.go | 20 ++- .../nodes/transform_rename_key_node.go | 28 ++-- .../nodes/transform_script_node.go | 18 ++- pkg/rule_engine/tool/email.go | 1 + pkg/rule_engine/tool/kafka.go | 1 + pkg/rule_engine/tool/mq.go | 1 + pkg/rule_engine/tool/mqtt.go | 1 + pkg/rule_engine/tool/rest_api.go | 1 + pkg/rule_engine/tool/sms.go | 1 + 46 files changed, 980 insertions(+), 381 deletions(-) create mode 100644 .idea/PandaX.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 apps/visual/services/rulechain_log.go delete mode 100644 pkg/rule_engine/nodes/action_my_node.go create mode 100644 pkg/rule_engine/nodes/template_engine.go create mode 100644 pkg/rule_engine/tool/email.go create mode 100644 pkg/rule_engine/tool/kafka.go create mode 100644 pkg/rule_engine/tool/mq.go create mode 100644 pkg/rule_engine/tool/mqtt.go create mode 100644 pkg/rule_engine/tool/rest_api.go create mode 100644 pkg/rule_engine/tool/sms.go diff --git a/.idea/PandaX.iml b/.idea/PandaX.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/PandaX.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..4ddee95 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/apps/visual/entity/rulechain.go b/apps/visual/entity/rulechain.go index b4c3b2a..d901075 100644 --- a/apps/visual/entity/rulechain.go +++ b/apps/visual/entity/rulechain.go @@ -2,6 +2,7 @@ package entity import ( "github.com/XM-GO/PandaKit/model" + "time" ) type VisualRuleChain struct { @@ -19,3 +20,16 @@ type VisualRuleChain struct { func (VisualRuleChain) TableName() string { return "visual_rule_chain" } + +type VisualRuleChainMsgLog struct { + MessageId string `json:"message_id"` + MsgType string `json:"msg_type"` + DeviceName string `json:"device_name"` + Ts time.Time `json:"ts"` + Content string `json:"content"` + CreatedAt time.Time // 创建时间 +} + +func (VisualRuleChainMsgLog) TableName() string { + return "visual_rule_chain_msg_log" +} diff --git a/apps/visual/services/rulechain_log.go b/apps/visual/services/rulechain_log.go new file mode 100644 index 0000000..9d34760 --- /dev/null +++ b/apps/visual/services/rulechain_log.go @@ -0,0 +1,61 @@ +// ========================================================================== +// 生成日期:2023-03-29 20:01:11 +0800 CST +// 生成路径: apps/visual/services/rulechain.go +// 生成人:panda +// ========================================================================== + +package services + +import ( + "github.com/XM-GO/PandaKit/biz" + "pandax/apps/visual/entity" + "pandax/pkg/global" +) + +type ( + VisualRuleChainMsgLogModel interface { + Insert(data entity.VisualRuleChainMsgLog) *entity.VisualRuleChainMsgLog + FindListPage(page, pageSize int, data entity.VisualRuleChainMsgLog) (*[]entity.VisualRuleChainMsgLog, int64) + Delete(ids []string) + } + + ruleChainLogModelImpl struct { + table string + } +) + +var VisualRuleChainMsgLogModelDao VisualRuleChainMsgLogModel = &ruleChainLogModelImpl{ + table: `visual_rule_chain_msg_log`, +} + +func (m *ruleChainLogModelImpl) Insert(data entity.VisualRuleChainMsgLog) *entity.VisualRuleChainMsgLog { + err := global.Db.Table(m.table).Create(&data).Error + biz.ErrIsNil(err, "添加规则链失败") + return &data +} + +func (m *ruleChainLogModelImpl) FindListPage(page, pageSize int, data entity.VisualRuleChainMsgLog) (*[]entity.VisualRuleChainMsgLog, int64) { + list := make([]entity.VisualRuleChainMsgLog, 0) + var total int64 = 0 + offset := pageSize * (page - 1) + db := global.Db.Table(m.table) + // 此处填写 where参数判断 + db.Where("delete_time IS NULL") + if data.DeviceName != "" { + db = db.Where("device_name = ?", data.DeviceName) + } + if data.MessageId != "" { + db = db.Where("message_id = ?", data.MessageId) + } + if data.MsgType != "" { + db = db.Where("msg_type = ?", data.MsgType) + } + err := db.Count(&total).Error + err = db.Order("create_at").Limit(pageSize).Offset(offset).Find(&list).Error + biz.ErrIsNil(err, "查询规则链分页列表失败") + return &list, total +} + +func (m *ruleChainLogModelImpl) Delete(ids []string) { + biz.ErrIsNil(global.Db.Table(m.table).Delete(&entity.VisualRuleChainMsgLog{}, "message_id in (?)", ids).Error, "删除规则链失败") +} diff --git a/go.mod b/go.mod index 40fdec1..f40b9d9 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module pandax go 1.18 require ( + github.com/Shopify/sarama v1.38.1 github.com/XM-GO/PandaKit v0.0.0-20220902065259-efd83b5ba4b2 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/didip/tollbooth v4.0.2+incompatible @@ -13,6 +14,7 @@ require ( github.com/go-openapi/spec v0.20.6 github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.4.2 + github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible github.com/kakuilan/kgo v0.1.8 github.com/mitchellh/mapstructure v1.5.0 github.com/mssola/user_agent v0.5.3 @@ -32,8 +34,12 @@ require ( github.com/brianvoe/gofakeit/v6 v6.0.2 // indirect github.com/casbin/casbin/v2 v2.37.4 // indirect github.com/casbin/gorm-adapter/v3 v3.4.6 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/denisenkom/go-mssqldb v0.11.0 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect + github.com/eapache/go-resiliency v1.3.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect @@ -44,8 +50,12 @@ require ( github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/go-querystring v1.0.0 // indirect github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.10.1 // indirect @@ -55,11 +65,16 @@ require ( github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/pgtype v1.9.0 // indirect github.com/jackc/pgx/v4 v4.14.0 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.2 // indirect - github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.16.0 // indirect github.com/lib/pq v1.10.4 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -72,8 +87,10 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/qiniu/go-sdk/v7 v7.11.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/richardlehane/mscfb v1.0.3 // indirect github.com/richardlehane/msoleps v1.0.1 // indirect github.com/spf13/pflag v1.0.5 // indirect @@ -82,14 +99,14 @@ require ( github.com/xuri/excelize/v2 v2.4.1 // indirect golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb // indirect golang.org/x/net v0.6.0 // indirect - golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect google.golang.org/protobuf v1.28.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect gorm.io/driver/mysql v1.2.0 // indirect gorm.io/driver/postgres v1.2.3 // indirect gorm.io/driver/sqlserver v1.2.1 // indirect diff --git a/go.sum b/go.sum index c9a049f..3cbb952 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,9 @@ github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0 github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= +github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= +github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= +github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 h1:5sXbqlSomvdjlRbWyNqkPsJ3Fg+tQZCbgeX1VGljbQY= github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/XM-GO/PandaKit v0.0.0-20220902065259-efd83b5ba4b2 h1:5wn9dKcH0JbmeObnxPMjOhA5nxcrCWR6O8WPXGQtLt4= @@ -58,6 +61,12 @@ github.com/dop251/goja v0.0.0-20230304130813-e2f543bf4b4c h1:/utv6nmTctV6OVgfk5+ github.com/dop251/goja v0.0.0-20230304130813-e2f543bf4b4c/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4= github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM= +github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= +github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/emicklei/go-restful-openapi/v2 v2.9.0 h1:djsWqjhI0EVYfkLCCX6jZxUkLmYUq2q9tt09ZbixfyE= @@ -71,6 +80,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -128,6 +138,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -146,9 +158,18 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -202,6 +223,18 @@ github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0f github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.2.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= +github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= @@ -218,6 +251,7 @@ github.com/kakuilan/kgo v0.1.8 h1:b9UfGYNbUpWjPheOEgu/MsWUVDNWbcSit6BbNsBAPl0= github.com/kakuilan/kgo v0.1.8/go.mod h1:S9driqss6OluzqiOfUx7xN8nw0H6bFu5v7c19P09RRc= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= +github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -278,6 +312,8 @@ github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -289,6 +325,8 @@ github.com/qiniu/dyn v1.3.0/go.mod h1:E8oERcm8TtwJiZvkQPbcAh0RL8jO1G0VXJMW3FAWdk github.com/qiniu/go-sdk/v7 v7.11.0 h1:Cdx/1E3ybv0OFKnkGwoDN/t6bCCntjrWhwWuRaqI3XQ= github.com/qiniu/go-sdk/v7 v7.11.0/go.mod h1:btsaOc8CA3hdVloULfFdDgDc+g4f3TDZEFsDY0BLE+w= github.com/qiniu/x v1.10.5/go.mod h1:03Ni9tj+N2h2aKnAz+6N0Xfl8FwMEDRC2PAlxekASDs= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/richardlehane/mscfb v1.0.3 h1:rD8TBkYWkObWO0oLDFCbwMeZ4KoalxQy+QgniCj3nKI= github.com/richardlehane/mscfb v1.0.3/go.mod h1:YzVpcZg9czvAuhk9T+a3avCpcFPMUWm7gK3DypaEsUk= github.com/richardlehane/msoleps v1.0.1 h1:RfrALnSNXzmXLbGct/P2b4xkFz4e8Gmj/0Vj9M9xC1o= @@ -320,13 +358,16 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.194/go.mod h1:7sCQWVkxcsR38nffDW057DRGk8mUjK1Ing/EFOK8s8Y= github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/kms v1.0.194/go.mod h1:yrBKWhChnDqNz1xuXdSbWXG56XawEq0G5j1lg4VwBD4= github.com/tencentyun/cos-go-sdk-v5 v0.7.33 h1:5jmJU7U/1nf/7ZPDkrUL8KlF1oDUzTHsdtLNY6x0hq4= @@ -361,6 +402,7 @@ golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWP golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -382,13 +424,16 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -399,8 +444,9 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -510,8 +556,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/mysql v1.0.3/go.mod h1:twGxftLBlFgNVNakL7F+P/x9oYqoymG3YYT8cAfI9oI= gorm.io/driver/mysql v1.1.2/go.mod h1:4P/X9vSc3WTrhTLZ259cpFd6xKNYiSSdSZngkSBGIMM= gorm.io/driver/mysql v1.2.0 h1:l8+9VwjjyzEkw0PNPBOr2JHhLOGVk7XEnl5hk42bcvs= diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index 221f950..34197df 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -2,15 +2,12 @@ package rule_engine import ( "context" - "fmt" "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/manifest" "pandax/pkg/rule_engine/message" "pandax/pkg/rule_engine/nodes" - "strings" ) -// ruleChainInstance is rulechain's runtime instance that manage all nodes in this chain, type ruleChainInstance struct { firstRuleNodeId string nodes map[string]nodes.Node @@ -40,37 +37,13 @@ func newInstanceWithManifest(m *manifest.Manifest) (*ruleChainInstance, []error) firstRuleNodeId: m.FirstRuleNodeId, nodes: nodes, } - - for _, edge := range m.Edges { - originalNode, found := r.nodes[edge.SourceNodeId] - if !found { - err := fmt.Errorf("original node '%s' no exist in", originalNode.Name()) - errs = append(errs, err) - continue - } - targetNode, found := r.nodes[edge.TargetNodeId] - if !found { - err := fmt.Errorf("target node '%s' no exist in rulechain", targetNode.Name()) - errs = append(errs, err) - continue - } - types := make([]string, 0) - if _, ok := edge.Properties["lineType"]; !ok { - types = append(types, "True") - } else { - types = strings.Split(edge.Properties["lineType"].(string), "/") - } - for _, ty := range types { - originalNode.AddLinkedNode(ty, targetNode) - } - } return r, errs } -// StartRuleChain +// StartRuleChain TODO 是否需要添加context func (c *ruleChainInstance) StartRuleChain(context context.Context, message message.Message) error { if node, found := c.nodes[c.firstRuleNodeId]; found { - go node.Handle(message) + node.Handle(message) } return nil } diff --git a/pkg/rule_engine/instance_test.go b/pkg/rule_engine/instance_test.go index c835239..a35f13d 100644 --- a/pkg/rule_engine/instance_test.go +++ b/pkg/rule_engine/instance_test.go @@ -1,6 +1,7 @@ package rule_engine import ( + "context" "io/ioutil" "pandax/pkg/rule_engine/message" "pandax/pkg/rule_engine/nodes" @@ -12,18 +13,21 @@ func TestNewRuleChainInstance(t *testing.T) { if err != nil { t.Error(err) } - _, errs := NewRuleChainInstance(buf) + instance, errs := NewRuleChainInstance(buf) if len(errs) > 0 { t.Error(errs[0]) } + + metadata := message.NewDefaultMetadata(map[string]interface{}{"deviceName": "aa", "namespace": "default", "test": "aa"}) + msg := message.NewMessageWithDetail("1", nodes.DEVICE, message.EventTelemetryType, map[string]interface{}{"temperature": 60}, metadata) + t.Log("开始执行力流程") + instance.StartRuleChain(context.Background(), msg) } func TestScriptEngine(t *testing.T) { metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) - msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata) - scriptEngine := nodes.NewScriptEngine() - const script = ` - function Switch(msg, metadata, msgType) { + msg := message.NewMessageWithDetail("1", "device", message.EventUpEventType, map[string]interface{}{"aa": 5}, metadata) + const baseScript = ` function nextRelation(metadata, msg) { return ['one','nine']; } @@ -34,9 +38,9 @@ func TestScriptEngine(t *testing.T) { return ['two']; } return nextRelation(metadata, msg); - } ` - SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, script) + scriptEngine := nodes.NewScriptEngine(msg, "Switch", baseScript) + SwitchResults, err := scriptEngine.ScriptOnSwitch() if err != nil { t.Error(err) @@ -46,16 +50,15 @@ func TestScriptEngine(t *testing.T) { func TestScriptOnMessage(t *testing.T) { metadata := message.NewDefaultMetadata(map[string]interface{}{"device": "aa"}) - msg := message.NewMessageWithDetail("1", message.MessageTypeConnectEvent, map[string]interface{}{"aa": 5}, metadata) - scriptEngine := nodes.NewScriptEngine() - const script = ` - function Transform(msg, metadata, msgType) { + msg := message.NewMessageWithDetail("1", "device", message.EventUpEventType, map[string]interface{}{"aa": 5}, metadata) + + const baseScript = ` msg.bb = "33" metadata.event = 55 return {msg: msg, metadata: metadata, msgType: msgType}; - } ` - ScriptOnMessageResults, err := scriptEngine.ScriptOnMessage(msg, script) + scriptEngine := nodes.NewScriptEngine(msg, "Transform", baseScript) + ScriptOnMessageResults, err := scriptEngine.ScriptOnMessage() if err != nil { t.Error(err) diff --git a/pkg/rule_engine/manifest/manifest_sample.json b/pkg/rule_engine/manifest/manifest_sample.json index 2cd9164..8d0eb42 100644 --- a/pkg/rule_engine/manifest/manifest_sample.json +++ b/pkg/rule_engine/manifest/manifest_sample.json @@ -1,129 +1,121 @@ { "nodes": [ { - "id": "d2cc123c-d0d7-4830-8547-21d5e52b4011", + "id": "input", "type": "InputNode", - "x": 120, - "y": 340, - "properties": { - "debugMode": false, - "status": false - }, - "zIndex": 1013, + "x": 300, + "y": 200, + "properties": {}, "text": { - "x": 130, - "y": 340, - "value": "输入" + "x": 310, + "y": 200, + "value": "input" } }, { - "id": "6c497a23-ece2-41fa-927d-9e2e9a1c7316", - "type": "MyNode", - "x": 400, - "y": 340, + "id": "cf717a24-2917-444f-9927-74a0d486ad7a", + "type": "LogNode", + "x": 820, + "y": 200, "properties": { - "debugMode": false, - "status": false + "name": "log", + "script": "function ToString(msg, metadata, msgType) {\n return '\\nIncoming message:\\n' + JSON.stringify(msg) + \n '\\nIncoming metadata:\\n' + JSON.stringify(metadata);\n\n }\n " }, - "zIndex": 1002, "text": { - "x": 410, - "y": 340, - "value": "测试节点" + "x": 830, + "y": 200, + "value": "log" } }, { - "id": "7235c088-73e0-4683-982a-77b0fd31313d", - "type": "MyNode", - "x": 660, - "y": 340, + "id": "c56b90cc-64fc-4782-8d9e-b0b24dabc876", + "type": "MessageGeneratorNode", + "x": 520, + "y": 40, "properties": { - "debugMode": false, - "status": false + "name": "ada", + "messageCount": 2, + "periodSecond": 5, + "script": "function Generate(msg, metadata, msgType) {\n var msg = { temp: 42, humidity: 77 };\nvar metadata = { data: 40 };\nvar msgType = \"telemetry\";\n\nreturn { msg: msg, metadata: metadata, msgType: msgType };\n }\n " }, - "zIndex": 1005, "text": { - "x": 670, - "y": 340, - "value": "测试节点" + "x": 530, + "y": 40, + "value": "generator" } } ], "edges": [ { - "id": "b6c8b0c4-1481-40d3-b345-780f96efa909", - "type": "bezier-link", - "sourceNodeId": "d2cc123c-d0d7-4830-8547-21d5e52b4011", - "targetNodeId": "6c497a23-ece2-41fa-927d-9e2e9a1c7316", + "id": "ca4c7e4b-2ffa-4f72-83d0-456d9db72add", + "type": "node-link", + "sourceNodeId": "input", + "targetNodeId": "c56b90cc-64fc-4782-8d9e-b0b24dabc876", "startPoint": { - "x": 180, - "y": 340 + "x": 350, + "y": 200 }, "endPoint": { - "x": 340, - "y": 340 + "x": 460, + "y": 40 }, - "properties": { - "lineType": "True" - }, - "zIndex": 1003, + "properties": {}, "pointsList": [ { - "x": 180, - "y": 340 + "x": 350, + "y": 200 }, { - "x": 280, - "y": 340 + "x": 450, + "y": 200 }, { - "x": 240, - "y": 340 + "x": 360, + "y": 40 }, { - "x": 340, - "y": 340 + "x": 460, + "y": 40 } ] }, { - "id": "1e38236e-716b-44e7-8d96-6424f63fdacc", - "type": "bezier-link", - "sourceNodeId": "6c497a23-ece2-41fa-927d-9e2e9a1c7316", - "targetNodeId": "7235c088-73e0-4683-982a-77b0fd31313d", + "id": "feeb016c-c52c-48ed-8d2c-fde36439da7d", + "type": "node-link", + "sourceNodeId": "c56b90cc-64fc-4782-8d9e-b0b24dabc876", + "targetNodeId": "cf717a24-2917-444f-9927-74a0d486ad7a", "startPoint": { - "x": 460, - "y": 340 + "x": 580, + "y": 40 }, "endPoint": { - "x": 600, - "y": 340 + "x": 770, + "y": 200 }, "properties": { - "lineType": "Next" + "type": "Success" }, "text": { - "x": 530, - "y": 340, - "value": "Next" + "x": 675, + "y": 120, + "value": "Success" }, - "zIndex": 1006, "pointsList": [ { - "x": 460, - "y": 340 + "x": 580, + "y": 40 }, { - "x": 560, - "y": 340 + "x": 680, + "y": 40 }, { - "x": 500, - "y": 340 + "x": 670, + "y": 200 }, { - "x": 600, - "y": 340 + "x": 770, + "y": 200 } ] } diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index cab114e..8696258 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -3,20 +3,39 @@ package message import ( "encoding/json" "github.com/google/uuid" - "github.com/sirupsen/logrus" "time" ) +//消息类型 +const ( + EventConnectType = "connect" + EventDisConnectType = "disconnect" + EventUpEventType = "event" + EventAlarmType = "alarm" + EventTelemetryType = "telemetry" + EventAttributesType = "attributes" +) + +// 数据类型Originator +const ( + DEVICE = "DEVICE" + GATEWAY = "GATEWAY" +) + // Message ... type Message interface { + GetId() string + GetTs() time.Time GetOriginator() string + GetUserId() string GetType() string GetMsg() map[string]interface{} - GetMsgLogs() []MesLog GetMetadata() Metadata + GetAllMap() map[string]interface{} //msg 和 Metadata的合并 SetType(string) SetMsg(map[string]interface{}) SetOriginator(string) + SetUserId(string) SetMetadata(Metadata) MarshalBinary() ([]byte, error) } @@ -29,58 +48,69 @@ type Metadata interface { GetValues() map[string]interface{} } -// Predefined message types -const ( - EventConnectType = "connect" - EventDisConnectType = "disconnect" - EventUpEventType = "event" - EventAlarmType = "alarm" - EventTelemetryType = "telemetry" - EventAttributesType = "attributes" -) - // NewMessage ... func NewMessage() Message { return &defaultMessage{ - id: uuid.New().String(), - ts: time.Now(), - msg: map[string]interface{}{}, - mesLog: make([]MesLog, 0), + Id: uuid.New().String(), + Ts: time.Now(), + Msg: map[string]interface{}{}, } } type defaultMessage struct { - id string //uuid - ts time.Time //时间戳 - msgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 - originator string //数据发布者 设备 规则链 - userId string //客户Id UUID - deviceId string //设备Id UUID - msg map[string]interface{} //数据 数据结构JSON 设备原始数据 msg - metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等 - mesLog []MesLog + Id string //uuid 消息Id + Ts time.Time //时间戳 + MsgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 + Originator string //数据发布者 设备 规则链 + UserId string //客户Id UUID 设备发布人 + Msg map[string]interface{} //数据 数据结构JSON 设备原始数据 msg + Metadata Metadata //消息的元数据 包括,设备名称,设备类型,命名空间,时间戳等 } // NewMessageWithDetail ... -func NewMessageWithDetail(originator string, messageType string, msg map[string]interface{}, metadata Metadata) Message { +func NewMessageWithDetail(userId, originator string, messageType string, msg map[string]interface{}, metadata Metadata) Message { return &defaultMessage{ - originator: originator, - msgType: messageType, - msg: msg, - metadata: metadata, - mesLog: make([]MesLog, 0), + Id: uuid.New().String(), + Ts: time.Now(), + UserId: userId, + Originator: originator, + MsgType: messageType, + Msg: msg, + Metadata: metadata, } } -func (t *defaultMessage) GetOriginator() string { return t.originator } -func (t *defaultMessage) GetType() string { return t.msgType } -func (t *defaultMessage) GetMsg() map[string]interface{} { return t.msg } -func (t *defaultMessage) GetMsgLogs() []MesLog { return t.mesLog } -func (t *defaultMessage) GetMetadata() Metadata { return t.metadata } -func (t *defaultMessage) SetType(msgType string) { t.msgType = msgType } -func (t *defaultMessage) SetMsg(msg map[string]interface{}) { t.msg = msg } -func (t *defaultMessage) SetOriginator(originator string) { t.originator = originator } -func (t *defaultMessage) SetMetadata(metadata Metadata) { t.metadata = metadata } +func (t *defaultMessage) GetId() string { return t.Id } +func (t *defaultMessage) GetTs() time.Time { return t.Ts } +func (t *defaultMessage) GetOriginator() string { return t.Originator } +func (t *defaultMessage) GetUserId() string { return t.UserId } +func (t *defaultMessage) GetType() string { return t.MsgType } +func (t *defaultMessage) GetMsg() map[string]interface{} { return t.Msg } +func (t *defaultMessage) GetMetadata() Metadata { return t.Metadata } +func (t *defaultMessage) GetAllMap() map[string]interface{} { + data := make(map[string]interface{}) + for msgKey, msgValue := range t.GetMsg() { + for metaKey, metaValue := range t.GetMetadata().GetValues() { + if msgKey == metaKey { + data[msgKey] = metaValue + } else { + if _, ok := data[msgKey]; !ok { + data[msgKey] = msgValue + } + if _, ok := data[metaKey]; !ok { + data[metaKey] = metaValue + } + } + } + } + return data +} +func (t *defaultMessage) SetType(msgType string) { t.MsgType = msgType } +func (t *defaultMessage) SetMsg(msg map[string]interface{}) { t.Msg = msg } +func (t *defaultMessage) SetOriginator(originator string) { t.Originator = originator } +func (t *defaultMessage) SetUserId(userId string) { t.UserId = userId } +func (t *defaultMessage) SetMetadata(metadata Metadata) { t.Metadata = metadata } + func (t *defaultMessage) MarshalBinary() ([]byte, error) { return json.Marshal(t) } @@ -112,7 +142,7 @@ func (t *defaultMetadata) Keys() []string { func (t *defaultMetadata) GetKeyValue(key string) interface{} { if _, found := t.values[key]; !found { - logrus.Fatalf("no key '%s' in metadata", key) + return nil } return t.values[key] } @@ -127,11 +157,3 @@ func (t *defaultMetadata) GetValues() map[string]interface{} { func (t *defaultMetadata) SetValues(values map[string]interface{}) { t.values = values } - -type MesLog struct { - NodeName string `json:"nodeName"` - startTime time.Time `json:"startTime"` - endTime time.Time `json:"endTime"` - result string `json:"result"` - Remark string `json:"remark"` -} diff --git a/pkg/rule_engine/nodes/action_clear_alarm_node.go b/pkg/rule_engine/nodes/action_clear_alarm_node.go index 06803c9..d14df65 100644 --- a/pkg/rule_engine/nodes/action_clear_alarm_node.go +++ b/pkg/rule_engine/nodes/action_clear_alarm_node.go @@ -1,10 +1,8 @@ package nodes import ( - "pandax/pkg/rule_engine/message" - "time" - "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" ) const ClearAlarmNodeName = "ClearAlarmNode" @@ -13,17 +11,13 @@ type clearAlarmNodeFactory struct{} type clearAlarmNode struct { bareNode - DetailBuilderScript string `json:"detailBuilderScript" yaml:"detailBuilderScript"` - AlarmType string `json:"alarmType" yaml:"alarmType"` - AlarmSeverity string `json:"alarmSeverity" yaml:"alarmSeverity"` - Propagate string `json:"propagate" yaml:"propagate"` - AlarmStartTime *time.Time `json:"alarmStartTime" yaml:"alarmStartTime"` - AlarmEndTime *time.Time `json:"alarmEndTime" yaml:"alarmEndTime"` + Script string `json:"script" yaml:"script"` + AlarmType string `json:"alarmType" yaml:"alarmType"` } func (f clearAlarmNodeFactory) Name() string { return ClearAlarmNodeName } func (f clearAlarmNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f clearAlarmNodeFactory) Labels() []string { return []string{"Created", "Updated", "Failure"} } +func (f clearAlarmNodeFactory) Labels() []string { return []string{"Cleared", "Failure"} } func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { node := &clearAlarmNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), @@ -33,5 +27,16 @@ func (f clearAlarmNodeFactory) Create(id string, meta Metadata) (Node, error) { func (n *clearAlarmNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + cleared := n.GetLinkedNode("Cleared") + failure := n.GetLinkedNode("Failure") + + scriptEngine := NewScriptEngine(msg, "Details", n.Script) + details, err := scriptEngine.ScriptAlarmDetails() + if err != nil { + return failure.Handle(msg) + } + // TODO 编写创建告警信息 + logrus.Info(details) + cleared.Handle(msg) return nil } diff --git a/pkg/rule_engine/nodes/action_create_alarm_node.go b/pkg/rule_engine/nodes/action_create_alarm_node.go index 9024e45..1453aa3 100644 --- a/pkg/rule_engine/nodes/action_create_alarm_node.go +++ b/pkg/rule_engine/nodes/action_create_alarm_node.go @@ -1,20 +1,18 @@ package nodes import ( - "fmt" - "pandax/pkg/rule_engine/message" - "github.com/sirupsen/logrus" + "pandax/pkg/rule_engine/message" ) type createAlarmNode struct { bareNode - DetailBuilderScript string `json:"detailBuilderScript" yaml:"detailBuilderScript"` - AlarmType string `json:"alarmType" yaml:"alarmType"` - AlarmSeverity string `json:"alarmSeverity" yaml:"alarmSeverity"` - Propagate string `json:"propagate" yaml:"propagate"` - AlarmStartTime string `json:"alarmStartTime" yaml:"alarmStartTime"` - AlarmEndTime string `json:"alarmEndTime" yaml:"alarmEndTime"` + Script string `json:"script" yaml:"script"` + AlarmType string `json:"alarmType" yaml:"alarmType"` + AlarmSeverity int64 `json:"alarmSeverity" yaml:"alarmSeverity"` + Propagate string `json:"propagate" yaml:"propagate"` + /* AlarmStartTime string `json:"alarmStartTime" yaml:"alarmStartTime"` + AlarmEndTime string `json:"alarmEndTime" yaml:"alarmEndTime"`*/ } type createAlarmNodeFactory struct{} @@ -33,11 +31,19 @@ func (n *createAlarmNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) node1 := n.GetLinkedNode("Created") - node2 := n.GetLinkedNode("Updated") + //node2 := n.GetLinkedNode("Updated") node3 := n.GetLinkedNode("Failure") - if node1 == nil || node2 == nil || node3 == nil { - return fmt.Errorf("no valid label linked node in %s", n.Name()) + + scriptEngine := NewScriptEngine(msg, "Details", n.Script) + details, err := scriptEngine.ScriptAlarmDetails() + if err != nil { + if node3 != nil { + return node3.Handle(msg) + } } + // TODO 创建告警 + logrus.Info(details) + node1.Handle(msg) return nil } diff --git a/pkg/rule_engine/nodes/action_delay_node.go b/pkg/rule_engine/nodes/action_delay_node.go index afde132..2105669 100644 --- a/pkg/rule_engine/nodes/action_delay_node.go +++ b/pkg/rule_engine/nodes/action_delay_node.go @@ -13,8 +13,8 @@ const DelayNodeName = "DelayNode" type delayNode struct { bareNode - PeriodTs int `json:"periodTs" yaml:"periodTs" jpath:"periodTs"` - MaxPendingMessages int `json:"maxPendingMessages" yaml:"maxPendingMessages" jpath:"maxPendingMessages"` + PeriodTs int `json:"periodTs" yaml:"periodTs" jpath:"periodTs"` //周期时间 + MaxPendingMessages int `json:"maxPendingMessages" yaml:"maxPendingMessages" jpath:"maxPendingMessages"` //最大等待消息数 messageQueue []message.Message `jpath:"-"` delayTimer *time.Timer `jpath:"-"` lock sync.Mutex `jpath:"-"` @@ -48,7 +48,7 @@ func (n *delayNode) Handle(msg message.Message) error { if n.delayTimer == nil { n.messageQueue = append(n.messageQueue, msg) n.delayTimer = time.NewTimer(time.Duration(n.PeriodTs) * time.Second) - // start timecallback goroutine + go func(n *delayNode) error { defer n.delayTimer.Stop() for { @@ -64,7 +64,6 @@ func (n *delayNode) Handle(msg message.Message) error { }(n) return nil } - // the delay timer had already been created, just queue message n.lock.Lock() defer n.lock.Unlock() if len(n.messageQueue) == n.MaxPendingMessages { diff --git a/pkg/rule_engine/nodes/action_generator_node.go b/pkg/rule_engine/nodes/action_generator_node.go index bcb1971..4cffe8c 100644 --- a/pkg/rule_engine/nodes/action_generator_node.go +++ b/pkg/rule_engine/nodes/action_generator_node.go @@ -1,22 +1,23 @@ package nodes import ( - "fmt" "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" + "time" ) type messageGeneratorNode struct { bareNode - DetailBuilderScript string `json:"detail_builder_script" yaml:"detail_builder_script"` - FrequenceInSecond int32 `json:"frequency" yaml:"frequency"` + Script string `json:"script" yaml:"script"` + PeriodSecond int64 `json:"periodSecond" yaml:"periodSecond"` //周期 + MessageCount int64 `json:"messageCount" yaml:"messageCount"` } type messageGeneratorNodeFactory struct{} -func (f messageGeneratorNodeFactory) Name() string { return "GeneratorNode" } +func (f messageGeneratorNodeFactory) Name() string { return "MessageGeneratorNode" } func (f messageGeneratorNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f messageGeneratorNodeFactory) Labels() []string { return []string{"Created", "Updated"} } +func (f messageGeneratorNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, error) { node := &messageGeneratorNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), @@ -27,11 +28,36 @@ func (f messageGeneratorNodeFactory) Create(id string, meta Metadata) (Node, err func (n *messageGeneratorNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - createdLabelNode := n.GetLinkedNode("Created") - updatedLabelNode := n.GetLinkedNode("Updated") - if createdLabelNode == nil || updatedLabelNode == nil { - return fmt.Errorf("no valid label linked node in %s", n.Name()) - } + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + + ticker := time.NewTicker(time.Duration(n.PeriodSecond) * time.Second) + count := 0 + + go func() { + for { + <-ticker.C + count++ + if int64(count) == n.MessageCount { + ticker.Stop() + return + } + scriptEngine := NewScriptEngine(msg, "Generate", n.Script) + generate, err := scriptEngine.ScriptGenerate() + if err != nil { + if failureLabelNode != nil { + go failureLabelNode.Handle(msg) + } + return + } + msg.SetMsg(generate["msg"].(map[string]interface{})) + msg.SetType(generate["msgType"].(string)) + msg.SetMetadata(message.NewDefaultMetadata(generate["metadata"].(map[string]interface{}))) + if successLabelNode != nil { + go successLabelNode.Handle(msg) + } + } + }() return nil } diff --git a/pkg/rule_engine/nodes/action_log_node.go b/pkg/rule_engine/nodes/action_log_node.go index 48b35d0..34350a3 100644 --- a/pkg/rule_engine/nodes/action_log_node.go +++ b/pkg/rule_engine/nodes/action_log_node.go @@ -1,8 +1,8 @@ package nodes import ( - "fmt" - "log" + "pandax/apps/visual/entity" + "pandax/apps/visual/services" "pandax/pkg/rule_engine/message" ) @@ -27,15 +27,31 @@ func (n *logNode) Handle(msg message.Message) error { successLableNode := n.GetLinkedNode("Success") failureLableNode := n.GetLinkedNode("Failure") - scriptEngine := NewScriptEngine() - logMessage, err := scriptEngine.ScriptToString(msg, n.Script) - - if successLableNode == nil || failureLableNode == nil { - return fmt.Errorf("no valid label linked node in %s", n.Name()) - } + scriptEngine := NewScriptEngine(msg, "ToString", n.Script) + logMessage, err := scriptEngine.ScriptToString() if err != nil { - return failureLableNode.Handle(msg) + if failureLableNode != nil { + return failureLableNode.Handle(msg) + } else { + return err + } } - log.Println(logMessage) - return successLableNode.Handle(msg) + services.VisualRuleChainMsgLogModelDao.Insert(entity.VisualRuleChainMsgLog{ + MessageId: msg.GetId(), + MsgType: msg.GetType(), + DeviceName: msg.GetMetadata().GetValues()["deviceName"].(string), + Ts: msg.GetTs(), + Content: logMessage, + }) + if err != nil { + if failureLableNode != nil { + return failureLableNode.Handle(msg) + } else { + return err + } + } + if successLableNode != nil { + return successLableNode.Handle(msg) + } + return nil } diff --git a/pkg/rule_engine/nodes/action_my_node.go b/pkg/rule_engine/nodes/action_my_node.go deleted file mode 100644 index a62dfcb..0000000 --- a/pkg/rule_engine/nodes/action_my_node.go +++ /dev/null @@ -1,29 +0,0 @@ -package nodes - -import ( - "log" - "pandax/pkg/rule_engine/message" -) - -type MyNode struct { - bareNode -} - -type myNodeFactory struct{} - -func (f myNodeFactory) Name() string { return "MyNode" } -func (f myNodeFactory) Category() string { return NODE_CATEGORY_ACTION } -func (f myNodeFactory) Labels() []string { return []string{"Next", "Next2"} } -func (f myNodeFactory) Create(id string, meta Metadata) (Node, error) { - node := &MyNode{ - bareNode: newBareNode(f.Name(), id, meta, f.Labels()), - } - return decodePath(meta, node) -} - -func (n *MyNode) Handle(msg message.Message) error { - nextLableNode := n.GetLinkedNode("Next") - - log.Println(nextLableNode.Name()) - return nil -} diff --git a/pkg/rule_engine/nodes/action_save_attributes_node.go b/pkg/rule_engine/nodes/action_save_attributes_node.go index 6509a2b..94d82f7 100644 --- a/pkg/rule_engine/nodes/action_save_attributes_node.go +++ b/pkg/rule_engine/nodes/action_save_attributes_node.go @@ -1,7 +1,6 @@ package nodes import ( - "fmt" "pandax/pkg/rule_engine/message" ) @@ -22,13 +21,33 @@ func (f saveAttributesNodeFactory) Create(id string, meta Metadata) (Node, error } func (n *SaveAttributesNode) Handle(msg message.Message) error { - successLableNode := n.GetLinkedNode("Success") - failureLableNode := n.GetLinkedNode("Failure") - if successLableNode == nil || failureLableNode == nil { - return fmt.Errorf("no valid label linked node in %s", n.Name()) + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + if msg.GetType() != message.EventAttributesType { + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return nil + } } - if msg.GetType() != "POST_ATTRIBUTES_REQUEST" { - return failureLableNode.Handle(msg) + //deviceName := msg.GetMetadata().GetValues()["deviceName"].(string) + //namespace := msg.GetMetadata().GetValues()["namespace"].(string) + //marshal, err := json.Marshal(msg.GetMsg()) + + //if err != nil { + // if failureLabelNode != nil { + // return failureLabelNode.Handle(msg) + // } else { + // return nil + // } + //} + + // todo 添加设备上报参数 + + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } else { + return nil } return nil diff --git a/pkg/rule_engine/nodes/action_save_timeseries_node.go b/pkg/rule_engine/nodes/action_save_timeseries_node.go index e62a171..1838bd8 100644 --- a/pkg/rule_engine/nodes/action_save_timeseries_node.go +++ b/pkg/rule_engine/nodes/action_save_timeseries_node.go @@ -19,7 +19,24 @@ func (f saveTimeSeriesNodeFactory) Create(id string, meta Metadata) (Node, error func (n *SaveTimeSeriesNode) Handle(msg message.Message) error { successLableNode := n.GetLinkedNode("Success") - //failureLableNode := n.GetLinkedNode("Failure") + failureLableNode := n.GetLinkedNode("Failure") + if msg.GetType() != message.EventTelemetryType { + if failureLableNode != nil { + return failureLableNode.Handle(msg) + } else { + return nil + } + } + /*deviceName := msg.GetMetadata().GetValues()["deviceName"].(string) + namespace := msg.GetMetadata().GetValues()["namespace"].(string) + marshal, err := json.Marshal(msg.GetMsg())*/ + + // todo 添加设备上报遥测 + + if successLableNode != nil { + return successLableNode.Handle(msg) + } else { + return nil + } - return successLableNode.Handle(msg) } diff --git a/pkg/rule_engine/nodes/external_ding_node.go b/pkg/rule_engine/nodes/external_ding_node.go index 57bc354..5f2fe38 100644 --- a/pkg/rule_engine/nodes/external_ding_node.go +++ b/pkg/rule_engine/nodes/external_ding_node.go @@ -1,17 +1,26 @@ package nodes import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "github.com/XM-GO/PandaKit/httpclient" "github.com/sirupsen/logrus" + "net/url" "pandax/pkg/rule_engine/message" + "time" ) type externalDingNode struct { bareNode WebHook string `json:"webHook" yaml:"webHook"` + Secret string `json:"secret"` MsgType string `json:"msgType" yaml:"msgType"` Content string `json:"content" yaml:"content"` IsAtAll bool `json:"isAtAll" yaml:"isAtAll"` - atMobiles []string `json:"atMobiles" yaml:"atMobiles"` + AtMobiles []string `json:"atMobiles" yaml:"atMobiles"` } type externalDingNodeFactory struct{} @@ -29,5 +38,47 @@ func (f externalDingNodeFactory) Create(id string, meta Metadata) (Node, error) func (n *externalDingNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + //获取消息 + template, err := ParseTemplate(n.Content, msg.GetAllMap()) + if err != nil { + return err + } + sendData := map[string]interface{}{ + "msgtype": "text", + "text": map[string]string{"content": template}, + } + if n.IsAtAll { + sendData["at"] = map[string]interface{}{"isAtAll": n.IsAtAll} + } else { + sendData["at"] = map[string]interface{}{"atMobiles": n.AtMobiles} + } + marshal, _ := json.Marshal(sendData) + + timestamp := time.Now().UnixMilli() + sign := getSign(timestamp, n.Secret) + + url := fmt.Sprintf("%s×tamp=%d&sign=%s", n.WebHook, timestamp, sign) + + postJson := httpclient.NewRequest(url).Header("Content-Type", "application/json").PostJson(string(marshal)) + if postJson.StatusCode != 200 { + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return err + } + } + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } return nil } + +func getSign(timestamp int64, secret string) string { + stringToSign := fmt.Sprintf("%d\n%s", timestamp, secret) + hash := hmac.New(sha256.New, []byte(secret)) + hash.Write([]byte(stringToSign)) + signData := hash.Sum(nil) + return url.QueryEscape(base64.StdEncoding.EncodeToString(signData)) +} diff --git a/pkg/rule_engine/nodes/external_kafka_node.go b/pkg/rule_engine/nodes/external_kafka_node.go index 89c075b..6bff75a 100644 --- a/pkg/rule_engine/nodes/external_kafka_node.go +++ b/pkg/rule_engine/nodes/external_kafka_node.go @@ -1,15 +1,22 @@ package nodes import ( + "encoding/json" + "github.com/Shopify/sarama" "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" + "strings" + "time" ) type externalKafkaNode struct { bareNode - Server string `json:"server" yaml:"server"` - Topic string `json:"topic" yaml:"topic"` - KafkaCli string + Server string `json:"server" yaml:"server"` //kafka集群 "10.130.138.164:9092,10.130.138.165:9093" + Topic string `json:"topic" yaml:"topic"` //topic + KeyPattern string `json:"keyPattern" yaml:"keyPattern"` //metadataKey or messageKey + ProducesBatchSize int64 `json:"producesBatchSize" yaml:"producesBatchSize"` + OtherProperties map[string]interface{} `json:"otherProperties"` //发送的其他参数 + KafkaCli sarama.SyncProducer } type externalKafkaNodeFactory struct{} @@ -21,11 +28,58 @@ func (f externalKafkaNodeFactory) Create(id string, meta Metadata) (Node, error) node := &externalKafkaNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } - return decodePath(meta, node) + _, err := decodePath(meta, node) + if err != nil { + return node, err + } + config := sarama.NewConfig() + config.Producer.Return.Successes = true + config.Producer.Timeout = 5 * time.Second + config.Producer.MaxMessageBytes = int(node.ProducesBatchSize) + p, err := sarama.NewSyncProducer(strings.Split(node.Server, ","), config) + if err != nil { + logrus.Errorf("sarama.NewSyncProducer err, message=%s \n", err) + return node, err + } + node.KafkaCli = p + return node, nil } func (n *externalKafkaNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + defer n.KafkaCli.Close() + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + value := sarama.ByteEncoder("") + if n.KeyPattern == "metadataKey" { + marshal, err := json.Marshal(msg.GetMetadata().GetValues()) + if err != nil { + return err + } + value = marshal + } else { + marshal, err := json.Marshal(msg.GetMsg()) + if err != nil { + return err + } + value = marshal + } + + kafkaM := &sarama.ProducerMessage{ + Topic: n.Topic, + Value: value, + } + _, _, err := n.KafkaCli.SendMessage(kafkaM) + if err != nil { + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return err + } + } + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } return nil } diff --git a/pkg/rule_engine/nodes/external_mqtt_node.go b/pkg/rule_engine/nodes/external_mqtt_node.go index 9c0e81f..f47835c 100644 --- a/pkg/rule_engine/nodes/external_mqtt_node.go +++ b/pkg/rule_engine/nodes/external_mqtt_node.go @@ -13,6 +13,8 @@ import ( type externalMqttNode struct { bareNode TopicPattern string `json:"topicPattern"` + Username string `json:"username"` + Password string `json:"password"` Host string `json:"host"` Port string `json:"port"` ConnectTimeoutSec int `json:"connectTimeoutSec"` @@ -28,38 +30,55 @@ func (f externalMqttNodeFactory) Name() string { return "MqttNode" } func (f externalMqttNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalMqttNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f externalMqttNodeFactory) Create(id string, meta Metadata) (Node, error) { - node := &externalMqttNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } + _, err := decodePath(meta, node) + if err != nil { + return node, err + } broker := fmt.Sprintf("tcp://%s:%s", node.Host, node.Port) opts := mqtt.NewClientOptions().AddBroker(broker) - opts.SetClientID(node.ClientId) + if node.ClientId != "" { + opts.SetClientID(node.ClientId) + } opts.SetCleanSession(node.CleanSession) opts.SetConnectTimeout(time.Duration(node.ConnectTimeoutSec) * time.Second) + if node.Username != "" { + opts.SetUsername(node.Username) + } + if node.Password != "" { + opts.SetPassword(node.Password) + } node.MqttCli = mqtt.NewClient(opts) if token := node.MqttCli.Connect(); token.Wait() && token.Error() != nil { logrus.WithError(token.Error()) return nil, token.Error() } - return decodePath(meta, node) + return node, nil } func (n *externalMqttNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - + defer n.MqttCli.Disconnect(1000) successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") topic := n.TopicPattern //need fix add msg.metadata in it sendmqttmsg, err := json.Marshal(msg.GetMsg()) if err != nil { - return nil + return err } token := n.MqttCli.Publish(topic, 1, false, sendmqttmsg) if token.Wait() && token.Error() != nil { - fmt.Println(token.Error()) - return failureLabelNode.Handle(msg) + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return token.Error() + } } - return successLabelNode.Handle(msg) + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } + return nil } diff --git a/pkg/rule_engine/nodes/external_nats_node.go b/pkg/rule_engine/nodes/external_nats_node.go index 202caca..e52c270 100644 --- a/pkg/rule_engine/nodes/external_nats_node.go +++ b/pkg/rule_engine/nodes/external_nats_node.go @@ -37,12 +37,23 @@ func (f externalNatsNodeFactory) Create(id string, meta Metadata) (Node, error) func (n *externalNatsNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + defer n.client.Close() successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") - err := n.client.Publish(n.Subject, []byte(n.Body)) + template, err := ParseTemplate(n.Body, msg.GetAllMap()) if err != nil { - n.client.Close() - return failureLabelNode.Handle(msg) + return err } - return successLabelNode.Handle(msg) + err = n.client.Publish(n.Subject, []byte(template)) + if err != nil { + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return err + } + } + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } + return nil } diff --git a/pkg/rule_engine/nodes/external_restapi_node.go b/pkg/rule_engine/nodes/external_restapi_node.go index 3e1d69c..cffcd88 100644 --- a/pkg/rule_engine/nodes/external_restapi_node.go +++ b/pkg/rule_engine/nodes/external_restapi_node.go @@ -1,6 +1,9 @@ package nodes import ( + "encoding/json" + "errors" + "github.com/XM-GO/PandaKit/httpclient" "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -9,7 +12,7 @@ type externalRestapiNode struct { bareNode RestEndpointUrlPattern string `json:"restEndpointUrlPattern" yaml:"restEndpointUrlPattern"` RequestMethod string `json:"requestMethod" yaml:"requestMethod"` - headers map[string]string `json:"headers" yaml:"headers"` + Headers map[string]string `json:"headers" yaml:"headers"` } type externalRestapiNodeFactory struct{} @@ -26,6 +29,73 @@ func (f externalRestapiNodeFactory) Create(id string, meta Metadata) (Node, erro func (n *externalRestapiNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - + successLableNode := n.GetLinkedNode("Success") + failureLableNode := n.GetLinkedNode("Failure") + if n.RequestMethod == "GET" { + resp := httpclient.NewRequest(n.RestEndpointUrlPattern).Get() + if resp.StatusCode != 200 { + return errors.New("网络请求失败") + } + var response map[string]interface{} + err := json.Unmarshal(resp.Body, &response) + if err != nil && failureLableNode != nil { + return failureLableNode.Handle(msg) + } else { + if successLableNode != nil { + metadata := msg.GetMetadata() + for key, value := range response { + metadata.SetKeyValue(key, value) + } + msg.SetMetadata(metadata) + return successLableNode.Handle(msg) + } + } + } + if n.RequestMethod == "POST" { + binary, _ := msg.MarshalBinary() + req := httpclient.NewRequest(n.RestEndpointUrlPattern) + for key, value := range n.Headers { + req.Header(key, value) + } + resp := req.PostJson(string(binary)) + if resp.StatusCode != 200 { + if failureLableNode != nil { + return failureLableNode.Handle(msg) + } + } else { + if successLableNode != nil { + return successLableNode.Handle(msg) + } + } + } + /*if n.RequestMethod == "PUT" { + binary, _ := msg.MarshalBinary() + req := httpclient.NewRequest(n.RestEndpointUrlPattern) + for key,value := range n.Headers { + req.Header(key,value) + } + _, err := http.HttpPut(n.RestEndpointUrlPattern, n.Headers, nil, binary) + if err != nil { + if failureLableNode != nil { + return failureLableNode.Handle(msg) + } + } else { + if successLableNode != nil { + return successLableNode.Handle(msg) + } + } + } + if n.RequestMethod == "DELETE" { + _, err := http.HttpDelete(n.RestEndpointUrlPattern) + if err != nil { + if failureLableNode != nil { + return failureLableNode.Handle(msg) + } + } else { + if successLableNode != nil { + return successLableNode.Handle(msg) + } + } + }*/ return nil } diff --git a/pkg/rule_engine/nodes/external_rule_chain_node.go b/pkg/rule_engine/nodes/external_rule_chain_node.go index da09f20..48e48a9 100644 --- a/pkg/rule_engine/nodes/external_rule_chain_node.go +++ b/pkg/rule_engine/nodes/external_rule_chain_node.go @@ -1,8 +1,13 @@ package nodes import ( - "github.com/sirupsen/logrus" + "pandax/apps/visual/services" + "pandax/pkg/rule_engine/manifest" "pandax/pkg/rule_engine/message" + + "errors" + "fmt" + "github.com/sirupsen/logrus" ) type externalRuleChainNode struct { @@ -13,13 +18,11 @@ type externalRuleChainNode struct { type externalRuleChainNodeFactory struct{} func (f externalRuleChainNodeFactory) Name() string { return "RuleChainNode" } -func (f externalRuleChainNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } -func (f externalRuleChainNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f externalRuleChainNodeFactory) Category() string { return NODE_CATEGORY_FLOWS } +func (f externalRuleChainNodeFactory) Labels() []string { return []string{} } func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} - node := &externalRuleChainNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) @@ -27,6 +30,21 @@ func (f externalRuleChainNodeFactory) Create(id string, meta Metadata) (Node, er func (n *externalRuleChainNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - + data := services.VisualRuleChainModelDao.FindOne(n.RuleId) + if data == nil { + return errors.New(fmt.Sprintf("节点 %s ,获取规则链失败", n.Name())) + } + m, err := manifest.New([]byte(data.RuleDataJson)) + if err != nil { + logrus.WithError(err).Errorf("invalidi manifest file") + return err + } + nodes, err := GetNodes(m) + if err != nil { + return errors.New(fmt.Sprintf("节点 %s ,构建节点失败", n.Name())) + } + if node, found := nodes[m.FirstRuleNodeId]; found { + go node.Handle(msg) + } return nil } diff --git a/pkg/rule_engine/nodes/external_send_email_node.go b/pkg/rule_engine/nodes/external_send_email_node.go index 40583ed..0f33b8c 100644 --- a/pkg/rule_engine/nodes/external_send_email_node.go +++ b/pkg/rule_engine/nodes/external_send_email_node.go @@ -1,18 +1,28 @@ package nodes import ( + "crypto/tls" + "fmt" "github.com/sirupsen/logrus" + "net/smtp" "pandax/pkg/rule_engine/message" + "strings" + + "github.com/jordan-wright/email" ) type externalSendEmailNode struct { bareNode - From string `json:"from" yaml:"from"` - To string `json:"to" yaml:"to"` - Cc string `json:"cc" yaml:"cc"` - Bcc string `json:"bcc" yaml:"bcc"` - Subject string `json:"subject" yaml:"subject"` - Body string `json:"body" yaml:"body"` + Host string `json:"host"` // 服务器地址 + Port int `json:"port"` // 服务器端口 + From string `json:"from"` // 邮箱账号 + Nickname string `json:"nickname"` // 发件人 + Secret string `json:"secret"` // 邮箱密码 + IsSSL bool `json:"isSsl"` // 是否开启ssl + + To string `json:"to"` //收件人 + Subject string `json:"subject"` //主题 + Body string `json:"body"` //内容 } type externalSendEmailNodeFactory struct{} @@ -21,10 +31,14 @@ func (f externalSendEmailNodeFactory) Name() string { return "SendEmailNode" func (f externalSendEmailNodeFactory) Category() string { return NODE_CATEGORY_EXTERNAL } func (f externalSendEmailNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f externalSendEmailNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} node := &externalSendEmailNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + /*Host: email.Host, + Port: email.Port, + Nickname: email.Nickname, + Secret: email.Secret, + IsSSL: email.IsSSL,*/ } return decodePath(meta, node) } @@ -33,19 +47,48 @@ func (n *externalSendEmailNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) successLabelNode := n.GetLinkedNode("Success") - //failureLabelNode := n.GetLinkedNode("Failure") + failureLabelNode := n.GetLinkedNode("Failure") - /*dialer := runtime.NewDialer(runtime.EMAIL) - variables := map[string]string{ - "from": n.From, - "to": n.To, - "cc": n.Cc, - "bcc": n.Bcc, - "subject": n.Subject, - "body": n.Body, + tos := strings.Split(n.To, ",") + if tos[len(tos)-1] == "" { // 判断切片的最后一个元素是否为空,为空则移除 + tos = tos[:len(tos)-1] } - if err := dialer.DialAndSend(msg.GetMetadata(), variables); err != nil { - return failureLabelNode.Handle(msg) - }*/ - return successLabelNode.Handle(msg) + err := n.send(tos, msg) + if err != nil { + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return err + } + } + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } + return nil +} + +func (m *externalSendEmailNode) send(to []string, msg message.Message) error { + + auth := smtp.PlainAuth("", m.From, m.Secret, m.Host) + e := email.NewEmail() + if m.Nickname != "" { + e.From = fmt.Sprintf("%s <%s>", m.Nickname, m.From) + } else { + e.From = m.From + } + e.To = to + e.Subject = m.Subject + template, err := ParseTemplate(m.Body, msg.GetMetadata().GetValues()) + if err != nil { + return err + } + e.HTML = []byte(template) + hostAddr := fmt.Sprintf("%s:%d", m.Host, m.Port) + if m.IsSSL { + err = e.SendWithTLS(hostAddr, auth, &tls.Config{ServerName: m.Host}) + } else { + err = e.Send(hostAddr, auth) + } + + return err } diff --git a/pkg/rule_engine/nodes/external_send_sms_node.go b/pkg/rule_engine/nodes/external_send_sms_node.go index a062eb7..ff854dc 100644 --- a/pkg/rule_engine/nodes/external_send_sms_node.go +++ b/pkg/rule_engine/nodes/external_send_sms_node.go @@ -7,6 +7,12 @@ import ( type externalSendSmsNode struct { bareNode + SecretId string `json:"secretId" yaml:"secretId"` + SecretKey string `json:"secretKey" yaml:"secretKey"` + SdkAppId string `json:"sdkAppId" yaml:"sdkAppId"` //应用Id(腾讯) 或 签名名称(阿里) + PhoneNumber string `json:"phoneNumber" yaml:"phoneNumber"` //发送到手机号 + TemplateId string `json:"templateId" yaml:"templateId"` //短信模板Id + TemplateParam map[string]interface{} `json:"templateParam" yaml:"templateParam"` //模板参数: 模板参数的个数需要与 TemplateId 对应模板的变量个数保持一致,若无模板参数,则设置为空*/ } type externalSendSmsNodeFactory struct{} diff --git a/pkg/rule_engine/nodes/external_wechat_node.go b/pkg/rule_engine/nodes/external_wechat_node.go index f22202a..0600ae6 100644 --- a/pkg/rule_engine/nodes/external_wechat_node.go +++ b/pkg/rule_engine/nodes/external_wechat_node.go @@ -1,6 +1,8 @@ package nodes import ( + "encoding/json" + "github.com/XM-GO/PandaKit/httpclient" "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) @@ -11,7 +13,7 @@ type externalWechatNode struct { MsgType string `json:"msgType" yaml:"msgType"` Content string `json:"content" yaml:"content"` IsAtAll bool `json:"isAtAll" yaml:"isAtAll"` - atMobiles []string `json:"atMobiles" yaml:"atMobiles"` + AtMobiles []string `json:"atMobiles" yaml:"atMobiles"` } type externalWechatNodeFactory struct{} @@ -29,5 +31,29 @@ func (f externalWechatNodeFactory) Create(id string, meta Metadata) (Node, error func (n *externalWechatNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + template, err := ParseTemplate(n.Content, msg.GetAllMap()) + sendData := map[string]interface{}{ + "msgtype": "text", + "text": map[string]interface{}{"content": template}, + } + if n.IsAtAll { + sendData["text"].(map[string]interface{})["mentioned_mobile_list"] = []string{"@all"} + } else { + sendData["text"].(map[string]interface{})["mentioned_mobile_list"] = n.AtMobiles + } + marshal, _ := json.Marshal(sendData) + postJson := httpclient.NewRequest(n.WebHook).Header("Content-Type", "application/json").PostJson(string(marshal)) + if postJson.StatusCode != 200 { + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return err + } + } + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } return nil } diff --git a/pkg/rule_engine/nodes/filter_device_type_switch_node.go b/pkg/rule_engine/nodes/filter_device_type_switch_node.go index 82d7638..ea38b28 100644 --- a/pkg/rule_engine/nodes/filter_device_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_device_type_switch_node.go @@ -1,16 +1,10 @@ package nodes import ( - "fmt" "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) -const ( - DEVICE = "DEVICE" - GATEWAY = "GATEWAY" -) - //检查关联关系 //该消息来自与哪个实体或到那个实体 type deviceTypeSwitchNode struct { @@ -21,7 +15,9 @@ type deviceTypeSwitchNodeFactory struct{} func (f deviceTypeSwitchNodeFactory) Name() string { return "DeviceTypeSwitchNode" } func (f deviceTypeSwitchNodeFactory) Category() string { return NODE_CATEGORY_FILTER } -func (f deviceTypeSwitchNodeFactory) Labels() []string { return []string{DEVICE, GATEWAY} } +func (f deviceTypeSwitchNodeFactory) Labels() []string { + return []string{message.DEVICE, message.GATEWAY} +} func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, error) { node := &deviceTypeSwitchNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), @@ -32,15 +28,18 @@ func (f deviceTypeSwitchNodeFactory) Create(id string, meta Metadata) (Node, err func (n *deviceTypeSwitchNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - deviceLabelNode := n.GetLinkedNode(DEVICE) - gatewayLabelNode := n.GetLinkedNode(GATEWAY) + deviceLabelNode := n.GetLinkedNode(message.DEVICE) + gatewayLabelNode := n.GetLinkedNode(message.GATEWAY) - if deviceLabelNode == nil && gatewayLabelNode == nil { - return fmt.Errorf("no device and gateway label linked node in %s", n.Name()) + if msg.GetMetadata().GetKeyValue("deviceType") == message.DEVICE { + if deviceLabelNode != nil { + return deviceLabelNode.Handle(msg) + } } - - if msg.GetMetadata().GetKeyValue("deviceType") == DEVICE { - return deviceLabelNode.Handle(msg) + if msg.GetMetadata().GetKeyValue("deviceType") == message.GATEWAY { + if gatewayLabelNode != nil { + return gatewayLabelNode.Handle(msg) + } } - return gatewayLabelNode.Handle(msg) + return nil } diff --git a/pkg/rule_engine/nodes/filter_message_type_switch_node.go b/pkg/rule_engine/nodes/filter_message_type_switch_node.go index 3cbc9b3..e20c510 100644 --- a/pkg/rule_engine/nodes/filter_message_type_switch_node.go +++ b/pkg/rule_engine/nodes/filter_message_type_switch_node.go @@ -2,7 +2,6 @@ package nodes import ( "github.com/sirupsen/logrus" - "log" "pandax/pkg/rule_engine/message" ) @@ -36,7 +35,6 @@ func (n *messageTypeSwitchNode) Handle(msg message.Message) error { nodes := n.GetLinkedNodes() messageType := msg.GetType() - log.Println("开始执行messageTypeSwitchNode") for label, node := range nodes { if messageType == label { return node.Handle(msg) diff --git a/pkg/rule_engine/nodes/filter_script_node.go b/pkg/rule_engine/nodes/filter_script_node.go index c9a1e68..aa77322 100644 --- a/pkg/rule_engine/nodes/filter_script_node.go +++ b/pkg/rule_engine/nodes/filter_script_node.go @@ -2,7 +2,6 @@ package nodes import ( "github.com/sirupsen/logrus" - "log" "pandax/pkg/rule_engine/message" ) @@ -30,9 +29,8 @@ func (n *scriptFilterNode) Handle(msg message.Message) error { trueLabelNode := n.GetLinkedNode("True") falseLabelNode := n.GetLinkedNode("False") - scriptEngine := NewScriptEngine() - isTrue, error := scriptEngine.ScriptOnFilter(msg, n.Script) - log.Println(isTrue) + scriptEngine := NewScriptEngine(msg, "Filter", n.Script) + isTrue, error := scriptEngine.ScriptOnFilter() if isTrue == true && error == nil && trueLabelNode != nil { return trueLabelNode.Handle(msg) } else { @@ -40,5 +38,6 @@ func (n *scriptFilterNode) Handle(msg message.Message) error { return falseLabelNode.Handle(msg) } } + return nil } diff --git a/pkg/rule_engine/nodes/filter_switch_node.go b/pkg/rule_engine/nodes/filter_switch_node.go index 0edb697..cfef3ca 100644 --- a/pkg/rule_engine/nodes/filter_switch_node.go +++ b/pkg/rule_engine/nodes/filter_switch_node.go @@ -2,7 +2,6 @@ package nodes import ( "github.com/sirupsen/logrus" - "log" "pandax/pkg/rule_engine/message" ) @@ -36,17 +35,16 @@ func (f switchFilterNodeFactory) Create(id string, meta Metadata) (Node, error) func (n *switchFilterNode) Handle(msg message.Message) error { logrus.Infof("%s handle message '%s'", n.Name(), msg.GetType()) - scriptEngine := NewScriptEngine() - SwitchResults, err := scriptEngine.ScriptOnSwitch(msg, n.Script) - log.Println("开始执行switchFilterNode", SwitchResults) + scriptEngine := NewScriptEngine(msg, "Switch", n.Script) + SwitchResults, err := scriptEngine.ScriptOnSwitch() if err != nil { - return nil + return err } nodes := n.GetLinkedNodes() for label, node := range nodes { for _, switchresult := range SwitchResults { if label == switchresult { - node.Handle(msg) + go node.Handle(msg) } } } diff --git a/pkg/rule_engine/nodes/init.go b/pkg/rule_engine/nodes/init.go index 043a619..f7d8eed 100644 --- a/pkg/rule_engine/nodes/init.go +++ b/pkg/rule_engine/nodes/init.go @@ -31,6 +31,4 @@ func init() { RegisterFactory(externalSendSmsNodeFactory{}) RegisterFactory(externalRuleChainNodeFactory{}) - RegisterFactory(myNodeFactory{}) - } diff --git a/pkg/rule_engine/nodes/metadata.go b/pkg/rule_engine/nodes/metadata.go index f3f131b..84c38c6 100644 --- a/pkg/rule_engine/nodes/metadata.go +++ b/pkg/rule_engine/nodes/metadata.go @@ -58,6 +58,6 @@ func (c *nodeMetadata) With(key string, val interface{}) Metadata { } func (c *nodeMetadata) DecodePath(rawVal interface{}) error { - //return tool.Map2Struct(c.keypairs, rawVal) + //return utils.Map2Struct(c.keypairs, rawVal) return mapstructure.Decode(c.keypairs, rawVal) } diff --git a/pkg/rule_engine/nodes/node.go b/pkg/rule_engine/nodes/node.go index 5652ef5..06b8a20 100644 --- a/pkg/rule_engine/nodes/node.go +++ b/pkg/rule_engine/nodes/node.go @@ -49,7 +49,6 @@ func (n *bareNode) GetLinkedNode(label string) Node { if node, found := n.nodes[label]; found { return node } - logrus.Errorf("no label '%s' in node '%s'", label, n.name) return nil } @@ -73,6 +72,7 @@ func GetNodes(m *manifest.Manifest) (map[string]Node, error) { metadata := NewMetadataWithValues(n.Properties) node, err := NewNode(n.Type, n.Id, metadata) if err != nil { + logrus.Errorf("new node '%s' failure", n.Id) continue } if _, found := nodes[n.Id]; found { diff --git a/pkg/rule_engine/nodes/script_engine.go b/pkg/rule_engine/nodes/script_engine.go index b4da5d0..d885ab4 100644 --- a/pkg/rule_engine/nodes/script_engine.go +++ b/pkg/rule_engine/nodes/script_engine.go @@ -1,36 +1,45 @@ package nodes import ( + "fmt" "github.com/dop251/goja" "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" ) type ScriptEngine interface { - ScriptOnMessage(msg message.Message, script string) (message.Message, error) - //used by filter_switch_node - ScriptOnSwitch(msg message.Message, script string) ([]string, error) - //used by filter_script_node - ScriptOnFilter(msg message.Message, script string) (bool, error) - ScriptToString(msg message.Message, script string) (string, error) + ScriptOnMessage() (message.Message, error) + ScriptOnSwitch() ([]string, error) + ScriptOnFilter() (bool, error) + ScriptToString() (string, error) + ScriptAlarmDetails() (map[string]interface{}, error) + ScriptGenerate() (map[string]interface{}, error) } type baseScriptEngine struct { + Fun string + Script string + Msg message.Message } -func NewScriptEngine() ScriptEngine { - return &baseScriptEngine{} +func NewScriptEngine(msg message.Message, fun string, script string) ScriptEngine { + return &baseScriptEngine{ + Fun: fun, + Script: fmt.Sprintf("function %s(msg, metadata, msgType) { %s }", fun, script), + Msg: msg, + } } -func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) (message.Message, error) { +func (bse *baseScriptEngine) ScriptOnMessage() (message.Message, error) { + msg := bse.Msg vm := goja.New() - _, err := vm.RunString(script) + _, err := vm.RunString(bse.Script) if err != nil { logrus.Info("JS代码有问题") return nil, err } var fn func(map[string]interface{}, map[string]interface{}, string) map[string]interface{} - err = vm.ExportTo(vm.Get("Transform"), &fn) + err = vm.ExportTo(vm.Get(bse.Fun), &fn) if err != nil { logrus.Info("Js函数映射到 Go 函数失败!") return nil, err @@ -40,19 +49,18 @@ func (bse *baseScriptEngine) ScriptOnMessage(msg message.Message, script string) msg.SetMetadata(message.NewDefaultMetadata(datas["metadata"].(map[string]interface{}))) msg.SetType(datas["msgType"].(string)) return msg, nil - - return nil, nil } -func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) ([]string, error) { +func (bse *baseScriptEngine) ScriptOnSwitch() ([]string, error) { + msg := bse.Msg vm := goja.New() - _, err := vm.RunString(script) + _, err := vm.RunString(bse.Script) if err != nil { logrus.Info("JS代码有问题") return nil, err } var fn func(map[string]interface{}, map[string]interface{}, string) []string - err = vm.ExportTo(vm.Get("Switch"), &fn) + err = vm.ExportTo(vm.Get(bse.Fun), &fn) if err != nil { logrus.Info("Js函数映射到 Go 函数失败!") return nil, err @@ -61,15 +69,16 @@ func (bse *baseScriptEngine) ScriptOnSwitch(msg message.Message, script string) return datas, nil } -func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) (bool, error) { +func (bse *baseScriptEngine) ScriptOnFilter() (bool, error) { + msg := bse.Msg vm := goja.New() - _, err := vm.RunString(script) + _, err := vm.RunString(bse.Script) if err != nil { logrus.Info("JS代码有问题") return false, err } var fn func(map[string]interface{}, map[string]interface{}, string) bool - err = vm.ExportTo(vm.Get("Filter"), &fn) + err = vm.ExportTo(vm.Get(bse.Fun), &fn) if err != nil { logrus.Info("Js函数映射到 Go 函数失败!") return false, err @@ -78,7 +87,56 @@ func (bse *baseScriptEngine) ScriptOnFilter(msg message.Message, script string) return datas, nil } -func (bse *baseScriptEngine) ScriptToString(msg message.Message, script string) (string, error) { - - return "", nil +func (bse *baseScriptEngine) ScriptToString() (string, error) { + msg := bse.Msg + vm := goja.New() + _, err := vm.RunString(bse.Script) + if err != nil { + logrus.Info("JS代码有问题") + return "", err + } + var fn func(map[string]interface{}, map[string]interface{}, string) string + err = vm.ExportTo(vm.Get(bse.Fun), &fn) + if err != nil { + logrus.Info("Js函数映射到 Go 函数失败!") + return "", err + } + data := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + return data, nil +} + +func (bse *baseScriptEngine) ScriptAlarmDetails() (map[string]interface{}, error) { + msg := bse.Msg + vm := goja.New() + _, err := vm.RunString(bse.Script) + if err != nil { + logrus.Info("JS代码有问题") + return nil, err + } + var fn func(map[string]interface{}, map[string]interface{}, string) map[string]interface{} + err = vm.ExportTo(vm.Get(bse.Fun), &fn) + if err != nil { + logrus.Info("Js函数映射到 Go 函数失败!") + return nil, err + } + datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + return datas, nil +} + +func (bse *baseScriptEngine) ScriptGenerate() (map[string]interface{}, error) { + msg := bse.Msg + vm := goja.New() + _, err := vm.RunString(bse.Script) + if err != nil { + logrus.Info("JS代码有问题") + return nil, err + } + var fn func(map[string]interface{}, map[string]interface{}, string) map[string]interface{} + err = vm.ExportTo(vm.Get(bse.Fun), &fn) + if err != nil { + logrus.Info("Js函数映射到 Go 函数失败!") + return nil, err + } + datas := fn(msg.GetMsg(), msg.GetMetadata().GetValues(), msg.GetType()) + return datas, nil } diff --git a/pkg/rule_engine/nodes/template_engine.go b/pkg/rule_engine/nodes/template_engine.go new file mode 100644 index 0000000..2ebb5a8 --- /dev/null +++ b/pkg/rule_engine/nodes/template_engine.go @@ -0,0 +1,19 @@ +package nodes + +import ( + "bytes" + "text/template" +) + +func ParseTemplate(content string, data map[string]interface{}) (string, error) { + tmpl, err := template.New("template").Parse(content) + if err != nil { + return "", err + } + buffer := &bytes.Buffer{} + err = tmpl.Execute(buffer, data) + if err != nil { + return "", err + } + return buffer.String(), nil +} diff --git a/pkg/rule_engine/nodes/transform_delete_key_node.go b/pkg/rule_engine/nodes/transform_delete_key_node.go index 864ebeb..f20e418 100644 --- a/pkg/rule_engine/nodes/transform_delete_key_node.go +++ b/pkg/rule_engine/nodes/transform_delete_key_node.go @@ -3,12 +3,13 @@ package nodes import ( "github.com/sirupsen/logrus" "pandax/pkg/rule_engine/message" + "strings" ) type transformDeleteKeyNode struct { bareNode - FormType string `json:"formType" yaml:"formType"` //msg metadata - Keys []string `json:"keys" yaml:"keys"` + FormType string `json:"formType" yaml:"formType"` //msg metadata + Keys string `json:"keys" yaml:"keys"` } type transformDeleteKeyNodeFactory struct{} @@ -27,9 +28,10 @@ func (n *transformDeleteKeyNode) Handle(msg message.Message) error { successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") + keys := strings.Split(n.Keys, ",") if n.FormType == "msg" { data := msg.GetMsg() - for _, key := range n.Keys { + for _, key := range keys { if _, found := data[key]; found { delete(data, key) msg.SetMsg(data) @@ -37,7 +39,7 @@ func (n *transformDeleteKeyNode) Handle(msg message.Message) error { } } else if n.FormType == "metadata" { data := msg.GetMetadata() - for _, key := range n.Keys { + for _, key := range keys { if data.GetKeyValue(key) != nil { values := data.GetValues() delete(values, key) @@ -45,8 +47,12 @@ func (n *transformDeleteKeyNode) Handle(msg message.Message) error { } } } else { - failureLabelNode.Handle(msg) + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } } - - return successLabelNode.Handle(msg) + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } + return nil } diff --git a/pkg/rule_engine/nodes/transform_rename_key_node.go b/pkg/rule_engine/nodes/transform_rename_key_node.go index 717b2dd..ebbc62e 100644 --- a/pkg/rule_engine/nodes/transform_rename_key_node.go +++ b/pkg/rule_engine/nodes/transform_rename_key_node.go @@ -11,8 +11,8 @@ type transformRenameKeyNode struct { Keys []KeyName `json:"keys" yaml:"keys"` } type KeyName struct { - oldName string `json:"oldName" yaml:"oldName"` - newName string `json:"newName" yaml:"newName"` + OldName string `json:"oldName" yaml:"oldName"` + NewName string `json:"newName" yaml:"newName"` } type transformRenameKeyNodeFactory struct{} @@ -20,7 +20,7 @@ func (f transformRenameKeyNodeFactory) Name() string { return "RenameKeyNode func (f transformRenameKeyNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } func (f transformRenameKeyNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f transformRenameKeyNodeFactory) Create(id string, meta Metadata) (Node, error) { - node := &transformScriptNode{ + node := &transformRenameKeyNode{ bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) @@ -34,25 +34,29 @@ func (n *transformRenameKeyNode) Handle(msg message.Message) error { if n.FormType == "msg" { data := msg.GetMsg() for _, key := range n.Keys { - if _, found := data[key.oldName]; found { - data[key.newName] = data[key.oldName] - delete(data, key.oldName) + if _, found := data[key.OldName]; found { + data[key.NewName] = data[key.OldName] + delete(data, key.OldName) msg.SetMsg(data) } } } else if n.FormType == "metadata" { data := msg.GetMetadata() for _, key := range n.Keys { - if data.GetKeyValue(key.oldName) != nil { + if data.GetKeyValue(key.OldName) != nil { values := data.GetValues() - values[key.newName] = values[key.oldName] - delete(values, key.oldName) + values[key.NewName] = values[key.OldName] + delete(values, key.OldName) msg.SetMetadata(message.NewDefaultMetadata(values)) } } } else { - failureLabelNode.Handle(msg) + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } } - - return successLabelNode.Handle(msg) + if successLabelNode != nil { + return successLabelNode.Handle(msg) + } + return nil } diff --git a/pkg/rule_engine/nodes/transform_script_node.go b/pkg/rule_engine/nodes/transform_script_node.go index d579554..cbee53e 100644 --- a/pkg/rule_engine/nodes/transform_script_node.go +++ b/pkg/rule_engine/nodes/transform_script_node.go @@ -16,9 +16,8 @@ func (f transformScriptNodeFactory) Name() string { return "ScriptNode" } func (f transformScriptNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } func (f transformScriptNodeFactory) Labels() []string { return []string{"Success", "Failure"} } func (f transformScriptNodeFactory) Create(id string, meta Metadata) (Node, error) { - labels := []string{"Success", "Failure"} node := &transformScriptNode{ - bareNode: newBareNode(f.Name(), id, meta, labels), + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), } return decodePath(meta, node) } @@ -29,10 +28,17 @@ func (n *transformScriptNode) Handle(msg message.Message) error { successLabelNode := n.GetLinkedNode("Success") failureLabelNode := n.GetLinkedNode("Failure") - scriptEngine := NewScriptEngine() - newMessage, err := scriptEngine.ScriptOnMessage(msg, n.Script) + scriptEngine := NewScriptEngine(msg, "Transform", n.Script) + newMessage, err := scriptEngine.ScriptOnMessage() if err != nil { - return failureLabelNode.Handle(msg) + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return err + } } - return successLabelNode.Handle(newMessage) + if successLabelNode != nil { + return successLabelNode.Handle(newMessage) + } + return nil } diff --git a/pkg/rule_engine/tool/email.go b/pkg/rule_engine/tool/email.go new file mode 100644 index 0000000..05b1676 --- /dev/null +++ b/pkg/rule_engine/tool/email.go @@ -0,0 +1 @@ +package tool diff --git a/pkg/rule_engine/tool/kafka.go b/pkg/rule_engine/tool/kafka.go new file mode 100644 index 0000000..05b1676 --- /dev/null +++ b/pkg/rule_engine/tool/kafka.go @@ -0,0 +1 @@ +package tool diff --git a/pkg/rule_engine/tool/mq.go b/pkg/rule_engine/tool/mq.go new file mode 100644 index 0000000..05b1676 --- /dev/null +++ b/pkg/rule_engine/tool/mq.go @@ -0,0 +1 @@ +package tool diff --git a/pkg/rule_engine/tool/mqtt.go b/pkg/rule_engine/tool/mqtt.go new file mode 100644 index 0000000..05b1676 --- /dev/null +++ b/pkg/rule_engine/tool/mqtt.go @@ -0,0 +1 @@ +package tool diff --git a/pkg/rule_engine/tool/rest_api.go b/pkg/rule_engine/tool/rest_api.go new file mode 100644 index 0000000..05b1676 --- /dev/null +++ b/pkg/rule_engine/tool/rest_api.go @@ -0,0 +1 @@ +package tool diff --git a/pkg/rule_engine/tool/sms.go b/pkg/rule_engine/tool/sms.go new file mode 100644 index 0000000..05b1676 --- /dev/null +++ b/pkg/rule_engine/tool/sms.go @@ -0,0 +1 @@ +package tool