mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 02:48:34 +08:00
优化
This commit is contained in:
@@ -14,7 +14,6 @@ import (
|
||||
"pandax/kit/utils"
|
||||
"pandax/pkg/cache"
|
||||
"pandax/pkg/global"
|
||||
"pandax/pkg/shadow"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -97,14 +96,7 @@ func (p *DeviceApi) GetDeviceStatus(rc *restfulx.ReqCtx) {
|
||||
biz.ErrIsNil(err, "查询设备模板失败")
|
||||
// 从设备影子中读取
|
||||
res := make([]entity.DeviceStatusVo, 0)
|
||||
getDevice := shadow.InitDeviceShadow(device.Name, device.Pid)
|
||||
rs := make(map[string]shadow.DevicePoint)
|
||||
if classify == global.TslAttributesType {
|
||||
rs = getDevice.AttributesPoints
|
||||
}
|
||||
if classify == global.TslTelemetryType {
|
||||
rs = getDevice.TelemetryPoints
|
||||
}
|
||||
|
||||
for _, tel := range *template {
|
||||
sdv := entity.DeviceStatusVo{
|
||||
Name: tel.Name,
|
||||
@@ -112,32 +104,25 @@ func (p *DeviceApi) GetDeviceStatus(rc *restfulx.ReqCtx) {
|
||||
Type: tel.Type,
|
||||
Define: tel.Define,
|
||||
}
|
||||
// 有直接从设备影子中查询,没有查询时序数据库最后一条记录
|
||||
if point, ok := rs[tel.Key]; ok {
|
||||
value := point.Value
|
||||
sdv.Time = point.UpdatedAt
|
||||
sdv.Value = value
|
||||
var table string
|
||||
if classify == global.TslTelemetryType {
|
||||
table = fmt.Sprintf("%s_telemetry", strings.ToLower(device.Name))
|
||||
}
|
||||
if classify == global.TslAttributesType {
|
||||
table = fmt.Sprintf("%s_attributes", strings.ToLower(device.Name))
|
||||
}
|
||||
sql := `select ts,? from ? order by ts desc`
|
||||
one, err := global.TdDb.GetOne(sql, strings.ToLower(tel.Key), table)
|
||||
if err == nil {
|
||||
sdv.Value = one[strings.ToLower(tel.Key)]
|
||||
sdv.Time = time.Now()
|
||||
} else {
|
||||
var table string
|
||||
if classify == global.TslTelemetryType {
|
||||
table = fmt.Sprintf("%s_telemetry", strings.ToLower(device.Name))
|
||||
}
|
||||
if classify == global.TslAttributesType {
|
||||
table = fmt.Sprintf("%s_attributes", strings.ToLower(device.Name))
|
||||
}
|
||||
sql := `select ts,? from ? order by ts desc`
|
||||
one, err := global.TdDb.GetOne(sql, strings.ToLower(tel.Key), table)
|
||||
if err == nil {
|
||||
sdv.Value = one[strings.ToLower(tel.Key)]
|
||||
if value, ok := tel.Define["default_value"]; ok {
|
||||
sdv.Value = value
|
||||
sdv.Time = time.Now()
|
||||
} else {
|
||||
if value, ok := tel.Define["default_value"]; ok {
|
||||
sdv.Value = value
|
||||
sdv.Time = time.Now()
|
||||
} else {
|
||||
sdv.Value = "未知"
|
||||
sdv.Time = time.Now()
|
||||
}
|
||||
sdv.Value = "未知"
|
||||
sdv.Time = time.Now()
|
||||
}
|
||||
}
|
||||
res = append(res, sdv)
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"pandax/kit/model"
|
||||
"pandax/kit/restfulx"
|
||||
"pandax/kit/utils"
|
||||
"pandax/pkg/cache"
|
||||
"pandax/pkg/global"
|
||||
"strings"
|
||||
|
||||
@@ -70,6 +69,7 @@ func (p *ProductApi) GetProductTsl(rc *restfulx.ReqCtx) {
|
||||
attributes := make([]map[string]interface{}, 0)
|
||||
telemetry := make([]map[string]interface{}, 0)
|
||||
commands := make([]map[string]interface{}, 0)
|
||||
events := make([]map[string]interface{}, 0)
|
||||
for _, template := range *templates {
|
||||
tslData := map[string]interface{}{
|
||||
"name": template.Name,
|
||||
@@ -86,12 +86,17 @@ func (p *ProductApi) GetProductTsl(rc *restfulx.ReqCtx) {
|
||||
if template.Classify == global.TslCommandsType {
|
||||
commands = append(commands, tslData)
|
||||
}
|
||||
if template.Classify == global.TslEventType {
|
||||
events = append(events, tslData)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
rc.ResData = map[string]interface{}{
|
||||
"attributes": attributes,
|
||||
"telemetry": telemetry,
|
||||
"commands": commands,
|
||||
"events": events,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,26 +154,4 @@ func (p *ProductApi) DeleteProduct(rc *restfulx.ReqCtx) {
|
||||
// 删除产品
|
||||
err := p.ProductApp.Delete(ids)
|
||||
biz.ErrIsNil(err, "产品删除失败")
|
||||
// 删除所有模型,固件
|
||||
for _, id := range ids {
|
||||
// 删除超级表
|
||||
deleteDeviceStable(id)
|
||||
// 删除所有缓存
|
||||
cache.DelProductRule(id)
|
||||
// 删除绑定的属性及OTA记录
|
||||
p.TemplateApp.Delete([]string{id})
|
||||
p.OtaAPP.Delete([]string{id})
|
||||
}
|
||||
}
|
||||
|
||||
func deleteDeviceStable(productId string) error {
|
||||
err := global.TdDb.DropStable(productId + "_" + entity.ATTRIBUTES_TSL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = global.TdDb.DropStable(productId + "_" + entity.TELEMETRY_TSL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ const (
|
||||
ATTRIBUTES_TSL = "attributes"
|
||||
TELEMETRY_TSL = "telemetry"
|
||||
COMMANDS_TSL = "commands"
|
||||
EVENT_TSL = "events"
|
||||
LOG_TSL = "logs"
|
||||
TAGS_TSL = "tags"
|
||||
)
|
||||
|
||||
@@ -77,7 +79,7 @@ type ProductOta struct {
|
||||
Description string `json:"description" gorm:"type:varchar(255);comment:说明"`
|
||||
}
|
||||
|
||||
type Define map[string]interface{}
|
||||
type Define map[string]any
|
||||
|
||||
func (a Define) Value() (driver.Value, error) {
|
||||
return json.Marshal(a)
|
||||
|
||||
@@ -2,6 +2,7 @@ package services
|
||||
|
||||
import (
|
||||
"pandax/apps/device/entity"
|
||||
"pandax/pkg/cache"
|
||||
"pandax/pkg/global"
|
||||
)
|
||||
|
||||
@@ -117,14 +118,26 @@ func (m *productModelImpl) Delete(ids []string) error {
|
||||
if err := global.Db.Table(m.table).Delete(&entity.Product{}, "id in (?)", ids).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
// 删除所有模型,固件
|
||||
for _, id := range ids {
|
||||
// 删除超级表
|
||||
deleteDeviceStable(id)
|
||||
// 删除所有缓存
|
||||
cache.DelProductRule(id)
|
||||
// 删除绑定的属性及OTA记录
|
||||
ProductTemplateModelDao.Delete([]string{id})
|
||||
ProductOtaModelDao.Delete([]string{id})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func createDeviceStable(productId string) error {
|
||||
// 属性表
|
||||
err := global.TdDb.CreateStable(productId + "_" + entity.ATTRIBUTES_TSL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 遥测表
|
||||
err = global.TdDb.CreateStable(productId + "_" + entity.TELEMETRY_TSL)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -132,6 +145,18 @@ func createDeviceStable(productId string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteDeviceStable(productId string) error {
|
||||
err := global.TdDb.DropStable(productId + "_" + entity.ATTRIBUTES_TSL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = global.TdDb.DropStable(productId + "_" + entity.TELEMETRY_TSL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 获取产品数量统计
|
||||
func (m *productModelImpl) FindProductCount() (count entity.DeviceCount, err error) {
|
||||
sql := `SELECT COUNT(*) AS total, (SELECT COUNT(*) FROM products WHERE DATE(create_time) = CURDATE()) AS today FROM products`
|
||||
|
||||
@@ -64,8 +64,11 @@ func (p *RuleChainApi) GetRuleChainList(rc *restfulx.ReqCtx) {
|
||||
func (p *RuleChainApi) GetRuleChainListLabel(rc *restfulx.ReqCtx) {
|
||||
data := entity.RuleChain{}
|
||||
data.RuleName = restfulx.QueryParam(rc, "ruleName")
|
||||
data.RoleId = rc.LoginAccount.RoleId
|
||||
data.Owner = rc.LoginAccount.UserName
|
||||
|
||||
list, err := p.RuleChainApp.FindListBaseLabel(data)
|
||||
biz.ErrIsNil(err, "获取规则链Label错误")
|
||||
biz.ErrIsNilAppendErr(err, "获取规则链Label错误")
|
||||
rc.ResData = list
|
||||
}
|
||||
|
||||
|
||||
16
config.yml
16
config.yml
@@ -5,11 +5,11 @@ app:
|
||||
server:
|
||||
# debug release test
|
||||
model: release
|
||||
port: 7788
|
||||
port: 7799
|
||||
# iothub服务端口 使用的rpc端口 9000 9001 可能与minio端口冲突
|
||||
grpc-port: 9001
|
||||
http-port: 9002
|
||||
tcp-port: 9003
|
||||
grpc-port: 7701
|
||||
http-port: 7702
|
||||
tcp-port: 7703
|
||||
cors: true
|
||||
# 接口限流
|
||||
rate:
|
||||
@@ -34,14 +34,14 @@ queue:
|
||||
ch-num: 3000 #并发执行数,同时处理多少条数据
|
||||
|
||||
redis:
|
||||
host: 127.0.0.1
|
||||
password: root
|
||||
host: 192.168.10.242
|
||||
password: 123456
|
||||
port: 6379
|
||||
|
||||
mysql:
|
||||
host: 127.0.0.1:3306
|
||||
username: root
|
||||
password: '!MyEMS1'
|
||||
password: 123456
|
||||
db-name: pandax_iot
|
||||
config: charset=utf8&loc=Local&parseTime=true
|
||||
|
||||
@@ -65,7 +65,7 @@ oss:
|
||||
taos:
|
||||
username: "root"
|
||||
password: "taosdata"
|
||||
host: "127.0.0.1:6041"
|
||||
host: "192.168.10.242:6041"
|
||||
database: "iot"
|
||||
config: ""
|
||||
|
||||
|
||||
4
go.mod
4
go.mod
@@ -17,7 +17,7 @@ require (
|
||||
github.com/go-openapi/spec v0.20.6
|
||||
github.com/go-playground/validator/v10 v10.8.0
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b
|
||||
github.com/golang-queue/queue v0.1.3
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/gorilla/schema v1.2.0
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
@@ -70,6 +70,7 @@ require (
|
||||
github.com/go-playground/universal-translator v0.18.0 // indirect
|
||||
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
|
||||
github.com/go-sql-driver/mysql v1.6.0 // indirect
|
||||
github.com/goccy/go-json v0.9.7 // indirect
|
||||
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
@@ -96,7 +97,6 @@ require (
|
||||
github.com/jinzhu/now v1.1.2 // indirect
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/jpillora/backoff v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.17.4 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
|
||||
|
||||
13
go.sum
13
go.sum
@@ -17,7 +17,6 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.1443/go.mod h1:RcDobYh8k5VP6TNybz9m
|
||||
github.com/aliyun/aliyun-oss-go-sdk v2.2.0+incompatible h1:ht2+VfbXtNLGhCsnTMc6/N26nSTBK6qdhktjYyjJQkk=
|
||||
github.com/aliyun/aliyun-oss-go-sdk v2.2.0+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/appleboy/com v0.1.7 h1:4lYTFNoMAAXGGIC8lDxVg/NY+1aXbYqfAWN05cZhd0M=
|
||||
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA=
|
||||
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
|
||||
github.com/brianvoe/gofakeit/v6 v6.0.2 h1:MDvplMAKJMcKZDwQvsIbhT7BV/8UF/3EEy2n14ynUyA=
|
||||
@@ -125,11 +124,13 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
|
||||
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
|
||||
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
|
||||
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
|
||||
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
|
||||
github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b h1:EfOci2gtTtCMgxv2Coh+i0iEARmvnCrxcY0Mm08KzMw=
|
||||
github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b/go.mod h1:5nEkJTzw9Boc8ZCylQlrJK5f/Vd8Uo58yAssRli5ckg=
|
||||
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
|
||||
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
|
||||
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
|
||||
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
|
||||
@@ -261,8 +262,6 @@ github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible
|
||||
github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible/go.mod h1:1c7szIrayyPPB/987hsnvNzLushdWf4o/79s3P08L8A=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
|
||||
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
|
||||
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
@@ -394,7 +393,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/taosdata/driver-go/v3 v3.5.0 h1:30crN+E+ACURmq28kn3Y8B3jfL5knaC1fc1rLvgyXqs=
|
||||
github.com/taosdata/driver-go/v3 v3.5.0/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU=
|
||||
github.com/xuri/efp v0.0.0-20210322160811-ab561f5b45e3 h1:EpI0bqf/eX9SdZDwlMmahKM+CDBgNbsXMhsN28XrM8o=
|
||||
@@ -408,7 +407,7 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
|
||||
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
|
||||
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"pandax/pkg/global/model"
|
||||
"pandax/pkg/rule_engine"
|
||||
"pandax/pkg/rule_engine/message"
|
||||
"pandax/pkg/shadow"
|
||||
"pandax/pkg/tool"
|
||||
"pandax/pkg/websocket"
|
||||
)
|
||||
@@ -66,18 +65,7 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
|
||||
global.Log.Error("规则链执行失败", err)
|
||||
return
|
||||
}
|
||||
// 保存设备影子
|
||||
if msg.Type != message.RpcRequestFromDevice {
|
||||
SetDeviceShadow(msg.DeviceAuth, ruleMessage.Msg, msg.Type)
|
||||
}
|
||||
case message.DisConnectMes, message.ConnectMes:
|
||||
//检测设备影子并修改设备影子状态
|
||||
if msg.Type == message.ConnectMes {
|
||||
shadow.InitDeviceShadow(msg.DeviceAuth.Name, msg.DeviceAuth.ProductId)
|
||||
shadow.DeviceShadowInstance.SetOnline(msg.DeviceAuth.Name)
|
||||
} else {
|
||||
shadow.DeviceShadowInstance.SetOffline(msg.DeviceAuth.Name)
|
||||
}
|
||||
// 更改设备在线状态
|
||||
if msg.Type == message.ConnectMes {
|
||||
services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.ONLINE)
|
||||
@@ -167,30 +155,3 @@ func SendZtWebsocket(deviceId, message string) {
|
||||
websocket.SendMessage(CJNR, stageid)
|
||||
}
|
||||
}
|
||||
|
||||
// SetDeviceShadow 设置设备点
|
||||
func SetDeviceShadow(etoken *model.DeviceAuth, msgVals map[string]interface{}, msgType string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
global.Log.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
if msgType == message.RowMes {
|
||||
msgType = message.TelemetryMes
|
||||
}
|
||||
for key, value := range msgVals {
|
||||
if message.AttributesMes == msgType {
|
||||
err := shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslAttributesType, key, value)
|
||||
if err != nil {
|
||||
global.Log.Error("设置设备影子点失败", err)
|
||||
}
|
||||
}
|
||||
if message.TelemetryMes == msgType {
|
||||
err := shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslTelemetryType, key, value)
|
||||
if err != nil {
|
||||
global.Log.Error("设置设备影子点失败", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@ package netbase
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"pandax/apps/device/entity"
|
||||
"pandax/apps/device/services"
|
||||
"pandax/iothub/server/emqxserver/protobuf"
|
||||
exhook "pandax/iothub/server/emqxserver/protobuf"
|
||||
"pandax/kit/utils"
|
||||
"pandax/pkg/cache"
|
||||
"pandax/pkg/global"
|
||||
@@ -91,20 +92,24 @@ func CreateSubTableField(productId, ty string, fields map[string]interface{}) {
|
||||
if key == "ts" {
|
||||
return
|
||||
}
|
||||
interfaceType := tool.GetInterfaceType(value)
|
||||
// 向产品tsl中添加模型
|
||||
err := global.TdDb.AddSTableField(productId+"_"+ty, key, interfaceType, 0)
|
||||
if err != nil {
|
||||
return
|
||||
check := cache.CheckSubDeviceField(key)
|
||||
if !check {
|
||||
interfaceType := tool.GetInterfaceType(value)
|
||||
err := global.TdDb.AddSTableField(productId+"_"+ty, key, interfaceType, 0)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tsl := entity.ProductTemplate{}
|
||||
tsl.Pid = productId
|
||||
tsl.Id = utils.GenerateID()
|
||||
tsl.Name = key
|
||||
tsl.Type = interfaceType
|
||||
tsl.Key = key
|
||||
tsl.Classify = ty
|
||||
// 向产品tsl中添加模型
|
||||
services.ProductTemplateModelDao.Insert(tsl)
|
||||
cache.SetSubDeviceField(key)
|
||||
}
|
||||
tsl := entity.ProductTemplate{}
|
||||
tsl.Pid = productId
|
||||
tsl.Id = utils.GenerateID()
|
||||
tsl.Name = key
|
||||
tsl.Type = interfaceType
|
||||
tsl.Key = key
|
||||
tsl.Classify = ty
|
||||
services.ProductTemplateModelDao.Insert(tsl)
|
||||
}(key, value)
|
||||
}
|
||||
group.Wait()
|
||||
@@ -187,12 +192,11 @@ func GetRequestIdFromTopic(reg, topic string) (requestId string) {
|
||||
|
||||
func CreateConnectionInfo(msgType, protocol, clientID, peerHost string, deviceAuth *model.DeviceAuth) *DeviceEventInfo {
|
||||
ts := time.Now().Format("2006-01-02 15:04:05.000")
|
||||
ci := &tdengine.ConnectInfo{
|
||||
ClientID: clientID,
|
||||
ci := &tdengine.Events{
|
||||
DeviceId: deviceAuth.DeviceId,
|
||||
PeerHost: peerHost,
|
||||
Protocol: protocol,
|
||||
Type: msgType,
|
||||
Name: msgType,
|
||||
Type: "info",
|
||||
Content: fmt.Sprintf("设备%s, %s 事件", deviceAuth.Name, msgType),
|
||||
Ts: ts,
|
||||
}
|
||||
v, err := json.Marshal(*ci)
|
||||
|
||||
@@ -16,7 +16,7 @@ type DeviceEventInfo struct {
|
||||
func (j *DeviceEventInfo) Bytes() []byte {
|
||||
b, err := json.Marshal(j)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return nil
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
31
pkg/cache/sub_device.go
vendored
Normal file
31
pkg/cache/sub_device.go
vendored
Normal file
@@ -0,0 +1,31 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"pandax/kit/cache"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var SubDeviceField = cache.NewTimedCache(cache.NoExpiration, 24*time.Hour)
|
||||
var SUBDEVICEKEY = "SUBDEVICEKEY"
|
||||
|
||||
func CheckSubDeviceField(field string) bool {
|
||||
fields, bool := SubDeviceField.Get(SUBDEVICEKEY)
|
||||
if !bool {
|
||||
return false
|
||||
}
|
||||
if !strings.Contains(fields.(string), field) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func SetSubDeviceField(data string) {
|
||||
fields, bool := SubDeviceField.Get(SUBDEVICEKEY)
|
||||
if !bool {
|
||||
fields = data
|
||||
} else {
|
||||
fields = fields.(string) + "," + data
|
||||
}
|
||||
ProductCache.Put(SUBDEVICEKEY, fields)
|
||||
}
|
||||
@@ -4,6 +4,7 @@ const (
|
||||
TslAttributesType = "attributes"
|
||||
TslTelemetryType = "telemetry"
|
||||
TslCommandsType = "commands"
|
||||
TslEventType = "events"
|
||||
)
|
||||
|
||||
// 告警等级
|
||||
|
||||
@@ -20,11 +20,3 @@ type TableDataInfo struct {
|
||||
Filed []string `json:"filed" description:"字段"`
|
||||
Info []map[string]interface{} `json:"info" description:"数据"`
|
||||
}
|
||||
|
||||
// 日志 TDengine
|
||||
type TdLog struct {
|
||||
Ts string `json:"ts" dc:"时间"`
|
||||
Device string `json:"device" dc:"设备标识"`
|
||||
Type string `json:"type" dc:"日志类型"`
|
||||
Content string `json:"content" dc:"日志内容"`
|
||||
}
|
||||
|
||||
@@ -4,22 +4,20 @@ import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
const connectTableName = "device_connect"
|
||||
const connectTableName = "events"
|
||||
|
||||
type ConnectInfo struct {
|
||||
Ts string `json:"ts"`
|
||||
ClientID string `json:"clientId"`
|
||||
Type string `json:"type"` // 连接类型
|
||||
PeerHost string `json:"peerHost"`
|
||||
SocketPort string `json:"sockPort"`
|
||||
Protocol string `json:"protocol"`
|
||||
DeviceId string `json:"deviceId"`
|
||||
type Events struct {
|
||||
Ts string `json:"ts"`
|
||||
Name string `json:"name"` //标识 connet
|
||||
Type string `json:"type"` // 事件类型 info alarm fault
|
||||
Content string `json:"content"` // 事件描述
|
||||
DeviceId string `json:"deviceId"`
|
||||
}
|
||||
|
||||
// CreateEventTable 创建设备连接事件表
|
||||
func (s *TdEngine) CreateEventTable() (err error) {
|
||||
sql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP,deviceId NCHAR(64),
|
||||
type NCHAR(64),clientId NCHAR(64),peerHost NCHAR(64),sockPort NCHAR(64),protocol NCHAR(64))`, s.dbName, connectTableName)
|
||||
sql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP,deviceId NCHAR(64),name NCHAR(64),
|
||||
type NCHAR(64),content NCHAR(255))`, s.dbName, connectTableName)
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
}
|
||||
@@ -27,3 +25,25 @@ func (s *TdEngine) CreateEventTable() (err error) {
|
||||
func (s *TdEngine) InsertEvent(data map[string]any) (err error) {
|
||||
return s.InsertDevice(connectTableName, data)
|
||||
}
|
||||
|
||||
func (s *TdEngine) GetAllEvents(sql string, args ...any) (list []Events, err error) {
|
||||
rows, err := s.db.Query(sql, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var event Events
|
||||
|
||||
err = rows.Scan(&event.Ts, &event.DeviceId, &event.Name, &event.Type, &event.Content)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
event.Ts = s.Time(event.Ts)
|
||||
|
||||
list = append(list, event)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1,21 +1,37 @@
|
||||
package tdengine
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
const logTableName = "device_log"
|
||||
"github.com/kakuilan/kgo"
|
||||
)
|
||||
|
||||
const logTableName = "logs"
|
||||
|
||||
// 日志 TDengine
|
||||
type TdLog struct {
|
||||
Ts string `json:"ts" dc:"时间"`
|
||||
DeviceId string `json:"deviceId" dc:"设备标识"`
|
||||
TraceId string `json:"traceId" dc:"追踪"`
|
||||
Type string `json:"type" dc:"日志类型"` // 命令调用 上行 下行
|
||||
Content string `json:"content" dc:"日志内容"`
|
||||
}
|
||||
|
||||
// CreateLogStable 添加LOG超级表
|
||||
func (s *TdEngine) CreateLogStable() (err error) {
|
||||
sql := "CREATE STABLE IF NOT EXISTS ? (ts TIMESTAMP, type VARCHAR(20), content VARCHAR(1000)) TAGS (device VARCHAR(255))"
|
||||
sql := "CREATE STABLE IF NOT EXISTS ? (ts TIMESTAMP,deviceId NCHAR(64),traceId NCHAR(64),type NCHAR(20), content VARCHAR(1000))"
|
||||
_, err = s.db.Exec(sql, logTableName)
|
||||
return
|
||||
}
|
||||
|
||||
// InsertLog 写入数据
|
||||
func (s *TdEngine) InsertLog(log *TdLog) (err error) {
|
||||
sql := "INSERT INTO log_? USING device_log TAGS (?) VALUES (?, ?, ?)"
|
||||
_, err = s.db.Exec(sql, log.Device, log.Ts, log.Type, log.Content)
|
||||
|
||||
logs, err := kgo.KConv.Struct2Map(*log, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.InsertDevice(logTableName, logs)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -23,7 +39,7 @@ func (s *TdEngine) InsertLog(log *TdLog) (err error) {
|
||||
func (s *TdEngine) ClearLog() (err error) {
|
||||
ts := time.Now().Add(-7 * 24 * time.Hour).Format("2006-01-02")
|
||||
|
||||
sql := "DELETE FROM device_log WHERE ts < ?"
|
||||
sql := fmt.Sprintf("DELETE FROM %s WHERE ts < ?", logTableName)
|
||||
_, err = s.db.Exec(sql, ts)
|
||||
|
||||
return
|
||||
@@ -40,7 +56,7 @@ func (s *TdEngine) GetAllLog(sql string, args ...any) (list []TdLog, err error)
|
||||
for rows.Next() {
|
||||
var log TdLog
|
||||
|
||||
err = rows.Scan(&log.Ts, &log.Type, &log.Content, &log.Device)
|
||||
err = rows.Scan(&log.Ts, &log.DeviceId, &log.TraceId, &log.Type, &log.Content)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user