diff --git a/apps/device/api/device.go b/apps/device/api/device.go index 4c4861e..7d35e8a 100644 --- a/apps/device/api/device.go +++ b/apps/device/api/device.go @@ -14,7 +14,6 @@ import ( "pandax/kit/utils" "pandax/pkg/cache" "pandax/pkg/global" - "pandax/pkg/shadow" "strings" "time" @@ -97,14 +96,7 @@ func (p *DeviceApi) GetDeviceStatus(rc *restfulx.ReqCtx) { biz.ErrIsNil(err, "查询设备模板失败") // 从设备影子中读取 res := make([]entity.DeviceStatusVo, 0) - getDevice := shadow.InitDeviceShadow(device.Name, device.Pid) - rs := make(map[string]shadow.DevicePoint) - if classify == global.TslAttributesType { - rs = getDevice.AttributesPoints - } - if classify == global.TslTelemetryType { - rs = getDevice.TelemetryPoints - } + for _, tel := range *template { sdv := entity.DeviceStatusVo{ Name: tel.Name, @@ -112,32 +104,25 @@ func (p *DeviceApi) GetDeviceStatus(rc *restfulx.ReqCtx) { Type: tel.Type, Define: tel.Define, } - // 有直接从设备影子中查询,没有查询时序数据库最后一条记录 - if point, ok := rs[tel.Key]; ok { - value := point.Value - sdv.Time = point.UpdatedAt - sdv.Value = value + var table string + if classify == global.TslTelemetryType { + table = fmt.Sprintf("%s_telemetry", strings.ToLower(device.Name)) + } + if classify == global.TslAttributesType { + table = fmt.Sprintf("%s_attributes", strings.ToLower(device.Name)) + } + sql := `select ts,? from ? order by ts desc` + one, err := global.TdDb.GetOne(sql, strings.ToLower(tel.Key), table) + if err == nil { + sdv.Value = one[strings.ToLower(tel.Key)] + sdv.Time = time.Now() } else { - var table string - if classify == global.TslTelemetryType { - table = fmt.Sprintf("%s_telemetry", strings.ToLower(device.Name)) - } - if classify == global.TslAttributesType { - table = fmt.Sprintf("%s_attributes", strings.ToLower(device.Name)) - } - sql := `select ts,? from ? order by ts desc` - one, err := global.TdDb.GetOne(sql, strings.ToLower(tel.Key), table) - if err == nil { - sdv.Value = one[strings.ToLower(tel.Key)] + if value, ok := tel.Define["default_value"]; ok { + sdv.Value = value sdv.Time = time.Now() } else { - if value, ok := tel.Define["default_value"]; ok { - sdv.Value = value - sdv.Time = time.Now() - } else { - sdv.Value = "未知" - sdv.Time = time.Now() - } + sdv.Value = "未知" + sdv.Time = time.Now() } } res = append(res, sdv) diff --git a/apps/device/api/product.go b/apps/device/api/product.go index 568494e..69c9ab2 100644 --- a/apps/device/api/product.go +++ b/apps/device/api/product.go @@ -11,7 +11,6 @@ import ( "pandax/kit/model" "pandax/kit/restfulx" "pandax/kit/utils" - "pandax/pkg/cache" "pandax/pkg/global" "strings" @@ -70,6 +69,7 @@ func (p *ProductApi) GetProductTsl(rc *restfulx.ReqCtx) { attributes := make([]map[string]interface{}, 0) telemetry := make([]map[string]interface{}, 0) commands := make([]map[string]interface{}, 0) + events := make([]map[string]interface{}, 0) for _, template := range *templates { tslData := map[string]interface{}{ "name": template.Name, @@ -86,12 +86,17 @@ func (p *ProductApi) GetProductTsl(rc *restfulx.ReqCtx) { if template.Classify == global.TslCommandsType { commands = append(commands, tslData) } + if template.Classify == global.TslEventType { + events = append(events, tslData) + } + } rc.ResData = map[string]interface{}{ "attributes": attributes, "telemetry": telemetry, "commands": commands, + "events": events, } } @@ -149,26 +154,4 @@ func (p *ProductApi) DeleteProduct(rc *restfulx.ReqCtx) { // 删除产品 err := p.ProductApp.Delete(ids) biz.ErrIsNil(err, "产品删除失败") - // 删除所有模型,固件 - for _, id := range ids { - // 删除超级表 - deleteDeviceStable(id) - // 删除所有缓存 - cache.DelProductRule(id) - // 删除绑定的属性及OTA记录 - p.TemplateApp.Delete([]string{id}) - p.OtaAPP.Delete([]string{id}) - } -} - -func deleteDeviceStable(productId string) error { - err := global.TdDb.DropStable(productId + "_" + entity.ATTRIBUTES_TSL) - if err != nil { - return err - } - err = global.TdDb.DropStable(productId + "_" + entity.TELEMETRY_TSL) - if err != nil { - return err - } - return nil } diff --git a/apps/device/entity/product.go b/apps/device/entity/product.go index 1b66ef2..85704e5 100644 --- a/apps/device/entity/product.go +++ b/apps/device/entity/product.go @@ -18,6 +18,8 @@ const ( ATTRIBUTES_TSL = "attributes" TELEMETRY_TSL = "telemetry" COMMANDS_TSL = "commands" + EVENT_TSL = "events" + LOG_TSL = "logs" TAGS_TSL = "tags" ) @@ -77,7 +79,7 @@ type ProductOta struct { Description string `json:"description" gorm:"type:varchar(255);comment:说明"` } -type Define map[string]interface{} +type Define map[string]any func (a Define) Value() (driver.Value, error) { return json.Marshal(a) diff --git a/apps/device/services/product.go b/apps/device/services/product.go index 8f08c42..5f4cc7e 100644 --- a/apps/device/services/product.go +++ b/apps/device/services/product.go @@ -2,6 +2,7 @@ package services import ( "pandax/apps/device/entity" + "pandax/pkg/cache" "pandax/pkg/global" ) @@ -117,14 +118,26 @@ func (m *productModelImpl) Delete(ids []string) error { if err := global.Db.Table(m.table).Delete(&entity.Product{}, "id in (?)", ids).Error; err != nil { return err } + // 删除所有模型,固件 + for _, id := range ids { + // 删除超级表 + deleteDeviceStable(id) + // 删除所有缓存 + cache.DelProductRule(id) + // 删除绑定的属性及OTA记录 + ProductTemplateModelDao.Delete([]string{id}) + ProductOtaModelDao.Delete([]string{id}) + } return nil } func createDeviceStable(productId string) error { + // 属性表 err := global.TdDb.CreateStable(productId + "_" + entity.ATTRIBUTES_TSL) if err != nil { return err } + // 遥测表 err = global.TdDb.CreateStable(productId + "_" + entity.TELEMETRY_TSL) if err != nil { return err @@ -132,6 +145,18 @@ func createDeviceStable(productId string) error { return nil } +func deleteDeviceStable(productId string) error { + err := global.TdDb.DropStable(productId + "_" + entity.ATTRIBUTES_TSL) + if err != nil { + return err + } + err = global.TdDb.DropStable(productId + "_" + entity.TELEMETRY_TSL) + if err != nil { + return err + } + return nil +} + // 获取产品数量统计 func (m *productModelImpl) FindProductCount() (count entity.DeviceCount, err error) { sql := `SELECT COUNT(*) AS total, (SELECT COUNT(*) FROM products WHERE DATE(create_time) = CURDATE()) AS today FROM products` diff --git a/apps/rule/api/rulechain.go b/apps/rule/api/rulechain.go index 0f3b430..21d7984 100644 --- a/apps/rule/api/rulechain.go +++ b/apps/rule/api/rulechain.go @@ -64,8 +64,11 @@ func (p *RuleChainApi) GetRuleChainList(rc *restfulx.ReqCtx) { func (p *RuleChainApi) GetRuleChainListLabel(rc *restfulx.ReqCtx) { data := entity.RuleChain{} data.RuleName = restfulx.QueryParam(rc, "ruleName") + data.RoleId = rc.LoginAccount.RoleId + data.Owner = rc.LoginAccount.UserName + list, err := p.RuleChainApp.FindListBaseLabel(data) - biz.ErrIsNil(err, "获取规则链Label错误") + biz.ErrIsNilAppendErr(err, "获取规则链Label错误") rc.ResData = list } diff --git a/config.yml b/config.yml index d010041..14c15fd 100644 --- a/config.yml +++ b/config.yml @@ -5,11 +5,11 @@ app: server: # debug release test model: release - port: 7788 + port: 7799 # iothub服务端口 使用的rpc端口 9000 9001 可能与minio端口冲突 - grpc-port: 9001 - http-port: 9002 - tcp-port: 9003 + grpc-port: 7701 + http-port: 7702 + tcp-port: 7703 cors: true # 接口限流 rate: @@ -34,14 +34,14 @@ queue: ch-num: 3000 #并发执行数,同时处理多少条数据 redis: - host: 127.0.0.1 - password: root + host: 192.168.10.242 + password: 123456 port: 6379 mysql: host: 127.0.0.1:3306 username: root - password: '!MyEMS1' + password: 123456 db-name: pandax_iot config: charset=utf8&loc=Local&parseTime=true @@ -65,7 +65,7 @@ oss: taos: username: "root" password: "taosdata" - host: "127.0.0.1:6041" + host: "192.168.10.242:6041" database: "iot" config: "" diff --git a/go.mod b/go.mod index 480cd0c..7374ad8 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/go-openapi/spec v0.20.6 github.com/go-playground/validator/v10 v10.8.0 github.com/go-redis/redis/v8 v8.11.5 - github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b + github.com/golang-queue/queue v0.1.3 github.com/google/uuid v1.3.0 github.com/gorilla/schema v1.2.0 github.com/gorilla/websocket v1.5.0 @@ -70,6 +70,7 @@ require ( github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/go-sql-driver/mysql v1.6.0 // indirect + github.com/goccy/go-json v0.9.7 // indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/golang/protobuf v1.5.2 // indirect @@ -96,7 +97,6 @@ require ( github.com/jinzhu/now v1.1.2 // indirect github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index 3de0736..1f71d43 100644 --- a/go.sum +++ b/go.sum @@ -17,7 +17,6 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.1443/go.mod h1:RcDobYh8k5VP6TNybz9m github.com/aliyun/aliyun-oss-go-sdk v2.2.0+incompatible h1:ht2+VfbXtNLGhCsnTMc6/N26nSTBK6qdhktjYyjJQkk= github.com/aliyun/aliyun-oss-go-sdk v2.2.0+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/appleboy/com v0.1.7 h1:4lYTFNoMAAXGGIC8lDxVg/NY+1aXbYqfAWN05cZhd0M= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/brianvoe/gofakeit/v6 v6.0.2 h1:MDvplMAKJMcKZDwQvsIbhT7BV/8UF/3EEy2n14ynUyA= @@ -125,11 +124,13 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= +github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= -github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b h1:EfOci2gtTtCMgxv2Coh+i0iEARmvnCrxcY0Mm08KzMw= -github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b/go.mod h1:5nEkJTzw9Boc8ZCylQlrJK5f/Vd8Uo58yAssRli5ckg= +github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40= +github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= @@ -261,8 +262,6 @@ github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible/go.mod h1:1c7szIrayyPPB/987hsnvNzLushdWf4o/79s3P08L8A= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= -github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= -github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -394,7 +393,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/taosdata/driver-go/v3 v3.5.0 h1:30crN+E+ACURmq28kn3Y8B3jfL5knaC1fc1rLvgyXqs= github.com/taosdata/driver-go/v3 v3.5.0/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU= github.com/xuri/efp v0.0.0-20210322160811-ab561f5b45e3 h1:EpI0bqf/eX9SdZDwlMmahKM+CDBgNbsXMhsN28XrM8o= @@ -408,7 +407,7 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= diff --git a/iothub/hook_message_work/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go index aed7618..4667410 100644 --- a/iothub/hook_message_work/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -14,7 +14,6 @@ import ( "pandax/pkg/global/model" "pandax/pkg/rule_engine" "pandax/pkg/rule_engine/message" - "pandax/pkg/shadow" "pandax/pkg/tool" "pandax/pkg/websocket" ) @@ -66,18 +65,7 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { global.Log.Error("规则链执行失败", err) return } - // 保存设备影子 - if msg.Type != message.RpcRequestFromDevice { - SetDeviceShadow(msg.DeviceAuth, ruleMessage.Msg, msg.Type) - } case message.DisConnectMes, message.ConnectMes: - //检测设备影子并修改设备影子状态 - if msg.Type == message.ConnectMes { - shadow.InitDeviceShadow(msg.DeviceAuth.Name, msg.DeviceAuth.ProductId) - shadow.DeviceShadowInstance.SetOnline(msg.DeviceAuth.Name) - } else { - shadow.DeviceShadowInstance.SetOffline(msg.DeviceAuth.Name) - } // 更改设备在线状态 if msg.Type == message.ConnectMes { services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.ONLINE) @@ -167,30 +155,3 @@ func SendZtWebsocket(deviceId, message string) { websocket.SendMessage(CJNR, stageid) } } - -// SetDeviceShadow 设置设备点 -func SetDeviceShadow(etoken *model.DeviceAuth, msgVals map[string]interface{}, msgType string) { - defer func() { - if err := recover(); err != nil { - global.Log.Error(err) - } - }() - - if msgType == message.RowMes { - msgType = message.TelemetryMes - } - for key, value := range msgVals { - if message.AttributesMes == msgType { - err := shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslAttributesType, key, value) - if err != nil { - global.Log.Error("设置设备影子点失败", err) - } - } - if message.TelemetryMes == msgType { - err := shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslTelemetryType, key, value) - if err != nil { - global.Log.Error("设置设备影子点失败", err) - } - } - } -} diff --git a/iothub/netbase/hook_base.go b/iothub/netbase/hook_base.go index 8e19acd..2560ba4 100644 --- a/iothub/netbase/hook_base.go +++ b/iothub/netbase/hook_base.go @@ -2,9 +2,10 @@ package netbase import ( "encoding/json" + "fmt" "pandax/apps/device/entity" "pandax/apps/device/services" - "pandax/iothub/server/emqxserver/protobuf" + exhook "pandax/iothub/server/emqxserver/protobuf" "pandax/kit/utils" "pandax/pkg/cache" "pandax/pkg/global" @@ -91,20 +92,24 @@ func CreateSubTableField(productId, ty string, fields map[string]interface{}) { if key == "ts" { return } - interfaceType := tool.GetInterfaceType(value) - // 向产品tsl中添加模型 - err := global.TdDb.AddSTableField(productId+"_"+ty, key, interfaceType, 0) - if err != nil { - return + check := cache.CheckSubDeviceField(key) + if !check { + interfaceType := tool.GetInterfaceType(value) + err := global.TdDb.AddSTableField(productId+"_"+ty, key, interfaceType, 0) + if err != nil { + return + } + tsl := entity.ProductTemplate{} + tsl.Pid = productId + tsl.Id = utils.GenerateID() + tsl.Name = key + tsl.Type = interfaceType + tsl.Key = key + tsl.Classify = ty + // 向产品tsl中添加模型 + services.ProductTemplateModelDao.Insert(tsl) + cache.SetSubDeviceField(key) } - tsl := entity.ProductTemplate{} - tsl.Pid = productId - tsl.Id = utils.GenerateID() - tsl.Name = key - tsl.Type = interfaceType - tsl.Key = key - tsl.Classify = ty - services.ProductTemplateModelDao.Insert(tsl) }(key, value) } group.Wait() @@ -187,12 +192,11 @@ func GetRequestIdFromTopic(reg, topic string) (requestId string) { func CreateConnectionInfo(msgType, protocol, clientID, peerHost string, deviceAuth *model.DeviceAuth) *DeviceEventInfo { ts := time.Now().Format("2006-01-02 15:04:05.000") - ci := &tdengine.ConnectInfo{ - ClientID: clientID, + ci := &tdengine.Events{ DeviceId: deviceAuth.DeviceId, - PeerHost: peerHost, - Protocol: protocol, - Type: msgType, + Name: msgType, + Type: "info", + Content: fmt.Sprintf("设备%s, %s 事件", deviceAuth.Name, msgType), Ts: ts, } v, err := json.Marshal(*ci) diff --git a/iothub/netbase/iothub_session.go b/iothub/netbase/iothub_session.go index 2684b1e..566a080 100644 --- a/iothub/netbase/iothub_session.go +++ b/iothub/netbase/iothub_session.go @@ -16,7 +16,7 @@ type DeviceEventInfo struct { func (j *DeviceEventInfo) Bytes() []byte { b, err := json.Marshal(j) if err != nil { - panic(err) + return nil } return b } diff --git a/pkg/cache/sub_device.go b/pkg/cache/sub_device.go new file mode 100644 index 0000000..fd715e1 --- /dev/null +++ b/pkg/cache/sub_device.go @@ -0,0 +1,31 @@ +package cache + +import ( + "pandax/kit/cache" + "strings" + "time" +) + +var SubDeviceField = cache.NewTimedCache(cache.NoExpiration, 24*time.Hour) +var SUBDEVICEKEY = "SUBDEVICEKEY" + +func CheckSubDeviceField(field string) bool { + fields, bool := SubDeviceField.Get(SUBDEVICEKEY) + if !bool { + return false + } + if !strings.Contains(fields.(string), field) { + return false + } + return true +} + +func SetSubDeviceField(data string) { + fields, bool := SubDeviceField.Get(SUBDEVICEKEY) + if !bool { + fields = data + } else { + fields = fields.(string) + "," + data + } + ProductCache.Put(SUBDEVICEKEY, fields) +} diff --git a/pkg/global/global_const_device.go b/pkg/global/global_const_device.go index 83ce689..9b6901e 100644 --- a/pkg/global/global_const_device.go +++ b/pkg/global/global_const_device.go @@ -4,6 +4,7 @@ const ( TslAttributesType = "attributes" TslTelemetryType = "telemetry" TslCommandsType = "commands" + TslEventType = "events" ) // 告警等级 diff --git a/pkg/tdengine/TDengineModel.go b/pkg/tdengine/TDengineModel.go index 91a3a0a..e7f9629 100644 --- a/pkg/tdengine/TDengineModel.go +++ b/pkg/tdengine/TDengineModel.go @@ -20,11 +20,3 @@ type TableDataInfo struct { Filed []string `json:"filed" description:"字段"` Info []map[string]interface{} `json:"info" description:"数据"` } - -// 日志 TDengine -type TdLog struct { - Ts string `json:"ts" dc:"时间"` - Device string `json:"device" dc:"设备标识"` - Type string `json:"type" dc:"日志类型"` - Content string `json:"content" dc:"日志内容"` -} diff --git a/pkg/tdengine/tdengine_event.go b/pkg/tdengine/tdengine_event.go index 10e2058..c902574 100644 --- a/pkg/tdengine/tdengine_event.go +++ b/pkg/tdengine/tdengine_event.go @@ -4,22 +4,20 @@ import ( "fmt" ) -const connectTableName = "device_connect" +const connectTableName = "events" -type ConnectInfo struct { - Ts string `json:"ts"` - ClientID string `json:"clientId"` - Type string `json:"type"` // 连接类型 - PeerHost string `json:"peerHost"` - SocketPort string `json:"sockPort"` - Protocol string `json:"protocol"` - DeviceId string `json:"deviceId"` +type Events struct { + Ts string `json:"ts"` + Name string `json:"name"` //标识 connet + Type string `json:"type"` // 事件类型 info alarm fault + Content string `json:"content"` // 事件描述 + DeviceId string `json:"deviceId"` } // CreateEventTable 创建设备连接事件表 func (s *TdEngine) CreateEventTable() (err error) { - sql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP,deviceId NCHAR(64), - type NCHAR(64),clientId NCHAR(64),peerHost NCHAR(64),sockPort NCHAR(64),protocol NCHAR(64))`, s.dbName, connectTableName) + sql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP,deviceId NCHAR(64),name NCHAR(64), + type NCHAR(64),content NCHAR(255))`, s.dbName, connectTableName) _, err = s.db.Exec(sql) return } @@ -27,3 +25,25 @@ func (s *TdEngine) CreateEventTable() (err error) { func (s *TdEngine) InsertEvent(data map[string]any) (err error) { return s.InsertDevice(connectTableName, data) } + +func (s *TdEngine) GetAllEvents(sql string, args ...any) (list []Events, err error) { + rows, err := s.db.Query(sql, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var event Events + + err = rows.Scan(&event.Ts, &event.DeviceId, &event.Name, &event.Type, &event.Content) + if err != nil { + return nil, err + } + event.Ts = s.Time(event.Ts) + + list = append(list, event) + } + + return +} diff --git a/pkg/tdengine/tdengine_log.go b/pkg/tdengine/tdengine_log.go index 9c29392..7d9c689 100644 --- a/pkg/tdengine/tdengine_log.go +++ b/pkg/tdengine/tdengine_log.go @@ -1,21 +1,37 @@ package tdengine -import "time" +import ( + "fmt" + "time" -const logTableName = "device_log" + "github.com/kakuilan/kgo" +) + +const logTableName = "logs" + +// 日志 TDengine +type TdLog struct { + Ts string `json:"ts" dc:"时间"` + DeviceId string `json:"deviceId" dc:"设备标识"` + TraceId string `json:"traceId" dc:"追踪"` + Type string `json:"type" dc:"日志类型"` // 命令调用 上行 下行 + Content string `json:"content" dc:"日志内容"` +} // CreateLogStable 添加LOG超级表 func (s *TdEngine) CreateLogStable() (err error) { - sql := "CREATE STABLE IF NOT EXISTS ? (ts TIMESTAMP, type VARCHAR(20), content VARCHAR(1000)) TAGS (device VARCHAR(255))" + sql := "CREATE STABLE IF NOT EXISTS ? (ts TIMESTAMP,deviceId NCHAR(64),traceId NCHAR(64),type NCHAR(20), content VARCHAR(1000))" _, err = s.db.Exec(sql, logTableName) return } // InsertLog 写入数据 func (s *TdEngine) InsertLog(log *TdLog) (err error) { - sql := "INSERT INTO log_? USING device_log TAGS (?) VALUES (?, ?, ?)" - _, err = s.db.Exec(sql, log.Device, log.Ts, log.Type, log.Content) - + logs, err := kgo.KConv.Struct2Map(*log, "") + if err != nil { + return err + } + err = s.InsertDevice(logTableName, logs) return } @@ -23,7 +39,7 @@ func (s *TdEngine) InsertLog(log *TdLog) (err error) { func (s *TdEngine) ClearLog() (err error) { ts := time.Now().Add(-7 * 24 * time.Hour).Format("2006-01-02") - sql := "DELETE FROM device_log WHERE ts < ?" + sql := fmt.Sprintf("DELETE FROM %s WHERE ts < ?", logTableName) _, err = s.db.Exec(sql, ts) return @@ -40,7 +56,7 @@ func (s *TdEngine) GetAllLog(sql string, args ...any) (list []TdLog, err error) for rows.Next() { var log TdLog - err = rows.Scan(&log.Ts, &log.Type, &log.Content, &log.Device) + err = rows.Scan(&log.Ts, &log.DeviceId, &log.TraceId, &log.Type, &log.Content) if err != nil { return nil, err }