[feat]网关子设备,直接上传,自动创建设备模型,无需手动创建

This commit is contained in:
PandaX
2023-10-17 11:14:18 +08:00
parent f26c5a2647
commit 55e399e5cb
23 changed files with 1457 additions and 74 deletions

View File

@@ -16,8 +16,10 @@ type DeviceGroup struct {
Path string `json:"path" gorm:"type:varchar(255);comment:设备分组路径"`
Description string `json:"description" gorm:"type:varchar(255);comment:设备分组说明"`
Sort int64 `json:"sort" gorm:"type:int;comment:排序"`
Status string `gorm:"status;type:varchar(1);comment:状态" json:"status"`
Status string `gorm:"type:varchar(1);comment:状态" json:"status"`
Ext Ext `json:"ext" gorm:"type:json;comment:扩展"` //可扩展的kv map,承载设备组的外围信息
IsDefault string `gorm:"type:varchar(1);comment:是否默认" json:"isDefault"`
Children []DeviceGroup `json:"children" gorm:"-"` //子节点
RoleId int64 `gorm:"-"` // 角色数据权限

View File

@@ -47,8 +47,8 @@ type Product struct {
ProtocolName string `json:"protocolName" gorm:"type:varchar(64);comment:协议名称"` //MQTT COAP WebSocket LwM2M
DeviceType string `json:"deviceType" gorm:"type:varchar(64);comment:设备类型"` // 直连设备 网关设备 网关子设备 监控设备
RuleChainId string `json:"ruleChainId" gorm:"type:varchar(64);comment:规则链Id"` //可空,如果空就走根规则链
Status string `gorm:"status;type:varchar(1);comment:状态" json:"status"`
Status string `gorm:"type:varchar(1);comment:状态" json:"status"`
IsDefault string `gorm:"type:varchar(1);comment:是否默认" json:"isDefault"`
RoleId int64 `gorm:"-"` // 角色数据权限
}

View File

@@ -41,21 +41,26 @@ func (m *deviceModelImpl) Insert(data entity.Device) *entity.Device {
biz.IsTrue(list != nil && len(*list) == 0, "设备名称已经存在")
//2 创建认证TOKEN IOTHUB使用
token := GetDeviceToken(&data)
err := global.RedisDb.Set(data.Token, token.GetMarshal(), time.Hour*24*365)
biz.ErrIsNil(err, "设备缓存失败")
// 子网关不需要设置token
if data.DeviceType == global.GATEWAYS {
data.Token = ""
err := global.RedisDb.Set(data.Name, token.GetMarshal(), time.Hour*24*365)
biz.ErrIsNil(err, "设备缓存失败")
} else {
err := global.RedisDb.Set(data.Token, token.GetMarshal(), time.Hour*24*365)
biz.ErrIsNil(err, "设备缓存失败")
}
//3 添加设备
err = tx.Table(m.table).Create(&data).Error
err := tx.Table(m.table).Create(&data).Error
biz.ErrIsNil(err, "添加设备失败")
// 创建超级表 失败就
if data.Pid != "" {
err = createDeviceTable(data.Pid, data.Name)
if err != nil {
tx.Rollback()
biz.ErrIsNil(err, "添加设备失败,设备表创建失败")
}
}
tx.Commit()
return &data
}
@@ -152,6 +157,7 @@ func (m *deviceModelImpl) FindList(data entity.Device) *[]entity.DeviceRes {
return &list
}
// TODO 如果更改的是产品tdengine的设备表也要更改
func (m *deviceModelImpl) Update(data entity.Device) *entity.Device {
if data.DeviceType == global.GATEWAYS {
data.Token = ""

View File

@@ -12,6 +12,7 @@ type (
ProductModel interface {
Insert(data entity.Product) *entity.Product
FindOne(id string) *entity.ProductRes
FindDefault() *entity.Product
FindListPage(page, pageSize int, data entity.Product) (*[]entity.ProductRes, int64)
FindList(data entity.Product) *[]entity.ProductRes
Update(data entity.Product) *entity.Product
@@ -51,6 +52,14 @@ func (m *productModelImpl) FindOne(id string) *entity.ProductRes {
return resData
}
func (m *productModelImpl) FindDefault() *entity.Product {
resData := new(entity.Product)
err := global.Db.Table(m.table).Where("is_default = ?", "1").First(resData).Error
log.Println(err)
biz.ErrIsNil(err, "查询默认产品失败")
return resData
}
func (m *productModelImpl) FindListPage(page, pageSize int, data entity.Product) (*[]entity.ProductRes, int64) {
list := make([]entity.ProductRes, 0)
var total int64 = 0
@@ -77,7 +86,6 @@ func (m *productModelImpl) FindListPage(page, pageSize int, data entity.Product)
}
err := db.Count(&total).Error
err = db.Order("create_time").Preload("ProductCategory").Limit(pageSize).Offset(offset).Find(&list).Error
log.Println(err)
biz.ErrIsNil(err, "查询产品分页列表失败")
return &list, total
}
@@ -111,8 +119,7 @@ func (m *productModelImpl) FindList(data entity.Product) *[]entity.ProductRes {
func (m *productModelImpl) Update(data entity.Product) *entity.Product {
// go的一些默认值 int 0 bool false 保存失败需要先转成map
err := global.Db.Table(m.table).Where("id = ?", data.Id).Updates(data).Error
log.Println("update", err)
//biz.ErrIsNil(, "修改产品失败")
biz.ErrIsNil(err, "修改产品失败")
return &data
}

View File

@@ -2,7 +2,6 @@ package services
import (
"github.com/PandaXGO/PandaKit/biz"
"log"
"pandax/apps/device/entity"
"pandax/pkg/global"
)
@@ -117,7 +116,6 @@ func (m *otaModelImpl) Update(data entity.ProductOta) *entity.ProductOta {
return &data
}
func (m *otaModelImpl) updateLatest(id string, IsLatest bool) {
log.Println(id, IsLatest)
err := global.Db.Table(m.table).Where("id = ?", id).Update("is_latest", IsLatest).Error
global.Log.Error("更新失败", err)
}

View File

@@ -4,7 +4,6 @@ import (
"github.com/PandaXGO/PandaKit/biz"
"github.com/PandaXGO/PandaKit/model"
"github.com/PandaXGO/PandaKit/restfulx"
"log"
"pandax/apps/job/api/from"
"pandax/apps/job/entity"
"pandax/apps/job/jobs"
@@ -86,7 +85,6 @@ func (l *JobApi) StartJobForService(rc *restfulx.ReqCtx) {
j.OrgId = job.OrgId
j.Owner = job.Owner
job.EntryId, err = jobs.AddJob(jobs.Crontab, j)
log.Println(err)
biz.ErrIsNil(err, "添加JOB失败")
l.JobApp.Update(*job)

View File

@@ -4,7 +4,6 @@ import (
"github.com/PandaXGO/PandaKit/model"
"github.com/PandaXGO/PandaKit/restfulx"
"github.com/PandaXGO/PandaKit/utils"
"log"
"pandax/apps/log/entity"
"pandax/apps/log/services"
)
@@ -36,7 +35,6 @@ func (l *LogOperApi) GetOperLog(rc *restfulx.ReqCtx) {
func (l *LogOperApi) DeleteOperLog(rc *restfulx.ReqCtx) {
operIds := restfulx.PathParam(rc, "operId")
group := utils.IdsStrToIdsIntGroup(operIds)
log.Println("group", group)
l.LogOperApp.Delete(group)
}

View File

@@ -7,8 +7,6 @@ import (
"github.com/emicklei/go-restful/v3"
"github.com/kakuilan/kgo"
"github.com/mssola/user_agent"
"log"
"pandax/apps/system/api/form"
"pandax/apps/system/api/vo"
"pandax/apps/system/entity"
@@ -59,7 +57,6 @@ func (u *UserApi) RefreshToken(rc *restfulx.ReqCtx) {
func (u *UserApi) Login(rc *restfulx.ReqCtx) {
var l form.Login
restfulx.BindJsonAndValid(rc, &l)
log.Println(l)
biz.IsTrue(captcha.Verify(l.CaptchaId, l.Captcha), "验证码认证失败")
login := u.UserApp.Login(entity.Login{Username: l.Username, Password: l.Password})

View File

@@ -6,8 +6,10 @@ server:
# debug release test
model: release
port: 7788
# iothub使用的rpc端口 9000 9001 可能与minio端口冲突
# iothub服务端口 使用的rpc端口 9000 9001 可能与minio端口冲突
grpc-port: 9001
http-port: 9002
tcp-port: 9003
cors: true
# 数据上报 队列池
queue-num: 1000

File diff suppressed because it is too large Load Diff

View File

@@ -1,12 +1,10 @@
package mqttclient
import (
"encoding/json"
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"math/rand"
"pandax/pkg/global_model"
"time"
)
@@ -48,7 +46,7 @@ func (rpc RpcRequest) RequestCmd(rpcPayload string) (respPayload string, err err
}
rpc.Client.Sub(respTopic, 0, mqMessagePubHandler)
if rpc.Timeout == 0 {
rpc.Timeout = 30
rpc.Timeout = 20
}
defer func() {
close(repsChan)
@@ -64,19 +62,6 @@ func (rpc RpcRequest) RequestCmd(rpcPayload string) (respPayload string, err err
}
}
// RequestAttributes rpc 下发属性
func (rpc RpcRequest) RequestAttributes(rpcPayload global_model.RpcPayload) error {
topic := fmt.Sprintf(RpcReqTopic, rpc.RequestId)
if rpcPayload.Method == "" {
rpcPayload.Method = "setAttributes"
}
payload, err := json.Marshal(rpcPayload)
if err != nil {
return err
}
return rpc.Client.Pub(topic, 0, string(payload))
}
func (rpc RpcRequest) Pub(reqPayload string) error {
topic := fmt.Sprintf(RpcRespTopic, rpc.RequestId)
return rpc.Client.Pub(topic, 0, reqPayload)

View File

@@ -82,14 +82,14 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) {
//检测设备影子并修改设备影子状态
if msg.Type == message.ConnectMes {
shadow.InitDeviceShadow(msg.DeviceAuth.Name, msg.DeviceAuth.ProductId)
shadow.DeviceShadowInstance.SetOnline(msg.DeviceAuth.Name)
err := shadow.DeviceShadowInstance.SetOnline(msg.DeviceAuth.Name)
log.Println(err)
} else {
shadow.DeviceShadowInstance.SetOffline(msg.DeviceAuth.Name)
}
// 更改设备在线状态
if msg.Type == message.ConnectMes {
err := services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.ONLINE)
global.Log.Error(err)
services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.ONLINE)
} else {
services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.OFFLINE)
}
@@ -177,7 +177,7 @@ func SetDeviceShadow(etoken *global_model.DeviceAuth, msgVals map[string]interfa
biz.ErrIsNilAppendErr(err, "设置设备影子点失败")
}
if message.TelemetryMes == msgType {
log.Println(etoken.Name, global.TslTelemetryType, tel.Key, msgVals[tel.Key])
log.Println(etoken.Name)
err := shadow.DeviceShadowInstance.SetDevicePoint(etoken.Name, global.TslTelemetryType, tel.Key, msgVals[tel.Key])
biz.ErrIsNilAppendErr(err, "设置设备影子点失败")
}

View File

@@ -1,23 +1,25 @@
package iothub
import (
"fmt"
"pandax/iothub/hook_message_work"
"pandax/iothub/server/emqxserver"
"pandax/iothub/server/httpserver"
"pandax/iothub/server/tcpserver"
updserver "pandax/iothub/server/udpserver"
"pandax/pkg/global"
)
func InitIothub() {
service := hook_message_work.NewHookService()
// 初始化EMQX
go emqxserver.InitEmqxHook("", service)
go emqxserver.InitEmqxHook(fmt.Sprintf(":%d", global.Conf.Server.GrpcPort), service)
// 初始化HTTP
go httpserver.InitHttpHook("", service)
go httpserver.InitHttpHook(fmt.Sprintf(":%d", global.Conf.Server.HttpPort), service)
//初始化TCP
go tcpserver.InitTcpHook("", service)
go tcpserver.InitTcpHook(fmt.Sprintf(":%d", global.Conf.Server.TcpPort), service)
go updserver.InitUdpHook("", service)
go updserver.InitUdpHook(fmt.Sprintf(":%d", global.Conf.Server.TcpPort), service)
// 开启线程处理消息
go service.MessageWork()
}

View File

@@ -2,19 +2,22 @@ package netbase
import (
"encoding/json"
"pandax/apps/device/entity"
"pandax/apps/device/services"
"pandax/iothub/server/emqxserver/protobuf"
"pandax/pkg/global"
"pandax/pkg/global_model"
"pandax/pkg/tdengine"
"pandax/pkg/tool"
"regexp"
"strings"
"sync"
"time"
)
func Auth(authToken string) bool {
// 根据token去查设备Id以及设备类型
if authToken == "pandax" {
if authToken == global.Conf.Mqtt.Username {
return true
}
etoken := &global_model.DeviceAuth{}
@@ -48,7 +51,14 @@ func Auth(authToken string) bool {
return true
}
// SubAuth 获取子设备的认证信息
func SubAuth(name string) (*global_model.DeviceAuth, bool) {
defer func() {
if Rerr := recover(); Rerr != nil {
global.Log.Error(Rerr)
return
}
}()
etoken := &global_model.DeviceAuth{}
// redis 中有就查询,没有就添加
exists, err := global.RedisDb.Exists(global.RedisDb.Context(), name).Result()
@@ -56,15 +66,14 @@ func SubAuth(name string) (*global_model.DeviceAuth, bool) {
err = etoken.GetDeviceToken(name)
} else {
device, err := services.DeviceModelDao.FindOneByName(name)
// 没有设备就要创建设备
// 没有设备就要创建设备
if err != nil {
global.Log.Infof("设备标识 %s 不存在", name)
global.Log.Infof("设备标识 %s 不存在, ", name)
return nil, false
}
etoken = services.GetDeviceToken(&device.Device)
etoken.DeviceProtocol = device.Product.ProtocolName
// todo 子设备没有token
err = global.RedisDb.Set(device.Token, etoken.GetMarshal(), time.Hour*24*365)
err = global.RedisDb.Set(name, etoken.GetMarshal(), time.Hour*24*365)
if err != nil {
global.Log.Infof("设备标识 %s添加缓存失败", name)
return nil, false
@@ -77,6 +86,58 @@ func SubAuth(name string) (*global_model.DeviceAuth, bool) {
return etoken, true
}
// CreateSubTable 创建子设备表
func CreateSubTable(productId, deviceName string) error {
err := global.TdDb.CreateTable(productId+"_"+entity.ATTRIBUTES_TSL, deviceName+"_"+entity.ATTRIBUTES_TSL)
if err != nil {
global.Log.Error(err)
return err
}
err = global.TdDb.CreateTable(productId+"_"+entity.TELEMETRY_TSL, deviceName+"_"+entity.TELEMETRY_TSL)
if err != nil {
global.Log.Error(err)
return err
}
return nil
}
// CreateSubTableField 添加子设备字段
func CreateSubTableField(productId, ty string, fields map[string]interface{}) {
var group sync.WaitGroup
for key, value := range fields {
group.Add(1)
go func(key string, value any) {
defer func() {
group.Done()
if err := recover(); err != nil {
global.Log.Error("自动创建tsl错误", err)
global.TdDb.DelTableField(productId+"_"+ty, key)
}
}()
if key == "ts" {
return
}
interfaceType := tool.GetInterfaceType(value)
// 向产品tsl中添加模型
err := global.TdDb.AddSTableField(productId+"_"+ty, key, interfaceType, 0)
if err != nil {
return
}
one := services.ProductModelDao.FindOne(productId)
tsl := entity.ProductTemplate{}
tsl.Pid = productId
tsl.Id = global_model.GenerateID()
tsl.OrgId = one.OrgId
tsl.Name = key
tsl.Type = interfaceType
tsl.Key = key
tsl.Classify = ty
services.ProductTemplateModelDao.Insert(tsl)
}(key, value)
}
group.Wait()
}
// 解析遥测数据类型 返回标准带时间戳格式
func UpdateDeviceTelemetryData(data string) map[string]interface{} {
tel := make(map[string]interface{})
@@ -94,7 +155,12 @@ func UpdateDeviceTelemetryData(data string) map[string]interface{} {
}
resTel := make(map[string]interface{})
json.Unmarshal(marshal, &resTel)
resTel["ts"] = tel["ts"]
format := tool.TimeToFormat(tel["ts"])
if format == "" {
return nil
}
resTel["ts"] = format
return resTel
}
}

View File

@@ -186,6 +186,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
// 获取topic类型
ts := time.Now().Format("2006-01-02 15:04:05.000")
eventType := IotHubTopic.GetMessageType(in.Message.Topic)
datas := string(in.GetMessage().GetPayload())
data := &netbase.DeviceEventInfo{
Type: eventType,
@@ -193,6 +194,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
DeviceId: etoken.DeviceId,
DeviceAuth: etoken,
}
// 如果是网关子设备单独处理
if eventType == message.GATEWAY {
subData := make(map[string]interface{})
@@ -203,10 +205,9 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
return res, nil
}
// key就是device name
for key, value := range subData {
auth, isSub := netbase.SubAuth(key)
for deviceName, value := range subData {
auth, isSub := netbase.SubAuth(deviceName)
if !isSub {
global.Log.Infof("子设备%s 标识不存在,请先创建设备标识", key)
continue
}
data.DeviceAuth = auth
@@ -220,6 +221,8 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
}
bytes, _ := json.Marshal(attributesData)
data.Datas = string(bytes)
// 创建tdengine的设备属性表
netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData)
// 子设备发送到队列里
s.HookService.MessageCh <- data
}
@@ -233,6 +236,8 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
}
bytes, _ := json.Marshal(telemetryData)
data.Datas = string(bytes)
// 创建tdengine的设备遥测表
netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData)
// 子设备发送到队列里
s.HookService.MessageCh <- data
}

View File

@@ -3,7 +3,6 @@ package updserver
import (
"context"
"encoding/hex"
"log"
"net"
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
@@ -44,7 +43,6 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) {
}
func (hhs *HookUdpService) hook(data []byte) {
log.Println("udp msg", string(data))
hhs.Send("success")
}

View File

@@ -5,6 +5,8 @@ import "fmt"
type Server struct {
Port int `yaml:"port"`
GrpcPort int `yaml:"grpc-port"`
TcpPort int `yaml:"tcp-port"`
HttpPort int `yaml:"http-port"`
Model string `yaml:"model"`
Cors bool `yaml:"cors"`
Rate *Rate `yaml:"rate"`

View File

@@ -1,7 +1,6 @@
package initialize
import (
"log"
"pandax/apps/device/entity"
"pandax/apps/device/services"
"pandax/pkg/events"
@@ -18,7 +17,6 @@ func InitEvents() {
})
if list != nil {
for _, product := range *list {
log.Println("修改了产品的规则链", product.Id)
global.ProductCache.Put(product.Id, codeData)
}
}

View File

@@ -3,7 +3,6 @@ package nodes
import (
"encoding/json"
"github.com/sirupsen/logrus"
"log"
"pandax/apps/device/services"
"pandax/pkg/global"
"pandax/pkg/rule_engine/message"
@@ -36,7 +35,6 @@ func (n *clearAlarmNode) Handle(msg *message.Message) error {
alarm := services.DeviceAlarmModelDao.FindOneByType(msg.Metadata.GetValue("deviceId").(string), n.AlarmType, "0")
if alarm.DeviceId != "" {
log.Println("清除告警")
alarm.State = global.CLEARED
marshal, _ := json.Marshal(msg.Msg)
alarm.Details = string(marshal)

View File

@@ -3,7 +3,6 @@ package nodes
import (
"errors"
"github.com/sirupsen/logrus"
"log"
"pandax/pkg/rule_engine/manifest"
"pandax/pkg/rule_engine/message"
)
@@ -71,7 +70,6 @@ func GetNodes(m *manifest.Manifest) (map[string]Node, error) {
metadata := NewMetadataWithValues(n.Properties)
node, err := NewNode(n.Type, n.Id, metadata)
if err != nil {
log.Println(err)
logrus.Errorf("new node '%s' failure", n.Id)
continue
}

View File

@@ -45,7 +45,7 @@ func (s *TdEngine) CreateStable(table string) (err error) {
// CreateTable 添加子表
func (s *TdEngine) CreateTable(stable, table string) (err error) {
sql := fmt.Sprintf("CREATE TABLE %s USING %s TAGS ('%s')", table, stable, table)
sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s USING %s TAGS ('%s')", table, stable, table)
_, err = s.db.Exec(sql)
return
}

View File

@@ -2,7 +2,9 @@ package tool
import (
"encoding/json"
"pandax/pkg/global"
"strings"
"time"
)
// SnakeString snake string, XxYy to xx_yy , XxYY to xx_y_y
@@ -100,3 +102,26 @@ func StringToStruct(m string, s interface{}) error {
}
return nil
}
func TimeToFormat(val interface{}) string {
switch v := val.(type) {
case int64:
// 如果是时间戳类型,将其转换为时间对象
t := time.Unix(v, 0)
// 格式化时间字符串
formattedTime := t.Format("2006-01-02 15:04:05")
return formattedTime
case string:
// 如果是字符串类型,将其解析为时间对象
t, err := time.Parse("2006-01-02 15:04:05", v)
if err != nil {
global.Log.Error("时间格式非标准格式")
return ""
}
// 格式化时间字符串
formattedTime := t.Format("2006-01-02 15:04:05")
return formattedTime
default:
return ""
}
}

View File

@@ -1 +1 @@
taskkill /pid 31740 -t -f
taskkill /pid 15028 -t -f