diff --git a/apps/device/services/device.go b/apps/device/services/device.go index 112d31c..554e724 100644 --- a/apps/device/services/device.go +++ b/apps/device/services/device.go @@ -210,19 +210,17 @@ func createDeviceTable(productId, device string) error { } func GetDeviceToken(data *entity.Device) *model.DeviceAuth { - now := time.Now() etoken := &model.DeviceAuth{ - DeviceId: data.Id, - OrgId: data.OrgId, - Owner: data.Owner, - Name: data.Name, - DeviceType: data.DeviceType, - ProductId: data.Pid, - DeviceProtocol: data.Protocol, + DeviceId: data.Id, + OrgId: data.OrgId, + Owner: data.Owner, + Name: data.Name, + DeviceType: data.DeviceType, + Status: data.LinkStatus, + DeviceGroup: data.Gid, + ProductId: data.Pid, + DeviceExt: data.Ext, } - //设备有效期360天 - etoken.CreatedAt = now.Unix() - etoken.ExpiredAt = now.Add(time.Hour * 24 * 365).Unix() return etoken } diff --git a/apps/device/services/product.go b/apps/device/services/product.go index 5f4cc7e..445459e 100644 --- a/apps/device/services/product.go +++ b/apps/device/services/product.go @@ -2,7 +2,6 @@ package services import ( "pandax/apps/device/entity" - "pandax/pkg/cache" "pandax/pkg/global" ) @@ -12,6 +11,7 @@ type ( FindOne(id string) (*entity.ProductRes, error) FindListPage(page, pageSize int, data entity.Product) (*[]entity.ProductRes, int64, error) FindList(data entity.Product) (*[]entity.ProductRes, error) + FindListByRule(ruleId string) (*[]entity.Product, error) Update(data entity.Product) (*entity.Product, error) Delete(ids []string) error FindProductCount() (entity.DeviceCount, error) @@ -108,6 +108,18 @@ func (m *productModelImpl) FindList(data entity.Product) (*[]entity.ProductRes, return &list, err } +func (m *productModelImpl) FindListByRule(ruleId string) (*[]entity.Product, error) { + list := make([]entity.Product, 0) + db := global.Db.Table(m.table) + // 此处填写 where参数判断 + if ruleId != "" { + db = db.Where("rule_chain_id = ?", ruleId) + } + db = db.Where("status = ?", "0") + err := db.Find(&list).Error + return &list, err +} + func (m *productModelImpl) Update(data entity.Product) (*entity.Product, error) { // go的一些默认值 int 0 bool false 保存失败需要先转成map err := global.Db.Table(m.table).Where("id = ?", data.Id).Updates(data).Error @@ -122,8 +134,6 @@ func (m *productModelImpl) Delete(ids []string) error { for _, id := range ids { // 删除超级表 deleteDeviceStable(id) - // 删除所有缓存 - cache.DelProductRule(id) // 删除绑定的属性及OTA记录 ProductTemplateModelDao.Delete([]string{id}) ProductOtaModelDao.Delete([]string{id}) diff --git a/apps/device/util/device_rpc.go b/apps/device/util/device_rpc.go index 0f6bb15..a9f743b 100644 --- a/apps/device/util/device_rpc.go +++ b/apps/device/util/device_rpc.go @@ -1,13 +1,11 @@ package util import ( - "context" "encoding/json" "errors" "pandax/apps/device/services" ruleEntity "pandax/apps/rule/entity" ruleService "pandax/apps/rule/services" - "pandax/pkg/cache" devicerpc "pandax/pkg/device_rpc" "pandax/pkg/global" "pandax/pkg/rule_engine" @@ -23,35 +21,33 @@ func BuildRunDeviceRpc(deviceId, mode string, rp devicerpc.RpcPayload) error { if device.LinkStatus != global.ONLINE { return errors.New("设备不在线无法设置属性") } - findOne, err := ruleService.RuleChainModelDao.FindOne(device.Product.RuleChainId) - if err != nil { - global.Log.Error("查询规则链数据失败", err) - return errors.New("查询规则链数据失败") - } - ruleData := ruleEntity.RuleDataJson{} - err = tool.StringToStruct(findOne.RuleDataJson, &ruleData) - if err != nil { - global.Log.Error("规则链数据转化失败", err) - return errors.New("规则链数据转化失败") - } - dataCode := ruleData.LfData.DataCode - code, _ := json.Marshal(dataCode) + //新建规则链实体 - instance := &rule_engine.RuleChainInstance{} - ruleInstance, bo := cache.GetProductRule(device.Product.Id) - if !bo { - instance, err = rule_engine.NewRuleChainInstance(findOne.Id, code) + ruleInstance := rule_engine.RuleEngine.GetRuleInstance(device.Product.Id) + if ruleInstance == nil { + findOne, err := ruleService.RuleChainModelDao.FindOne(device.Product.RuleChainId) + if err != nil { + global.Log.Error("查询规则链数据失败", err) + return errors.New("查询规则链数据失败") + } + ruleData := ruleEntity.RuleDataJson{} + err = tool.StringToStruct(findOne.RuleDataJson, &ruleData) + if err != nil { + global.Log.Error("规则链数据转化失败", err) + return errors.New("规则链数据转化失败") + } + dataCode := ruleData.DataCode + code, err := json.Marshal(dataCode) + if err != nil { + global.Log.Error("规则链数据解析失败", err) + return errors.New("规则链数据解析失败") + } + ruleInstance, err = rule_engine.NewRuleChainInstance(findOne.Id, code) if err != nil { return err } - } else { - if data, ok := ruleInstance.(*rule_engine.RuleChainInstance); ok { - instance = data - } else { - return errors.New("规则实体解析错误") - } + rule_engine.RuleEngine.SaveRuleInstance(device.Product.Id, ruleInstance) } - metadataVals := map[string]interface{}{ "deviceId": device.Id, "mode": mode, @@ -62,9 +58,8 @@ func BuildRunDeviceRpc(deviceId, mode string, rp devicerpc.RpcPayload) error { "orgId": device.OrgId, "owner": device.Owner, } - msg := message.NewMessage(device.Owner, message.RpcRequestToDevice, rp.ToMap(), metadataVals) - err = instance.StartRuleChain(context.Background(), msg) + err = rule_engine.RuleEngine.StartRuleInstance(ruleInstance, msg) if err != nil { global.Log.Error("规则链执行失败", err) } diff --git a/apps/rule/api/rulechain.go b/apps/rule/api/rulechain.go index 0467815..d7987f1 100644 --- a/apps/rule/api/rulechain.go +++ b/apps/rule/api/rulechain.go @@ -25,7 +25,7 @@ func (r *RuleChainApi) GetNodeDebug(rc *restfulx.ReqCtx) { ruleId := restfulx.QueryParam(rc, "ruleId") nodeId := restfulx.QueryParam(rc, "nodeId") - total, list, err := rule_engine.GetDebugDataPage(pageNum, pageSize, ruleId, nodeId) + total, list, err := rule_engine.RuleEngine.GetDebugDataPage(pageNum, pageSize, ruleId, nodeId) biz.ErrIsNil(err, "获取规则测试数据错误") rc.ResData = model.ResultPage{ Total: total, @@ -38,7 +38,7 @@ func (r *RuleChainApi) GetNodeDebug(rc *restfulx.ReqCtx) { func (r *RuleChainApi) ClearNodeDebug(rc *restfulx.ReqCtx) { ruleId := restfulx.QueryParam(rc, "ruleId") nodeId := restfulx.QueryParam(rc, "nodeId") - rule_engine.ClearDebugData(ruleId, nodeId) + rule_engine.RuleEngine.ClearDebugData(ruleId, nodeId) } // GetRuleChainList WorkInfo列表数据 diff --git a/apps/rule/entity/rulechain_data.go b/apps/rule/entity/rulechain_data.go index acc5d16..c891a84 100644 --- a/apps/rule/entity/rulechain_data.go +++ b/apps/rule/entity/rulechain_data.go @@ -5,11 +5,7 @@ import ( ) type RuleDataJson struct { - Id string - LfData LfData `json:"lfData"` -} - -type LfData struct { + Id string GlobalColor string `json:"globalColor"` DataCode map[string]interface{} `json:"dataCode"` OpenRule bool `json:"openRule"` diff --git a/iothub/hook_message_work/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go index 1c469b2..df49607 100644 --- a/iothub/hook_message_work/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -1,19 +1,17 @@ package hook_message_work import ( - "context" "encoding/json" "fmt" - "github.com/PandaXGO/PandaKit/biz" "pandax/apps/device/services" ruleEntity "pandax/apps/rule/entity" ruleService "pandax/apps/rule/services" "pandax/iothub/netbase" - "pandax/pkg/cache" "pandax/pkg/global" "pandax/pkg/global/model" "pandax/pkg/rule_engine" "pandax/pkg/rule_engine/message" + "pandax/pkg/shadow" "pandax/pkg/tdengine" "pandax/pkg/tool" "pandax/pkg/websocket" @@ -55,13 +53,13 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { go SendZtWebsocket(msg.DeviceId, msg.Datas) } // 获取规则链代码实体 - instance := getRuleChainInstance(msg.DeviceAuth) - if instance == nil { - global.Log.Error("规则链实体不存在") + instance, err := getRuleChainInstance(msg.DeviceAuth) + if err != nil { + global.Log.Error("获取设备实体失败", err) return } ruleMessage := buildRuleMessage(msg.DeviceAuth, msgVals, msg.Type) - err = instance.StartRuleChain(context.Background(), ruleMessage) + err = rule_engine.RuleEngine.StartRuleInstance(instance, ruleMessage) if err != nil { global.Log.Error("规则链执行失败", err) return @@ -89,12 +87,19 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { global.Log.Error("事件添加错误", err) } } + // 刷新设备状态 + shadow.DeviceShadowInstance.RefreshDeviceStatus(msg.DeviceAuth.Name) case message.DisConnectMes, message.ConnectMes: // 更改设备在线状态 + isHas := shadow.DeviceShadowInstance.HasDevice(msg.DeviceAuth.Name) + if !isHas { + shadow.InitDeviceShadow(msg.DeviceAuth.Name) + } if msg.Type == message.ConnectMes { - services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.ONLINE) + shadow.DeviceShadowInstance.SetOnline(msg.DeviceAuth.Name) + } else { - services.DeviceModelDao.UpdateStatus(msg.DeviceId, global.OFFLINE) + shadow.DeviceShadowInstance.SetOnline(msg.DeviceAuth.Name) } // 添加设备连接历史 data := make(map[string]any) @@ -112,16 +117,11 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { } // 获取规则实体 -func getRuleChainInstance(etoken *model.DeviceAuth) *rule_engine.RuleChainInstance { - defer func() { - if err := recover(); err != nil { - global.Log.Error(err) - } - }() - +func getRuleChainInstance(etoken *model.DeviceAuth) (*rule_engine.RuleChainInstance, error) { key := etoken.ProductId - instance, err := cache.ComputeIfAbsentProductRule(key, func(k any) (any, error) { - one, err := services.ProductModelDao.FindOne(k.(string)) + instance := rule_engine.RuleEngine.GetRuleInstance(key) + if instance == nil { + one, err := services.ProductModelDao.FindOne(key) if err != nil { return nil, err } @@ -134,20 +134,18 @@ func getRuleChainInstance(etoken *model.DeviceAuth) *rule_engine.RuleChainInstan if err != nil { return nil, err } - code, _ := json.Marshal(lfData.LfData.DataCode) + code, _ := json.Marshal(lfData.DataCode) //新建规则链实体 - instance, err := rule_engine.NewRuleChainInstance(rule.Id, code) + instance, err = rule_engine.NewRuleChainInstance(rule.Id, code) + if err != nil { + return nil, err + } + _, err = rule_engine.RuleEngine.SaveRuleInstance(key, instance) if err != nil { - global.Log.Error("规则链初始化失败", err) return nil, err } - return instance, nil - }) - biz.ErrIsNil(err, "缓存读取规则链失败") - if ruleData, ok := instance.(*rule_engine.RuleChainInstance); ok { - return ruleData } - return nil + return instance, nil } // 构建规则链执行的消息 @@ -156,7 +154,7 @@ func buildRuleMessage(etoken *model.DeviceAuth, msgVals map[string]interface{}, "deviceId": etoken.DeviceId, "deviceName": etoken.Name, "deviceType": etoken.DeviceType, - "deviceProtocol": etoken.DeviceProtocol, + "deviceProtocol": etoken.Protocol, "productId": etoken.ProductId, "orgId": etoken.OrgId, "owner": etoken.Owner, diff --git a/iothub/netbase/hook_base.go b/iothub/netbase/hook_base.go index f0aad85..9d2cca4 100644 --- a/iothub/netbase/hook_base.go +++ b/iothub/netbase/hook_base.go @@ -13,7 +13,6 @@ import ( "pandax/pkg/tool" "regexp" "strings" - "sync" "time" ) @@ -38,83 +37,86 @@ func Auth(authToken string) bool { return false } etoken = services.GetDeviceToken(&device.Device) - etoken.DeviceProtocol = device.Product.ProtocolName + etoken.Protocol = device.Product.ProtocolName err = cache.SetDeviceEtoken(authToken, etoken.GetMarshal(), time.Hour*24*365) if err != nil { global.Log.Infof("认证失败,设备TOKEN %s添加缓存失败", authToken) return false } } - // 判断token是否过期了, 设备过期 - if etoken.ExpiredAt < time.Now().Unix() { - global.Log.Infof("设备authToken %s 失效", authToken) - return false - } return true } // SubAuth 获取子设备的认证信息 -func SubAuth(name string) (*model.DeviceAuth, bool) { +func SubAuth(authToken string, productAuth *model.DeviceAuth) (*model.DeviceAuth, bool) { + defer func() { + if Rerr := recover(); Rerr != nil { + global.Log.Error(Rerr) + return + } + }() + // 解析认证 + tokens := strings.Split(authToken, "_") + producId := "" + name := "" + if len(tokens) >= 2 { + producId = tokens[0] + name = tokens[1] + } else if len(tokens) == 1 { + name = tokens[0] + } else { + return nil, false + } + etoken := &model.DeviceAuth{} // redis 中有就查询,没有就添加 - exists := cache.ExistsDeviceEtoken(name) - if exists { - err := etoken.GetDeviceToken(name) + err := cache.GetDeviceEtoken(name, etoken) + if err == nil { + return etoken, true + } + // 判断子设备 已经创建的子设备 + deviceRes, err := services.DeviceModelDao.FindOneByName(name) + // 没有设备就要创建子设备 + if err != nil { + product, err := services.ProductModelDao.FindOne(producId) if err != nil { - global.Log.Infof("认证失败,缓存读取设备错误,无效的设备标识: %s", err) return nil, false } + //自动创建设备, + // 1. 创建设备 + device := entity.Device{ + Name: name, + Pid: producId, + Alias: productAuth.Name + "子设备", + ParentId: productAuth.DeviceId, + Gid: productAuth.DeviceGroup, + Status: "0", + LinkStatus: global.ONLINE, + LastAt: time.Now(), + DeviceType: entity.GATEWAYS_DEVICE, + } + device.Id = utils.GenerateID("d") + device.OrgId = product.OrgId + device.Owner = product.Owner + device.Protocol = product.ProtocolName + deviceD, err := services.DeviceModelDao.Insert(device) + if err != nil { + return nil, false + } + etoken = services.GetDeviceToken(deviceD) + etoken.Protocol = product.ProtocolName } else { - device, err := services.DeviceModelDao.FindOneByName(name) - // 没有设备就要创建子设备 - if err != nil { - global.Log.Infof("设备标识 %s 不存在, ", name) - return nil, false - } - etoken = services.GetDeviceToken(&device.Device) - etoken.DeviceProtocol = device.Product.ProtocolName - err = cache.SetDeviceEtoken(name, etoken.GetMarshal(), time.Hour*24*365) - if err != nil { - global.Log.Infof("设备标识 %s添加缓存失败", name) - return nil, false - } + etoken = services.GetDeviceToken(&deviceRes.Device) + etoken.Protocol = deviceRes.Product.ProtocolName + } + err = cache.SetDeviceEtoken(name, etoken.GetMarshal(), time.Hour*24*365) + if err != nil { + global.Log.Infof("设备 %s添加缓存失败", name) + return nil, false } return etoken, true } -// CreateSubTableField 添加子设备字段 -func CreateSubTableField(productId, ty string, fields map[string]interface{}) { - var group sync.WaitGroup - for key, value := range fields { - group.Add(1) - go func(key string, value any) { - defer group.Done() - if key == "ts" { - return - } - check := cache.CheckSubDeviceField(key) - if !check { - interfaceType := tool.GetInterfaceType(value) - err := global.TdDb.AddSTableField(productId+"_"+ty, key, interfaceType, 0) - if err != nil { - return - } - tsl := entity.ProductTemplate{} - tsl.Pid = productId - tsl.Id = utils.GenerateID("tsl") - tsl.Name = key - tsl.Type = interfaceType - tsl.Key = key - tsl.Classify = ty - // 向产品tsl中添加模型 - services.ProductTemplateModelDao.Insert(tsl) - cache.SetSubDeviceField(key) - } - }(key, value) - } - group.Wait() -} - // UpdateDeviceTelemetryData 解析遥测数据类型 返回标准带时间戳格式 func UpdateDeviceTelemetryData(data string) map[string]interface{} { tel := make(map[string]interface{}) diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index 2eac7b2..26085e9 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/kakuilan/kgo" "pandax/iothub/client/mqttclient" "pandax/iothub/hook_message_work" "pandax/iothub/netbase" @@ -205,23 +206,25 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess } // key就是device name for deviceName, value := range subData { - auth, isSub := netbase.SubAuth(deviceName) + auth, isSub := netbase.SubAuth(deviceName, etoken) if !isSub { continue } + if auth.Status != "online" { + data = netbase.CreateEventInfo(message.ConnectMes, "info", fmt.Sprintf("子设备%s通过网关连接", etoken.Name), auth) + go s.HookService.Queue.Queue(data) + } data.DeviceAuth = auth data.DeviceId = auth.DeviceId if in.Message.Topic == AttributesGatewayTopic { data.Type = message.AttributesMes - marshal, _ := json.Marshal(value) - attributesData := netbase.UpdateDeviceAttributesData(string(marshal)) + attr := kgo.KConv.ToStr(value) + attributesData := netbase.UpdateDeviceAttributesData(attr) if attributesData == nil { continue } bytes, _ := json.Marshal(attributesData) data.Datas = string(bytes) - // 创建tdengine的设备属性表 - netbase.CreateSubTableField(auth.ProductId, global.TslAttributesType, attributesData) // 子设备发送到队列里 go s.HookService.Queue.Queue(data) } @@ -235,23 +238,9 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess } bytes, _ := json.Marshal(telemetryData) data.Datas = string(bytes) - // 创建tdengine的设备遥测表 - netbase.CreateSubTableField(auth.ProductId, global.TslTelemetryType, telemetryData) // 子设备发送到队列里 go s.HookService.Queue.Queue(data) } - if in.Message.Topic == ConnectGatewayTopic { - if val, ok := value.(string); ok { - if val == "online" { - data = netbase.CreateEventInfo(message.ConnectMes, "info", fmt.Sprintf("子设备%s通过网关连接", etoken.Name), auth) - } - if val == "offline" { - data = netbase.CreateEventInfo(message.ConnectMes, "info", fmt.Sprintf("子设备设备%s通过网关连接", etoken.Name), auth) - } - // 子设备发送到队列里 - go s.HookService.Queue.Queue(data) - } - } } return } diff --git a/pkg/cache/sub_device.go b/pkg/cache/sub_device.go deleted file mode 100644 index 3f7b4d9..0000000 --- a/pkg/cache/sub_device.go +++ /dev/null @@ -1,31 +0,0 @@ -package cache - -import ( - "github.com/PandaXGO/PandaKit/cache" - "strings" - "time" -) - -var SubDeviceField = cache.NewTimedCache(cache.NoExpiration, 24*time.Hour) -var SUBDEVICEKEY = "SUBDEVICEKEY" - -func CheckSubDeviceField(field string) bool { - fields, bool := SubDeviceField.Get(SUBDEVICEKEY) - if !bool { - return false - } - if !strings.Contains(fields.(string), field) { - return false - } - return true -} - -func SetSubDeviceField(data string) { - fields, bool := SubDeviceField.Get(SUBDEVICEKEY) - if !bool { - fields = data - } else { - fields = fields.(string) + "," + data - } - ProductCache.Put(SUBDEVICEKEY, fields) -} diff --git a/pkg/global/model/device_auth_model.go b/pkg/global/model/device_auth_model.go index a8c51a2..bdfa6af 100644 --- a/pkg/global/model/device_auth_model.go +++ b/pkg/global/model/device_auth_model.go @@ -10,21 +10,20 @@ import ( "pandax/apps/system/entity" "pandax/apps/system/services" "pandax/pkg/cache" - "strconv" "strings" ) type DeviceAuth struct { - Owner string `json:"owner"` - OrgId int64 `json:"orgId"` - DeviceId string `json:"deviceId"` - DeviceType string `json:"deviceType"` - DeviceProtocol string `json:"deviceProtocol"` - ProductId string `json:"productId"` - RuleChainId string `json:"ruleChainId"` - Name string `json:"name"` - CreatedAt int64 `json:"created_at"` - ExpiredAt int64 `json:"expired_at"` + Owner string `json:"owner"` + OrgId int64 `json:"orgId"` + DeviceId string `json:"deviceId"` + DeviceType string `json:"deviceType"` + DeviceGroup string `json:"deviceGroup"` + DeviceExt map[string]any `json:"deviceExt"` + Protocol string `json:"protocol"` + ProductId string `json:"productId"` + Name string `json:"name"` + Status string `json:"status"` } func (entity *DeviceAuth) GetDeviceToken(key string) error { @@ -37,7 +36,6 @@ func (entity *DeviceAuth) GetDeviceToken(key string) error { func (token *DeviceAuth) MD5ID() string { buf := bytes.NewBufferString(token.DeviceId) buf.WriteString(token.DeviceType) - buf.WriteString(strconv.FormatInt(token.CreatedAt, 10)) access := base64.URLEncoding.EncodeToString([]byte(uuid.NewMD5(uuid.Must(uuid.NewRandom()), buf.Bytes()).String())) access = strings.TrimRight(access, "=") return access diff --git a/pkg/global/model/rpc_model.go b/pkg/global/model/rpc_model.go deleted file mode 100644 index 042d29c..0000000 --- a/pkg/global/model/rpc_model.go +++ /dev/null @@ -1,25 +0,0 @@ -package model - -import ( - "errors" - "fmt" - "time" -) - -type RpcPayload struct { - Method string `json:"method"` - Params any `json:"params"` -} - -// GetRequestResult 处理设备端请求服务端方法 -func (rpc RpcPayload) GetRequestResult() (string, error) { - //TODO 此处处理设备的请求参数逻辑 - //自己定义请求逻辑 - if rpc.Params == "getCurrentTime" { - unix := time.Now().Unix() - msg := fmt.Sprintf("%d", unix) - return msg, nil - } - // 获取属性 ... - return "", errors.New("未获取到请求方法") -} diff --git a/pkg/initialize/event.go b/pkg/initialize/event.go index dacffa3..7f31066 100644 --- a/pkg/initialize/event.go +++ b/pkg/initialize/event.go @@ -2,10 +2,8 @@ package initialize import ( "encoding/json" - "pandax/apps/device/entity" "pandax/apps/device/services" ruleEntity "pandax/apps/rule/entity" - "pandax/pkg/cache" "pandax/pkg/events" "pandax/pkg/global" "pandax/pkg/rule_engine" @@ -15,11 +13,9 @@ import ( // 初始化事件监听 func InitEvents() { // 监听规则链改变 更新所有绑定改规则链的产品 - global.EventEmitter.On(events.ProductChainRuleEvent, func(ruleId, codeData string) { + global.EventEmitter.On(events.ProductChainRuleEvent, func(ruleId string, codeData string) { global.Log.Infof("规则链%s变更", ruleId) - list, _ := services.ProductModelDao.FindList(entity.Product{ - RuleChainId: ruleId, - }) + list, _ := services.ProductModelDao.FindListByRule(ruleId) if list != nil { var lfData ruleEntity.RuleDataJson err := tool.StringToStruct(codeData, &lfData) @@ -27,7 +23,7 @@ func InitEvents() { global.Log.Error("规则链序列化失败", err) return } - code, err := json.Marshal(lfData.LfData.DataCode) + code, err := json.Marshal(lfData.DataCode) if err != nil { global.Log.Error("规则链序列化失败", err) return @@ -39,7 +35,7 @@ func InitEvents() { return } for _, product := range *list { - cache.PutProductRule(product.Id, instance) + rule_engine.RuleEngine.SaveRuleInstance(product.Id, instance) } } }) diff --git a/pkg/rule_engine/engine.go b/pkg/rule_engine/engine.go new file mode 100644 index 0000000..721d603 --- /dev/null +++ b/pkg/rule_engine/engine.go @@ -0,0 +1,102 @@ +package rule_engine + +import ( + "errors" + "pandax/pkg/rule_engine/message" + "pandax/pkg/rule_engine/nodes" + "sync" + + "github.com/sirupsen/logrus" +) + +var RuleEngine = (*RuleChainEngine)(nil) + +type RuleChainEngine struct { + pool sync.Map + ruleChainDebugData *message.RuleChainDebugData +} + +func init() { + RuleEngine = &RuleChainEngine{ + pool: sync.Map{}, + ruleChainDebugData: message.NewRuleChainDebugData(100), + } +} + +func (en *RuleChainEngine) GetRuleInstance(ruleID string) *RuleChainInstance { + instance, ok := en.pool.Load(ruleID) + if ok { + return instance.(*RuleChainInstance) + } + // TODO 没有就去数据库查询并返回 + + return nil +} + +// iD为产品Id, 一个产品对应一个规则实体 +func (en *RuleChainEngine) SaveRuleInstance(id string, instance *RuleChainInstance) (*RuleChainInstance, error) { + en.pool.Store(id, instance) + return instance, nil +} + +func (en *RuleChainEngine) DeletRuleInstance(id string) { + en.pool.Delete(id) +} + +func (en *RuleChainEngine) StartRuleInstance(instance *RuleChainInstance, msg *message.Message) error { + go func() { + for { + select { + case debugMsg := <-msg.DeBugChan: + en.ruleChainDebugData.Add(instance.ruleId, debugMsg.NodeId, debugMsg) + case <-msg.EndDeBugChan: + logrus.Debugf("规则链%s,执行结束", msg.Id) + return + } + } + }() + node, found := instance.nodes[instance.firstRuleNodeID] + if !found { + return errors.New("first rule node not found") + } + + err := node.Handle(msg) + msg.EndDeBugChan <- struct{}{} + return err +} + +func (en *RuleChainEngine) GetDebugData(ruleId, nodeId string) []message.DebugData { + if data, ok := en.ruleChainDebugData.Data[ruleId]; ok { + return data.Get(nodeId).Items + } + return nil +} + +func (en *RuleChainEngine) ClearDebugData(ruleId, nodeId string) { + if data, ok := en.ruleChainDebugData.Data[ruleId]; ok { + data.Clear(nodeId) + } +} + +func (en *RuleChainEngine) GetDebugDataPage(page, pageSize int, ruleId, nodeId string) (int64, []message.DebugData, error) { + if page < 1 { + page = 1 + } + offset := pageSize * (page - 1) + if data, ok := en.ruleChainDebugData.Data[ruleId]; ok { + nodeData := data.Get(nodeId) + if nodeData != nil { + total := len(nodeData.Items) + end := offset + pageSize + if end >= total { + end = total + } + return int64(total), nodeData.Items[offset:end], nil + } + } + return 0, nil, errors.New("规则不存在") +} + +func GetCategory() []map[string]interface{} { + return nodes.GetCategory() +} diff --git a/pkg/rule_engine/engine_data.go b/pkg/rule_engine/engine_data.go deleted file mode 100644 index a492a7f..0000000 --- a/pkg/rule_engine/engine_data.go +++ /dev/null @@ -1,43 +0,0 @@ -package rule_engine - -import ( - "errors" - "pandax/pkg/rule_engine/message" - "pandax/pkg/rule_engine/nodes" -) - -func GetCategory() []map[string]interface{} { - return nodes.GetCategory() -} - -func GetDebugData(ruleId, nodeId string) []message.DebugData { - if data, ok := ruleChainDebugData.Data[ruleId]; ok { - return data.Get(nodeId).Items - } - return nil -} - -func ClearDebugData(ruleId, nodeId string) { - if data, ok := ruleChainDebugData.Data[ruleId]; ok { - data.Clear(nodeId) - } -} - -func GetDebugDataPage(page, pageSize int, ruleId, nodeId string) (int64, []message.DebugData, error) { - if page < 1 { - page = 1 - } - offset := pageSize * (page - 1) - if data, ok := ruleChainDebugData.Data[ruleId]; ok { - nodeData := data.Get(nodeId) - if nodeData != nil { - total := len(nodeData.Items) - end := offset + pageSize - if end >= total { - end = total - } - return int64(total), nodeData.Items[offset:end], nil - } - } - return 0, nil, errors.New("规则不存在") -} diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index ab4708a..899d24b 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -1,37 +1,30 @@ package rule_engine import ( - "context" - "errors" "pandax/pkg/rule_engine/manifest" - "pandax/pkg/rule_engine/message" "pandax/pkg/rule_engine/nodes" "github.com/sirupsen/logrus" ) -var ruleChainDebugData = message.NewRuleChainDebugData(100) - type RuleChainInstance struct { - ruleID string + ruleId string firstRuleNodeID string nodes map[string]nodes.Node } -func NewRuleChainInstance(ruleID string, data []byte) (*RuleChainInstance, error) { - instance := &RuleChainInstance{} +func NewRuleChainInstance(ruleId string, data []byte) (*RuleChainInstance, error) { manifest, err := manifest.New(data) if err != nil { logrus.WithError(err).Errorf("invalid manifest file") return nil, err } - instance, err = newInstanceWithManifest(manifest) + withManifest, err := newInstanceWithManifest(manifest) if err != nil { return nil, err } - instance.ruleID = ruleID - - return instance, nil + withManifest.ruleId = ruleId + return withManifest, nil } func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, error) { @@ -45,29 +38,3 @@ func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, error) { } return r, nil } - -func (c *RuleChainInstance) StartRuleChain(ctx context.Context, msg *message.Message) error { - debugChan := make(chan *message.DebugData, 100) - endDebugChan := make(chan struct{}) - - go func() { - for { - select { - case debugMsg := <-debugChan: - ruleChainDebugData.Add(c.ruleID, debugMsg.NodeId, *debugMsg) - case <-endDebugChan: - logrus.Debugf("规则链%s,执行结束", msg.Id) - return - } - } - }() - - node, found := c.nodes[c.firstRuleNodeID] - if !found { - return errors.New("first rule node not found") - } - - err := node.Handle(msg) - endDebugChan <- struct{}{} - return err -} diff --git a/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go b/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go index 557f357..9c059e9 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go @@ -9,8 +9,8 @@ import ( "pandax/iothub/client/mqttclient" "pandax/iothub/client/tcpclient" "pandax/iothub/client/udpclient" + devicerpc "pandax/pkg/device_rpc" "pandax/pkg/global" - "pandax/pkg/global/model" "pandax/pkg/rule_engine/message" "time" @@ -48,7 +48,7 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { return errors.New("元数据中为获取到设备ID") } // 创建请求格式 - var datas = model.RpcPayload{ + var datas = devicerpc.RpcPayload{ Params: msg.Msg.GetValue("params"), } if method, ok := msg.Msg.GetValue("method").(string); ok { diff --git a/pkg/shadow/device.go b/pkg/shadow/device.go index 09a16eb..2b8d59c 100644 --- a/pkg/shadow/device.go +++ b/pkg/shadow/device.go @@ -1,40 +1,30 @@ package shadow import ( + "pandax/apps/device/services" + "pandax/pkg/global" "time" ) // Device 设备结构 type Device struct { - Name string // 设备名称 - ProductName string // 设备模型名称 - AttributesPoints map[string]DevicePoint // 设备属性点位列表 key 作为属性 - TelemetryPoints map[string]DevicePoint // 设备遥测点位列表 key 作为属性 - online bool // 在线状态 - updatedAt time.Time // 更新时间 + Name string // 设备名称 + online bool // 在线状态 + updatedAt time.Time // 更新时间 } -// DevicePoint 设备点位结构 -type DevicePoint struct { - Name string // 点位名称 - Value interface{} // 点位值 - UpdatedAt time.Time -} - -func NewDevice(deviceName string, productName string, attributes, telemetry map[string]DevicePoint) Device { +func NewDevice(deviceName string) Device { return Device{ - Name: deviceName, - ProductName: productName, - AttributesPoints: attributes, - TelemetryPoints: telemetry, - online: true, + Name: deviceName, + online: true, + updatedAt: time.Now(), } } -func NewDevicePoint(pointName string, value interface{}) DevicePoint { - return DevicePoint{ - Name: pointName, - Value: value, - UpdatedAt: time.Now(), +func deviceHandler(deviceName string, online bool) { + if online { + services.DeviceModelDao.UpdateStatus(deviceName, global.ONLINE) + } else { + services.DeviceModelDao.UpdateStatus(deviceName, global.OFFLINE) } } diff --git a/pkg/shadow/shadow.go b/pkg/shadow/shadow.go index d6677e9..f980c7e 100644 --- a/pkg/shadow/shadow.go +++ b/pkg/shadow/shadow.go @@ -3,7 +3,6 @@ package shadow import ( "errors" "fmt" - "pandax/pkg/global" "sync" "time" ) @@ -18,15 +17,12 @@ type OnlineChangeCallback func(deviceName string, online bool) // 设备上/下 type DeviceShadow interface { AddDevice(device Device) (err error) GetDevice(deviceName string) (device Device, err error) - - SetDevicePoint(deviceName, pointType, pointName string, value interface{}) (err error) - GetDevicePoint(deviceName, pointType, pointName string) (value DevicePoint, err error) - GetDevicePoints(deviceName, pointType string) (points map[string]DevicePoint, err error) + HasDevice(deviceName string) bool + DeleteDevice(deviceName ...string) error GetDeviceUpdateAt(deviceName string) (time.Time, error) - GetDeviceStatus(deviceName string) (online bool, err error) - + RefreshDeviceStatus(deviceName string) SetOnline(deviceName string) (err error) SetOffline(deviceName string) (err error) @@ -35,14 +31,14 @@ type DeviceShadow interface { // StopStatusListener 停止设备状态监听 StopStatusListener() - // SetDeviceTTL 设备影子过期时间 + // SetDeviceTTL 设备影子过期时间, 过期设备将下线 SetDeviceTTL(ttl int) } type deviceShadow struct { m *sync.Map ticker *time.Ticker - handlerFunc OnlineChangeCallback + handlerFunc OnlineChangeCallback //上下线执行的回调函数 ttl int } @@ -50,21 +46,20 @@ var DeviceShadowInstance DeviceShadow func init() { shadow := &deviceShadow{ - m: &sync.Map{}, - ticker: time.NewTicker(time.Second), + m: &sync.Map{}, + ticker: time.NewTicker(time.Second), + ttl: 3600, // 默认1小时 + handlerFunc: deviceHandler, } go shadow.checkOnOff() DeviceShadowInstance = shadow } -func InitDeviceShadow(deviceName, ProductId string) Device { +func InitDeviceShadow(deviceName string) Device { device, err := DeviceShadowInstance.GetDevice(deviceName) if err == UnknownDeviceErr { - attributes := make(map[string]DevicePoint) - telemetry := make(map[string]DevicePoint) - device = NewDevice(deviceName, ProductId, attributes, telemetry) + device = NewDevice(deviceName) DeviceShadowInstance.AddDevice(device) - //shadow.DeviceShadowInstance.SetDeviceTTL() } return device } @@ -90,63 +85,22 @@ func (d *deviceShadow) GetDevice(deviceName string) (device Device, err error) { return Device{}, UnknownDeviceErr } } -func (d *deviceShadow) SetDevicePoint(deviceName, pointType, pointName string, value interface{}) (err error) { - deviceAny, ok := d.m.Load(deviceName) - if !ok { - return UnknownDeviceErr - } - device := deviceAny.(Device) - // update point value - device.updatedAt = time.Now() - switch pointType { - case global.TslAttributesType: - device.AttributesPoints[pointName] = NewDevicePoint(pointName, value) - case global.TslTelemetryType: - device.TelemetryPoints[pointName] = NewDevicePoint(pointName, value) - default: - return errors.New("设备属性类型错误") +func (d *deviceShadow) HasDevice(deviceName string) bool { + if _, ok := d.m.Load(deviceName); ok { + return ok } - - // update - d.m.Store(deviceName, device) - return + return false } - -func (d *deviceShadow) GetDevicePoint(deviceName, pointType, pointName string) (value DevicePoint, err error) { - if deviceAny, ok := d.m.Load(deviceName); ok { - device := deviceAny.(Device) - if !device.online || time.Since(device.updatedAt) > time.Duration(d.ttl)*time.Second { - return - } - switch pointType { - case global.TslAttributesType: - return device.AttributesPoints[pointName], nil - case global.TslTelemetryType: - return device.TelemetryPoints[pointName], nil - default: - return value, errors.New("设备属性类型错误") - } - } else { - return value, UnknownDeviceErr +func (d *deviceShadow) DeleteDevice(deviceName ...string) error { + if len(deviceName) == 0 { + return nil } -} - -func (d *deviceShadow) GetDevicePoints(deviceName, pointType string) (points map[string]DevicePoint, err error) { - if deviceAny, ok := d.m.Load(deviceName); ok { - device := deviceAny.(Device) - switch pointType { - case global.TslAttributesType: - return device.AttributesPoints, nil - case global.TslTelemetryType: - return device.TelemetryPoints, nil - default: - return points, errors.New("设备属性类型错误") - } - } else { - return nil, UnknownDeviceErr + for _, v := range deviceName { + d.m.Delete(v) } + return nil } func (d *deviceShadow) GetDeviceUpdateAt(deviceName string) (time.Time, error) { @@ -181,6 +135,19 @@ func (d *deviceShadow) GetDeviceStatus(deviceName string) (online bool, err erro } } +// RefreshDeviceStatus 刷新设备状态 +func (d *deviceShadow) RefreshDeviceStatus(deviceName string) { + if deviceAny, ok := d.m.Load(deviceName); ok { + device := deviceAny.(Device) + if device.online { + return + } + device.online = true + device.updatedAt = time.Now() + d.m.Store(deviceName, device) + } +} + func (d *deviceShadow) SetOnline(deviceName string) (err error) { return d.changeOnOff(deviceName, true) } @@ -197,6 +164,7 @@ func (d *deviceShadow) StopStatusListener() { d.ticker.Stop() } +// 定时检测设备是否离线 func (d *deviceShadow) checkOnOff() { for range d.ticker.C { d.m.Range(func(key, value interface{}) bool { @@ -204,12 +172,6 @@ func (d *deviceShadow) checkOnOff() { if !ok { return true } - - // fix: when ttl == 0, device always offline - if d.ttl == 0 { - return true - } - if device.online && time.Since(device.updatedAt) > time.Duration(d.ttl)*time.Second { _ = d.SetOffline(device.Name) }