diff --git a/apps/device/api/device.go b/apps/device/api/device.go index d99dc63..6e2e4cd 100644 --- a/apps/device/api/device.go +++ b/apps/device/api/device.go @@ -124,10 +124,8 @@ func (p *DeviceApi) InsertDevice(rc *restfulx.ReqCtx) { data.LinkStatus = global.INACTIVE data.LastAt = time.Now() p.DeviceApp.Insert(data) - // 视频设备不创建超级表 - if data.DeviceType != global.MONITOR { - createDeviceTable(data.Pid, data.Name) - } + // 创建超级表 + createDeviceTable(data.Pid, data.Name) } // UpdateDevice 修改Device @@ -145,9 +143,7 @@ func (p *DeviceApi) DeleteDevice(rc *restfulx.ReqCtx) { for _, id := range ids { list := p.DeviceApp.FindOne(id) // 删除表 - if list.DeviceType != global.MONITOR { - deleteDeviceTable(list.Name) - } + deleteDeviceTable(list.Name) } p.DeviceApp.Delete(ids) } diff --git a/apps/device/api/device_alarm.go b/apps/device/api/device_alarm.go index ede50e8..9da0be7 100644 --- a/apps/device/api/device_alarm.go +++ b/apps/device/api/device_alarm.go @@ -16,13 +16,15 @@ type DeviceAlarmApi struct { // GetDeviceAlarmList 告警列表数据 func (p *DeviceAlarmApi) GetDeviceAlarmList(rc *restfulx.ReqCtx) { - data := entity.DeviceAlarm{} + data := entity.DeviceAlarmForm{} pageNum := restfulx.QueryInt(rc, "pageNum", 1) pageSize := restfulx.QueryInt(rc, "pageSize", 10) data.DeviceId = restfulx.QueryParam(rc, "deviceId") data.Type = restfulx.QueryParam(rc, "type") data.Level = restfulx.QueryParam(rc, "level") data.State = restfulx.QueryParam(rc, "state") + data.StartTime = restfulx.QueryParam(rc, "startTime") + data.EndTime = restfulx.QueryParam(rc, "endTime") list, total := p.DeviceAlarmApp.FindListPage(pageNum, pageSize, data) diff --git a/apps/device/api/product.go b/apps/device/api/product.go index ac335d9..00c0582 100644 --- a/apps/device/api/product.go +++ b/apps/device/api/product.go @@ -70,10 +70,8 @@ func (p *ProductApi) InsertProduct(rc *restfulx.ReqCtx) { data.Id = kgo.KStr.Uniqid("p_") data.Owner = rc.LoginAccount.UserName p.ProductApp.Insert(data) - // 创建taos数据库超级表 摄像头产品不创建 - if data.DeviceType != global.MONITOR { - createDeviceStable(data.Id) - } + // 创建taos数据库超级表 + createDeviceStable(data.Id) } // UpdateProduct 修改Product diff --git a/apps/device/entity/device_exp.go b/apps/device/entity/device_exp.go index d0a00c9..1c840c1 100644 --- a/apps/device/entity/device_exp.go +++ b/apps/device/entity/device_exp.go @@ -19,6 +19,12 @@ type DeviceAlarm struct { Details string `gorm:"type:varchar(255);comment:告警详情" json:"details"` } +type DeviceAlarmForm struct { + DeviceAlarm + StartTime string `gorm:"-" json:"startTime"` + EndTime string `gorm:"-" json:"endTime"` +} + type DeviceCmdLog struct { Id string `json:"id" gorm:"primary_key;"` DeviceId string `gorm:"type:varchar(64);comment:所属设备" json:"deviceId"` diff --git a/apps/device/services/device.go b/apps/device/services/device.go index 43ad352..568fabe 100644 --- a/apps/device/services/device.go +++ b/apps/device/services/device.go @@ -35,7 +35,7 @@ func (m *deviceModelImpl) Insert(data entity.Device) *entity.Device { biz.IsTrue(list != nil && len(*list) == 0, "设备名称已经存在") //2 创建认证TOKEN IOTHUB使用 etoken := getDeviceToken(&data) - if data.DeviceType != global.GATEWAYS && data.DeviceType != global.MONITOR { + if data.DeviceType != global.GATEWAYS { data.Token = etoken.Token } //3 添加设备 diff --git a/apps/device/services/device_alarm.go b/apps/device/services/device_alarm.go index b357e91..039ffb0 100644 --- a/apps/device/services/device_alarm.go +++ b/apps/device/services/device_alarm.go @@ -11,7 +11,7 @@ type ( Insert(data entity.DeviceAlarm) error FindOne(id string) *entity.DeviceAlarm FindOneByType(deviceId, ty, state string) *entity.DeviceAlarm - FindListPage(page, pageSize int, data entity.DeviceAlarm) (*[]entity.DeviceAlarm, int64) + FindListPage(page, pageSize int, data entity.DeviceAlarmForm) (*[]entity.DeviceAlarm, int64) Update(data entity.DeviceAlarm) error Delete(ids []string) } @@ -46,7 +46,7 @@ func (m *alarmModelImpl) FindOneByType(deviceId, ty, state string) *entity.Devic return resData } -func (m *alarmModelImpl) FindListPage(page, pageSize int, data entity.DeviceAlarm) (*[]entity.DeviceAlarm, int64) { +func (m *alarmModelImpl) FindListPage(page, pageSize int, data entity.DeviceAlarmForm) (*[]entity.DeviceAlarm, int64) { list := make([]entity.DeviceAlarm, 0) var total int64 = 0 offset := pageSize * (page - 1) @@ -66,6 +66,12 @@ func (m *alarmModelImpl) FindListPage(page, pageSize int, data entity.DeviceAlar if data.State != "" { db = db.Where("state = ?", data.State) } + if data.StartTime != "" { + db = db.Where("time > ?", data.StartTime) + } + if data.EndTime != "" { + db = db.Where("time < ?", data.EndTime) + } err := db.Count(&total).Error err = db.Order("time").Limit(pageSize).Offset(offset).Find(&list).Error biz.ErrIsNil(err, "查询设备告警分页列表失败") diff --git a/apps/device/services/product.go b/apps/device/services/product.go index f15c575..ea6adaf 100644 --- a/apps/device/services/product.go +++ b/apps/device/services/product.go @@ -32,9 +32,7 @@ var ProductModelDao ProductModel = &productModelImpl{ func (m *productModelImpl) Insert(data entity.Product) *entity.Product { // 添加产品及规则链到redis中 - if data.DeviceType != global.MONITOR { - setProductRule(&data) - } + setProductRule(&data) err := global.Db.Table(m.table).Create(&data).Error biz.ErrIsNil(err, "添加产品失败") return &data diff --git a/apps/rule/api/rulechain_log.go b/apps/rule/api/rulechain_log.go index 8c190fd..e25184c 100644 --- a/apps/rule/api/rulechain_log.go +++ b/apps/rule/api/rulechain_log.go @@ -6,7 +6,6 @@ import ( "pandax/apps/rule/entity" "pandax/apps/rule/services" "pandax/pkg/rule_engine/nodes" - "strings" ) type RuleChainMsgLogApi struct { @@ -23,6 +22,7 @@ func (p *RuleChainMsgLogApi) GetRuleChainMsgLogList(rc *restfulx.ReqCtx) { pageNum := restfulx.QueryInt(rc, "pageNum", 1) pageSize := restfulx.QueryInt(rc, "pageSize", 10) data.DeviceName = restfulx.QueryParam(rc, "deviceName") + data.MsgType = restfulx.QueryParam(rc, "msgType") list, total := p.RuleChainMsgLogApp.FindListPage(pageNum, pageSize, data) rc.ResData = model.ResultPage{ @@ -35,7 +35,8 @@ func (p *RuleChainMsgLogApi) GetRuleChainMsgLogList(rc *restfulx.ReqCtx) { // DeleteRuleChainMsgLog 删除规则链 func (p *RuleChainMsgLogApi) DeleteRuleChainMsgLog(rc *restfulx.ReqCtx) { - id := restfulx.PathParam(rc, "id") - ids := strings.Split(id, ",") - p.RuleChainMsgLogApp.Delete(ids) + data := entity.RuleChainMsgLog{} + data.DeviceName = restfulx.QueryParam(rc, "deviceName") + data.MsgType = restfulx.QueryParam(rc, "msgType") + p.RuleChainMsgLogApp.Delete(data) } diff --git a/apps/rule/entity/rulechain.go b/apps/rule/entity/rulechain.go index 3d91aab..d643975 100644 --- a/apps/rule/entity/rulechain.go +++ b/apps/rule/entity/rulechain.go @@ -29,12 +29,13 @@ func (RuleChain) TableName() string { } type RuleChainMsgLog struct { - MessageId string `json:"message_id"` - MsgType string `json:"msg_type"` - DeviceName string `json:"device_name"` - Ts time.Time `json:"ts"` - Content string `json:"content"` - CreatedAt time.Time // 创建时间 + MessageId string `gorm:"message_id;type:varchar(64);comment:消息Id" json:"messageId"` + MsgType string `gorm:"msg_type;type:varchar(64);comment:消息类型" json:"msgType"` + DeviceId string `gorm:"device_id;type:varchar(64);comment:设备ID" json:"deviceId"` + DeviceName string `gorm:"device_name;type:varchar(255);comment:设备名称" json:"deviceName"` + Ts time.Time `gorm:"ts;type:varchar(64);comment:时间" json:"ts"` + Content string `gorm:"content;type:varchar(1024);comment:内容" json:"content"` + CreatedAt time.Time `gorm:"column:create_at" json:"create_at"` // 创建时间 } func (RuleChainMsgLog) TableName() string { diff --git a/apps/rule/router/rulechain_log.go b/apps/rule/router/rulechain_log.go index 772cb03..88b988f 100644 --- a/apps/rule/router/rulechain_log.go +++ b/apps/rule/router/rulechain_log.go @@ -19,22 +19,24 @@ func InitRuleChainMsgLogRouter(container *restful.Container) { tags := []string{"RuleChainMsgLog"} ws.Route(ws.GET("/list").To(func(request *restful.Request, response *restful.Response) { - restfulx.NewReqCtx(request, response).WithLog("获取规则引擎分页列表").Handle(s.GetRuleChainMsgLogList) + restfulx.NewReqCtx(request, response).WithLog("获取规则引擎日志分页列表").Handle(s.GetRuleChainMsgLogList) }). - Doc("获取规则引擎分页列表"). + Doc("获取规则引擎日志分页列表"). Param(ws.QueryParameter("pageNum", "页数").Required(true).DataType("int")). Param(ws.QueryParameter("pageSize", "每页条数").Required(true).DataType("int")). - Param(ws.QueryParameter("ruleName", "规则名").Required(false).DataType("string")). + Param(ws.QueryParameter("deviceName", "设备名").Required(false).DataType("string")). + Param(ws.QueryParameter("msgType", "消息类型").Required(false).DataType("string")). Metadata(restfulspec.KeyOpenAPITags, tags). Writes(model.ResultPage{}). Returns(200, "OK", model.ResultPage{})) - ws.Route(ws.DELETE("/{id}").To(func(request *restful.Request, response *restful.Response) { + ws.Route(ws.GET("/delete").To(func(request *restful.Request, response *restful.Response) { restfulx.NewReqCtx(request, response).WithLog("删除规则引擎信息").Handle(s.DeleteRuleChainMsgLog) }). Doc("删除规则链日志信息"). - Metadata(restfulspec.KeyOpenAPITags, tags). - Param(ws.PathParameter("id", "多id 1,2,3").DataType("string"))) + Param(ws.QueryParameter("deviceName", "设备名").Required(false).DataType("string")). + Param(ws.QueryParameter("msgType", "消息类型").Required(false).DataType("string")). + Metadata(restfulspec.KeyOpenAPITags, tags)) container.Add(ws) diff --git a/apps/rule/services/rulechain_log.go b/apps/rule/services/rulechain_log.go index 4f349aa..a76682b 100644 --- a/apps/rule/services/rulechain_log.go +++ b/apps/rule/services/rulechain_log.go @@ -16,7 +16,7 @@ type ( RuleChainMsgLogModel interface { Insert(data entity.RuleChainMsgLog) *entity.RuleChainMsgLog FindListPage(page, pageSize int, data entity.RuleChainMsgLog) (*[]entity.RuleChainMsgLog, int64) - Delete(ids []string) + Delete(data entity.RuleChainMsgLog) } ruleChainLogModelImpl struct { @@ -55,6 +55,17 @@ func (m *ruleChainLogModelImpl) FindListPage(page, pageSize int, data entity.Rul return &list, total } -func (m *ruleChainLogModelImpl) Delete(ids []string) { - biz.ErrIsNil(global.Db.Table(m.table).Delete(&entity.RuleChainMsgLog{}, "id in (?)", ids).Error, "删除规则链失败") +func (m *ruleChainLogModelImpl) Delete(data entity.RuleChainMsgLog) { + db := global.Db.Table(m.table) + // 此处填写 where参数判断 + if data.DeviceName != "" { + db = db.Where("device_name = ?", data.DeviceName) + } + if data.MessageId != "" { + db = db.Where("message_id = ?", data.MessageId) + } + if data.MsgType != "" { + db = db.Where("msg_type = ?", data.MsgType) + } + biz.ErrIsNil(db.Delete(&entity.RuleChainMsgLog{}).Error, "删除规则链失败") } diff --git a/apps/video/api/ys.go b/apps/video/api/ys.go index b316cc0..cc04285 100644 --- a/apps/video/api/ys.go +++ b/apps/video/api/ys.go @@ -15,7 +15,7 @@ func (j *YsApi) GetDeviceList(rc *restfulx.ReqCtx) { pageNum := restfulx.QueryInt(rc, "pageNum", 1) pageSize := restfulx.QueryInt(rc, "pageSize", 10) devices, total, err := j.Ys.GetDeviceList(pageNum, pageSize) - biz.ErrIsNilAppendErr(err, "设备列表获取失败") + biz.ErrIsNil(err, "设备列表获取失败,可能萤石Token过期,请联系管理员。。") rc.ResData = model.ResultPage{ Total: total, PageNum: int64(pageNum), diff --git a/pkg/global/const_device.go b/pkg/global/const_device.go index d4f0ddf..499852c 100644 --- a/pkg/global/const_device.go +++ b/pkg/global/const_device.go @@ -29,7 +29,6 @@ const ( DIRECT = "direct" //直连设备 GATEWAY = "gateway" //网关设备 GATEWAYS = "gatewayS" // 网关子设备 - MONITOR = "monitor" // 监控设备 ) // 设备命令状态 diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index 6daeb52..c3539c7 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -1,6 +1,11 @@ package message -/* +import ( + "encoding/json" + "github.com/google/uuid" + "time" +) + // 消息类型 const ( ConnectMes = "Connect" @@ -20,51 +25,21 @@ const ( MONITOR = "MONITOR" //监控 ) -// Message ... -type Message interface { - GetId() string - GetTs() time.Time - GetUserId() string - GetType() string - GetMsg() map[string]interface{} - GetMetadata() Metadata - GetAllMap() map[string]interface{} //msg 和 Metadata的合并 - SetType(string) - SetMsg(map[string]interface{}) - SetUserId(string) - SetMetadata(Metadata) - MarshalBinary() ([]byte, error) -} +type Msg map[string]interface{} +type Metadata map[string]interface{} -// Metadata ... -type Metadata interface { - Keys() []string - GetKeyValue(key string) interface{} - SetKeyValue(key string, val interface{}) - GetValues() map[string]interface{} +type Message struct { + Id string //uuid 消息Id + Ts time.Time //时间戳 + MsgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 + UserId string //客户Id UUID 设备发布人 + Msg Msg //数据 数据结构JSON 设备原始数据 msg + Metadata Metadata //消息的元数据 包括设备Id,设备类型,产品ID等 } // NewMessage ... -func NewMessage() Message { - return &defaultMessage{ - Id: uuid.New().String(), - Ts: time.Now(), - Msg: map[string]interface{}{}, - } -} - -type defaultMessage struct { - Id string //uuid 消息Id - Ts time.Time //时间戳 - MsgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 - UserId string //客户Id UUID 设备发布人 - Msg map[string]interface{} //数据 数据结构JSON 设备原始数据 msg - Metadata Metadata //消息的元数据 包括设备Id,设备类型,产品ID等 -} - -// NewMessageWithDetail ... -func NewMessageWithDetail(userId, messageType string, msg map[string]interface{}, metadata Metadata) Message { - return &defaultMessage{ +func NewMessage(userId, messageType string, msg Msg, metadata Metadata) *Message { + return &Message{ Id: uuid.New().String(), Ts: time.Now(), UserId: userId, @@ -74,16 +49,10 @@ func NewMessageWithDetail(userId, messageType string, msg map[string]interface{} } } -func (t *defaultMessage) GetId() string { return t.Id } -func (t *defaultMessage) GetTs() time.Time { return t.Ts } -func (t *defaultMessage) GetUserId() string { return t.UserId } -func (t *defaultMessage) GetType() string { return t.MsgType } -func (t *defaultMessage) GetMsg() map[string]interface{} { return t.Msg } -func (t *defaultMessage) GetMetadata() Metadata { return t.Metadata } -func (t *defaultMessage) GetAllMap() map[string]interface{} { +func (t *Message) GetAllMap() map[string]interface{} { data := make(map[string]interface{}) - for msgKey, msgValue := range t.GetMsg() { - for metaKey, metaValue := range t.GetMetadata().GetValues() { + for msgKey, msgValue := range t.Msg { + for metaKey, metaValue := range t.Metadata { if msgKey == metaKey { data[msgKey] = metaValue } else { @@ -98,55 +67,33 @@ func (t *defaultMessage) GetAllMap() map[string]interface{} { } return data } -func (t *defaultMessage) SetType(msgType string) { t.MsgType = msgType } -func (t *defaultMessage) SetMsg(msg map[string]interface{}) { t.Msg = msg } -func (t *defaultMessage) SetUserId(userId string) { t.UserId = userId } -func (t *defaultMessage) SetMetadata(metadata Metadata) { t.Metadata = metadata } -func (t *defaultMessage) MarshalBinary() ([]byte, error) { +func (t *Message) MarshalBinary() ([]byte, error) { return json.Marshal(t) } -// NewMetadata ... -func NewMetadata() Metadata { - return &defaultMetadata{ - values: make(map[string]interface{}), - } -} - -type defaultMetadata struct { - values map[string]interface{} -} - -func NewDefaultMetadata(vals map[string]interface{}) Metadata { - return &defaultMetadata{ - values: vals, - } -} - -func (t *defaultMetadata) Keys() []string { +func (meta *Metadata) Keys() []string { keys := make([]string, 0) - for key := range t.values { + for key := range *meta { keys = append(keys, key) } return keys } -func (t *defaultMetadata) GetKeyValue(key string) interface{} { - if _, found := t.values[key]; !found { +func (meta *Metadata) GetValue(key string) any { + if _, found := (*meta)[key]; !found { return nil } - return t.values[key] + return (*meta)[key] } -func (t *defaultMetadata) SetKeyValue(key string, val interface{}) { - t.values[key] = val +func (meta *Metadata) SetValue(key string, val interface{}) { + (*meta)[key] = val } -func (t *defaultMetadata) GetValues() map[string]interface{} { - return t.values +func (msg *Msg) GetValue(key string) any { + if _, found := (*msg)[key]; !found { + return nil + } + return (*msg)[key] } -func (t *defaultMetadata) SetValues(values map[string]interface{}) { - t.values = values -} -*/ diff --git a/pkg/rule_engine/message/message1.go b/pkg/rule_engine/message/message1.go deleted file mode 100644 index c3539c7..0000000 --- a/pkg/rule_engine/message/message1.go +++ /dev/null @@ -1,99 +0,0 @@ -package message - -import ( - "encoding/json" - "github.com/google/uuid" - "time" -) - -// 消息类型 -const ( - ConnectMes = "Connect" - DisConnectMes = "Disconnect" - RpcRequestMes = "RpcRequest" - UpEventMes = "Event" - AlarmMes = "Alarm" - RowMes = "Row" - TelemetryMes = "Telemetry" - AttributesMes = "Attributes" -) - -// 数据类型Originator -const ( - DEVICE = "DEVICE" - GATEWAY = "GATEWAY" - MONITOR = "MONITOR" //监控 -) - -type Msg map[string]interface{} -type Metadata map[string]interface{} - -type Message struct { - Id string //uuid 消息Id - Ts time.Time //时间戳 - MsgType string //消息类型, attributes(参数),telemetry(遥测),Connect连接事件 - UserId string //客户Id UUID 设备发布人 - Msg Msg //数据 数据结构JSON 设备原始数据 msg - Metadata Metadata //消息的元数据 包括设备Id,设备类型,产品ID等 -} - -// NewMessage ... -func NewMessage(userId, messageType string, msg Msg, metadata Metadata) *Message { - return &Message{ - Id: uuid.New().String(), - Ts: time.Now(), - UserId: userId, - MsgType: messageType, - Msg: msg, - Metadata: metadata, - } -} - -func (t *Message) GetAllMap() map[string]interface{} { - data := make(map[string]interface{}) - for msgKey, msgValue := range t.Msg { - for metaKey, metaValue := range t.Metadata { - if msgKey == metaKey { - data[msgKey] = metaValue - } else { - if _, ok := data[msgKey]; !ok { - data[msgKey] = msgValue - } - if _, ok := data[metaKey]; !ok { - data[metaKey] = metaValue - } - } - } - } - return data -} - -func (t *Message) MarshalBinary() ([]byte, error) { - return json.Marshal(t) -} - -func (meta *Metadata) Keys() []string { - keys := make([]string, 0) - for key := range *meta { - keys = append(keys, key) - } - return keys -} - -func (meta *Metadata) GetValue(key string) any { - if _, found := (*meta)[key]; !found { - return nil - } - return (*meta)[key] -} - -func (meta *Metadata) SetValue(key string, val interface{}) { - (*meta)[key] = val -} - -func (msg *Msg) GetValue(key string) any { - if _, found := (*msg)[key]; !found { - return nil - } - return (*msg)[key] -} diff --git a/pkg/rule_engine/nodes/action_log_node.go b/pkg/rule_engine/nodes/action_log_node.go index 5cf3471..1717378 100644 --- a/pkg/rule_engine/nodes/action_log_node.go +++ b/pkg/rule_engine/nodes/action_log_node.go @@ -3,7 +3,6 @@ package nodes import ( "pandax/apps/rule/entity" "pandax/apps/rule/services" - "pandax/pkg/global" "pandax/pkg/rule_engine/message" ) @@ -40,11 +39,11 @@ func (n *logNode) Handle(msg *message.Message) error { services.RuleChainMsgLogModelDao.Insert(entity.RuleChainMsgLog{ MessageId: msg.Id, MsgType: msg.MsgType, + DeviceId: msg.Metadata["deviceId"].(string), DeviceName: msg.Metadata["deviceName"].(string), Ts: msg.Ts, Content: logMessage, }) - global.Log.Info(logMessage) if err != nil { if failureLableNode != nil { return failureLableNode.Handle(msg) diff --git a/pkg/ys/ys.go b/pkg/ys/ys.go index a910f0b..ccc40a4 100644 --- a/pkg/ys/ys.go +++ b/pkg/ys/ys.go @@ -101,7 +101,6 @@ func (ys *Ys) authorizeRequset(method, url string, params map[string]interface{} return } }() - log.Println("初始化token", *ys) params["accessToken"] = ys.AccessToken status, err = ys.requset(method, url, params, data) return diff --git a/shutdown.bat b/shutdown.bat index d059a77..421c5b5 100644 --- a/shutdown.bat +++ b/shutdown.bat @@ -1 +1 @@ -taskkill /pid 19788 -t -f \ No newline at end of file +taskkill /pid 45120 -t -f \ No newline at end of file