diff --git a/apps/device/api/device.go b/apps/device/api/device.go index cb26ab8..c5f103f 100644 --- a/apps/device/api/device.go +++ b/apps/device/api/device.go @@ -85,14 +85,17 @@ func (p *DeviceApi) GetDeviceListAll(rc *restfulx.ReqCtx) { // GetDevice 获取Device func (p *DeviceApi) GetDevice(rc *restfulx.ReqCtx) { id := restfulx.PathParam(rc, "id") - rc.ResData = p.DeviceApp.FindOne(id) + device, err := p.DeviceApp.FindOne(id) + biz.ErrIsNil(err, "获取设备失败") + rc.ResData = device } // GetDeviceStatus 获取Device状态信息 func (p *DeviceApi) GetDeviceStatus(rc *restfulx.ReqCtx) { id := restfulx.PathParam(rc, "id") classify := restfulx.QueryParam(rc, "classify") - device := p.DeviceApp.FindOne(id) + device, err := p.DeviceApp.FindOne(id) + biz.ErrIsNil(err, "获取设备失败") template := p.ProductTemplateApp.FindList(entity.ProductTemplate{Classify: classify, Pid: device.Pid}) // 从设备影子中读取 res := make([]entity.DeviceStatusVo, 0) @@ -164,7 +167,8 @@ func (p *DeviceApi) GetDeviceTelemetryHistory(rc *restfulx.ReqCtx) { startTime := restfulx.QueryParam(rc, "startTime") endTime := restfulx.QueryParam(rc, "endTime") limit := restfulx.QueryInt(rc, "limit", 1000) - device := p.DeviceApp.FindOne(id) + device, err := p.DeviceApp.FindOne(id) + biz.ErrIsNil(err, "获取设备失败,设备不存在") sql := `select ts,? from ? where ts > '?' and ts < '?' and ? is not null ORDER BY ts DESC LIMIT ? ` rs, err := global.TdDb.GetAll(sql, key, fmt.Sprintf("%s_telemetry", strings.ToLower(device.Name)), startTime, endTime, key, limit) biz.ErrIsNilAppendErr(err, "查询设备属性的遥测历史失败") diff --git a/apps/device/services/device.go b/apps/device/services/device.go index d3a5665..2581fc8 100644 --- a/apps/device/services/device.go +++ b/apps/device/services/device.go @@ -14,7 +14,7 @@ type ( Insert(data entity.Device) *entity.Device FindOneByToken(token string) (*entity.DeviceRes, error) FindOneByName(name string) (*entity.DeviceRes, error) - FindOne(id string) *entity.DeviceRes + FindOne(id string) (*entity.DeviceRes, error) FindListPage(page, pageSize int, data entity.Device) (*[]entity.DeviceRes, int64) FindList(data entity.Device) *[]entity.DeviceRes Update(data entity.Device) *entity.Device @@ -65,12 +65,11 @@ func (m *deviceModelImpl) Insert(data entity.Device) *entity.Device { return &data } -func (m *deviceModelImpl) FindOne(id string) *entity.DeviceRes { +func (m *deviceModelImpl) FindOne(id string) (*entity.DeviceRes, error) { resData := new(entity.DeviceRes) db := global.Db.Table(m.table).Where("id = ?", id) err := db.Preload("Product").Preload("DeviceGroup").First(resData).Error - biz.ErrIsNil(err, "查询设备失败") - return resData + return resData, err } func (m *deviceModelImpl) FindOneByName(name string) (*entity.DeviceRes, error) { @@ -176,9 +175,12 @@ func (m *deviceModelImpl) UpdateStatus(id, linkStatus string) error { func (m *deviceModelImpl) Delete(ids []string) { biz.ErrIsNil(global.Db.Table(m.table).Delete(&entity.Device{}, "id in (?)", ids).Error, "删除设备失败") for _, id := range ids { - device := m.FindOne(id) + device, err := m.FindOne(id) + if err != nil { + continue + } // 删除表 - err := deleteDeviceTable(device.Name) + err = deleteDeviceTable(device.Name) global.Log.Error("设备时序表删除失败", err) // 删除所有缓存 if device.DeviceType == global.GATEWAYS { diff --git a/apps/device/util/device_rpc.go b/apps/device/util/device_rpc.go index dec4635..0a3ffcb 100644 --- a/apps/device/util/device_rpc.go +++ b/apps/device/util/device_rpc.go @@ -14,13 +14,16 @@ import ( ) func BuildRunDeviceRpc(deviceId, mode string, metadata map[string]interface{}) error { - one := services.DeviceModelDao.FindOne(deviceId) - if one.LinkStatus != global.ONLINE { + device, err := services.DeviceModelDao.FindOne(deviceId) + if err != nil { + return err + } + if device.LinkStatus != global.ONLINE { return errors.New("设备不在线无法设置属性") } - findOne := ruleService.RuleChainModelDao.FindOne(one.Product.RuleChainId) + findOne := ruleService.RuleChainModelDao.FindOne(device.Product.RuleChainId) ruleData := ruleEntity.RuleDataJson{} - err := tool.StringToStruct(findOne.RuleDataJson, &ruleData) + err = tool.StringToStruct(findOne.RuleDataJson, &ruleData) if err != nil { global.Log.Error("规则链数据转化失败", err) return errors.New("规则链数据转化失败") @@ -33,16 +36,16 @@ func BuildRunDeviceRpc(deviceId, mode string, metadata map[string]interface{}) e return errs[0] } metadataVals := map[string]interface{}{ - "deviceId": one.Id, + "deviceId": device.Id, "mode": mode, - "deviceName": one.Name, - "deviceType": one.DeviceType, - "deviceProtocol": one.Product.ProtocolName, - "productId": one.Pid, - "orgId": one.OrgId, - "owner": one.Owner, + "deviceName": device.Name, + "deviceType": device.DeviceType, + "deviceProtocol": device.Product.ProtocolName, + "productId": device.Pid, + "orgId": device.OrgId, + "owner": device.Owner, } - msg := message.NewMessage(one.Owner, message.RpcRequestToDevice, metadata, metadataVals) + msg := message.NewMessage(device.Owner, message.RpcRequestToDevice, metadata, metadataVals) err = instance.StartRuleChain(context.Background(), msg) if err != nil { global.Log.Error("规则链执行失败", errs) diff --git a/iothub/client/mqttclient/mqtt_api.go b/iothub/client/mqttclient/mqtt_api.go index ad43156..8a875ec 100644 --- a/iothub/client/mqttclient/mqtt_api.go +++ b/iothub/client/mqttclient/mqtt_api.go @@ -63,6 +63,9 @@ func GetEmqInfo(infoType string) ([]map[string]interface{}, error) { // Publish 推送信息 func Publish(topic, clientId string, payload interface{}) error { + if clientId == "" { + return errors.New("未获取到MQTT连接") + } global.Log.Debugf("send data to clientId: %s, topic:%s, payload: %v", clientId, topic, payload) url := fmt.Sprintf("%s/v5/publish", global.Conf.Mqtt.Broker) pubData := map[string]interface{}{ diff --git a/pkg/rule_engine/message/message.go b/pkg/rule_engine/message/message.go index 135a72d..efdab97 100644 --- a/pkg/rule_engine/message/message.go +++ b/pkg/rule_engine/message/message.go @@ -118,6 +118,11 @@ func (meta *Metadata) GetValue(key string) any { return (*meta)[key] } +func (meta *Metadata) Has(key string) bool { + _, ok := (*meta)[key] + return ok +} + func (meta *Metadata) SetValue(key string, val interface{}) { (*meta)[key] = val } diff --git a/pkg/rule_engine/nodes/init.go b/pkg/rule_engine/nodes/init.go index b2510fc..634517e 100644 --- a/pkg/rule_engine/nodes/init.go +++ b/pkg/rule_engine/nodes/init.go @@ -11,6 +11,7 @@ func init() { RegisterFactory(transformDeleteKeyNodeFactory{}) RegisterFactory(transformRenameKeyNodeFactory{}) RegisterFactory(transformScriptNodeFactory{}) + RegisterFactory(transformMetadataNodeFactory{}) RegisterFactory(createAlarmNodeFactory{}) RegisterFactory(clearAlarmNodeFactory{}) diff --git a/pkg/rule_engine/nodes/transform_metadata_node.go b/pkg/rule_engine/nodes/transform_metadata_node.go new file mode 100644 index 0000000..aa63296 --- /dev/null +++ b/pkg/rule_engine/nodes/transform_metadata_node.go @@ -0,0 +1,55 @@ +package nodes + +import ( + "pandax/apps/device/services" + "pandax/pkg/rule_engine/message" +) + +type transformMetadataNode struct { + bareNode + DeviceId string `json:"deviceId" yaml:"deviceId"` +} + +type transformMetadataNodeFactory struct{} + +func (f transformMetadataNodeFactory) Name() string { return "MetadataKeyNode" } +func (f transformMetadataNodeFactory) Category() string { return NODE_CATEGORY_TRANSFORM } +func (f transformMetadataNodeFactory) Labels() []string { return []string{"Success", "Failure"} } +func (f transformMetadataNodeFactory) Create(id string, meta Properties) (Node, error) { + node := &transformMetadataNode{ + bareNode: newBareNode(f.Name(), id, meta, f.Labels()), + } + return decodePath(meta, node) +} + +func (n *transformMetadataNode) Handle(msg *message.Message) error { + n.Debug(msg, message.DEBUGIN, "") + + successLabelNode := n.GetLinkedNode("Success") + failureLabelNode := n.GetLinkedNode("Failure") + // 获取设备信息 + device, err := services.DeviceModelDao.FindOne(n.DeviceId) + if err != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) + if failureLabelNode != nil { + return failureLabelNode.Handle(msg) + } else { + return err + } + } + // 更改元数据基本信息 + msg.Metadata = map[string]interface{}{ + "deviceId": n.DeviceId, + "deviceName": device.Name, + "deviceType": device.DeviceType, + "deviceProtocol": device.Product.ProtocolName, + "productId": device.Pid, + "orgId": device.OrgId, + "owner": device.Owner, + } + if successLabelNode != nil { + n.Debug(msg, message.DEBUGOUT, "") + return successLabelNode.Handle(msg) + } + return nil +}