diff --git a/apps/device/api/device.go b/apps/device/api/device.go index 7d35e8a..0ad1ba2 100644 --- a/apps/device/api/device.go +++ b/apps/device/api/device.go @@ -7,7 +7,6 @@ package api // ========================================================================== import ( "fmt" - "pandax/apps/device/util" "pandax/kit/biz" "pandax/kit/model" "pandax/kit/restfulx" @@ -131,6 +130,18 @@ func (p *DeviceApi) GetDeviceStatus(rc *restfulx.ReqCtx) { rc.ResData = res } +func (p *DeviceApi) GetDeviceEvents(rc *restfulx.ReqCtx) { + pageNum := restfulx.QueryInt(rc, "pageNum", 1) + pageSize := restfulx.QueryInt(rc, "pageSize", 10) + deviceId := restfulx.PathParam(rc, "id") + ty := restfulx.QueryParam(rc, "type") + offset := pageSize * (pageNum - 1) + sql := `select * from ? where deviceId = ? and type = ? DESC LIMIT ?,? ` + list, err := global.TdDb.GetAllEvents(sql, deviceId, ty, offset, pageSize) + biz.ErrIsNilAppendErr(err, "查询设备事件历史失败") + rc.ResData = list +} + // GetDeviceTelemetryHistory 获取Device属性的遥测历史 func (p *DeviceApi) GetDeviceTelemetryHistory(rc *restfulx.ReqCtx) { id := restfulx.PathParam(rc, "id") @@ -146,21 +157,6 @@ func (p *DeviceApi) GetDeviceTelemetryHistory(rc *restfulx.ReqCtx) { rc.ResData = rs } -// 下发设备属性 -func (p *DeviceApi) DownAttribute(rc *restfulx.ReqCtx) { - id := restfulx.PathParam(rc, "id") - key := restfulx.QueryParam(rc, "key") - value := restfulx.QueryParam(rc, "value") - biz.NotEmpty(value, "请设置属性值") - err := util.BuildRunDeviceRpc(id, "single", map[string]interface{}{ - "method": "setAttributes", - "params": map[string]interface{}{ - key: value, - }, - }) - biz.ErrIsNilAppendErr(err, "下发失败:") -} - // InsertDevice 添加Device func (p *DeviceApi) InsertDevice(rc *restfulx.ReqCtx) { var data entity.Device diff --git a/apps/device/api/device_cmd.go b/apps/device/api/device_cmd.go index 427d2f5..7fc6491 100644 --- a/apps/device/api/device_cmd.go +++ b/apps/device/api/device_cmd.go @@ -3,17 +3,15 @@ package api // ========================================================================== import ( "encoding/json" - "pandax/apps/device/util" "pandax/kit/biz" "pandax/kit/model" "pandax/kit/restfulx" - "pandax/kit/utils" - "pandax/pkg/global" + devicerpc "pandax/pkg/device_rpc" "strings" - "time" "pandax/apps/device/entity" "pandax/apps/device/services" + "pandax/apps/device/util" ) type DeviceCmdLogApi struct { @@ -50,22 +48,12 @@ func (p *DeviceCmdLogApi) InsertDeviceCmdLog(rc *restfulx.ReqCtx) { err := json.Unmarshal([]byte(data.CmdContent), &ms) biz.ErrIsNil(err, "指令格式不正确") biz.IsTrue(len(ms) > 0, "指令格式不正确") - data.Id = utils.GenerateID() - data.State = "2" - data.RequestTime = time.Now().Format("2006-01-02 15:04:05") go func() { - err := util.BuildRunDeviceRpc(data.DeviceId, data.Mode, map[string]interface{}{ - "method": data.CmdName, - "params": ms, - }) - if err != nil { - global.Log.Error("规则链执行失败", err) - data.State = "1" - } else { - data.State = "0" + rpc := devicerpc.RpcPayload{ + Method: data.CmdName, + Params: ms, } - data.ResponseTime = time.Now().Format("2006-01-02 15:04:05.000") - err = p.DeviceCmdLogApp.Insert(data) + err := util.BuildRunDeviceRpc(data.DeviceId, data.Mode, rpc) biz.ErrIsNil(err, "添加指令记录失败") }() } diff --git a/apps/device/router/device.go b/apps/device/router/device.go index 4f13f0d..8b5732a 100644 --- a/apps/device/router/device.go +++ b/apps/device/router/device.go @@ -80,6 +80,18 @@ func InitDeviceRouter(container *restful.Container) { Returns(200, "OK", []entity.DeviceStatusVo{}). Returns(404, "Not Found", nil)) + ws.Route(ws.GET("/{id}/event").To(func(request *restful.Request, response *restful.Response) { + restfulx.NewReqCtx(request, response).WithLog("获取设备事件历史").Handle(s.GetDeviceEvents) + }). + Doc("获取设备事件历史"). + Param(ws.QueryParameter("pageNum", "页数").Required(true).DataType("int")). + Param(ws.QueryParameter("pageSize", "每页条数").Required(true).DataType("int")). + Param(ws.PathParameter("id", "设备ID").Required(true).DataType("string")). + Param(ws.QueryParameter("type", "事件类型").Required(true).DataType("string")). + Metadata(restfulspec.KeyOpenAPITags, tags). // on the response + Returns(200, "OK", []map[string]any{}). + Returns(404, "Not Found", nil)) + ws.Route(ws.GET("/{id}/property/history").To(func(request *restful.Request, response *restful.Response) { restfulx.NewReqCtx(request, response).WithLog("获取设备属性的遥测历史").Handle(s.GetDeviceTelemetryHistory) }). @@ -93,15 +105,6 @@ func InitDeviceRouter(container *restful.Container) { Returns(200, "OK", []map[string]any{}). Returns(404, "Not Found", nil)) - ws.Route(ws.GET("/{id}/attribute/down").To(func(request *restful.Request, response *restful.Response) { - restfulx.NewReqCtx(request, response).WithLog("获取Device属性下发").Handle(s.DownAttribute) - }). - Doc("获取Device属性下发"). - Param(ws.PathParameter("id", "Id").DataType("string")). - Param(ws.QueryParameter("key", "属性KEY").Required(false).DataType("string")). - Param(ws.QueryParameter("value", "属性Value").Required(false).DataType("string")). - Metadata(restfulspec.KeyOpenAPITags, tags)) - ws.Route(ws.POST("").To(func(request *restful.Request, response *restful.Response) { restfulx.NewReqCtx(request, response).WithLog("添加Device信息").Handle(s.InsertDevice) }). diff --git a/apps/device/services/product_template.go b/apps/device/services/product_template.go index 06d3a2f..41bff2b 100644 --- a/apps/device/services/product_template.go +++ b/apps/device/services/product_template.go @@ -10,6 +10,7 @@ type ( ProductTemplateModel interface { Insert(data entity.ProductTemplate) (*entity.ProductTemplate, error) FindOne(id string) (*entity.ProductTemplate, error) + FindOneByKey(deviceId, key string) (*entity.ProductTemplate, error) FindListPage(page, pageSize int, data entity.ProductTemplate) (*[]entity.ProductTemplate, int64, error) FindListAttrs(data entity.ProductTemplate) (*[]entity.ProductTemplate, error) FindList(data entity.ProductTemplate) (*[]entity.ProductTemplate, error) @@ -38,6 +39,13 @@ func (m *templateModelImpl) FindOne(id string) (*entity.ProductTemplate, error) return resData, err } +func (m *templateModelImpl) FindOneByKey(deviceId, key string) (*entity.ProductTemplate, error) { + resData := new(entity.ProductTemplate) + db := global.Db.Table(m.table).Where("pid = ?", deviceId).Where("key = ?", key) + err := db.First(resData).Error + return resData, err +} + func (m *templateModelImpl) FindListPage(page, pageSize int, data entity.ProductTemplate) (*[]entity.ProductTemplate, int64, error) { list := make([]entity.ProductTemplate, 0) var total int64 = 0 diff --git a/apps/device/util/device_rpc.go b/apps/device/util/device_rpc.go index 255dce8..0f6bb15 100644 --- a/apps/device/util/device_rpc.go +++ b/apps/device/util/device_rpc.go @@ -7,13 +7,15 @@ import ( "pandax/apps/device/services" ruleEntity "pandax/apps/rule/entity" ruleService "pandax/apps/rule/services" + "pandax/pkg/cache" + devicerpc "pandax/pkg/device_rpc" "pandax/pkg/global" "pandax/pkg/rule_engine" "pandax/pkg/rule_engine/message" "pandax/pkg/tool" ) -func BuildRunDeviceRpc(deviceId, mode string, msgData map[string]interface{}) error { +func BuildRunDeviceRpc(deviceId, mode string, rp devicerpc.RpcPayload) error { device, err := services.DeviceModelDao.FindOne(deviceId) if err != nil { return err @@ -35,10 +37,21 @@ func BuildRunDeviceRpc(deviceId, mode string, msgData map[string]interface{}) er dataCode := ruleData.LfData.DataCode code, _ := json.Marshal(dataCode) //新建规则链实体 - instance, errs := rule_engine.NewRuleChainInstance(findOne.Id, code) - if err != nil { - return errs + instance := &rule_engine.RuleChainInstance{} + ruleInstance, bo := cache.GetProductRule(device.Product.Id) + if !bo { + instance, err = rule_engine.NewRuleChainInstance(findOne.Id, code) + if err != nil { + return err + } + } else { + if data, ok := ruleInstance.(*rule_engine.RuleChainInstance); ok { + instance = data + } else { + return errors.New("规则实体解析错误") + } } + metadataVals := map[string]interface{}{ "deviceId": device.Id, "mode": mode, @@ -49,10 +62,11 @@ func BuildRunDeviceRpc(deviceId, mode string, msgData map[string]interface{}) er "orgId": device.OrgId, "owner": device.Owner, } - msg := message.NewMessage(device.Owner, message.RpcRequestToDevice, msgData, metadataVals) + + msg := message.NewMessage(device.Owner, message.RpcRequestToDevice, rp.ToMap(), metadataVals) err = instance.StartRuleChain(context.Background(), msg) if err != nil { - global.Log.Error("规则链执行失败", errs) + global.Log.Error("规则链执行失败", err) } return err } diff --git a/iothub/README.md b/iothub/README.md index 9ee6422..e742b3d 100644 --- a/iothub/README.md +++ b/iothub/README.md @@ -91,7 +91,7 @@ devA 为设备标识 "devB": "offline" } ``` -## 命令下发,设备请求格式, +## 服务端命令下发,设备请求格式, ```json { "method": "restart", @@ -103,14 +103,28 @@ devA 为设备标识 } } ``` -属性下发 method: 'setAttributes' - -## 命令响应的格式 +## 服务端属性下发 method: 'setAttributes' ```json { - "method": "2343", + "method": "setAttributes", "params": { "aa": "2" } } ``` + +## 设备端 请求的格式 +{ + "method": "getCurrentTime", + "params": { + "aa": "2" + } +} + +## 设备端 响应的格式 +{ + "method": "cmdResp", + "params": { + "aa": "2" + } +} \ No newline at end of file diff --git a/iothub/client/mqttclient/rpc.go b/iothub/client/mqttclient/rpc.go index a15c8bd..b392489 100644 --- a/iothub/client/mqttclient/rpc.go +++ b/iothub/client/mqttclient/rpc.go @@ -3,13 +3,11 @@ package mqttclient import ( "errors" "fmt" - "math/rand" - "time" ) const ( - RpcRespTopic = `v1/devices/me/rpc/response/%d` - RpcReqTopic = `v1/devices/me/rpc/request/%d` + RpcRespTopic = `v1/devices/me/rpc/response/%s` + RpcReqTopic = `v1/devices/me/rpc/request/%s` ) const ( @@ -18,7 +16,7 @@ const ( ) type RpcRequest struct { - RequestId int + RequestId string Mode string //单向、双向 单项只发送不等待响应 双向需要等到响应 Timeout int // 设置双向时,等待的超时时间 } @@ -41,9 +39,3 @@ func (rpc RpcRequest) Pub(deviceId, reqPayload string) error { } return Publish(topic, value.(string), reqPayload) } - -func (rpc *RpcRequest) GetRequestId() { - rand.Seed(time.Now().UnixNano()) - // 生成随机整数 - rpc.RequestId = rand.Intn(10000) + 1 // 生成0到99之间的随机整数 -} diff --git a/iothub/client/udpclient/udp.go b/iothub/client/udpclient/udp.go new file mode 100644 index 0000000..2ff6e2d --- /dev/null +++ b/iothub/client/udpclient/udp.go @@ -0,0 +1,47 @@ +package udpclient + +import ( + "encoding/hex" + "net" + "pandax/pkg/global" + "sync" +) + +type UdpClientT struct { + Conn *net.UDPConn + Addr *net.UDPAddr +} + +var UdpClient sync.Map + +func Send(deviceId, msg string) error { + if conn, ok := UdpClient.Load(deviceId); ok { + conn := conn.(*UdpClientT) + global.Log.Infof("设备%s, 发送指令%s", deviceId, msg) + _, err := conn.Conn.WriteToUDP([]byte(msg), conn.Addr) + if err != nil { + return err + } + } else { + global.Log.Infof("设备%s TCP连接不存在, 发送指令失败", deviceId) + } + return nil +} + +func SendHex(deviceId, msg string) error { + if conn, ok := UdpClient.Load(deviceId); ok { + conn := conn.(*UdpClientT) + global.Log.Infof("设备%s, 发送指令%s", deviceId, msg) + b, err := hex.DecodeString(msg) + if err != nil { + return err + } + _, err = conn.Conn.WriteToUDP(b, conn.Addr) + if err != nil { + return err + } + } else { + global.Log.Infof("设备%s TCP连接不存在, 发送指令失败", deviceId) + } + return nil +} diff --git a/iothub/hook_message_work/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go index 4667410..a85cc67 100644 --- a/iothub/hook_message_work/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -14,8 +14,12 @@ import ( "pandax/pkg/global/model" "pandax/pkg/rule_engine" "pandax/pkg/rule_engine/message" + "pandax/pkg/tdengine" "pandax/pkg/tool" "pandax/pkg/websocket" + "time" + + "github.com/kakuilan/kgo" ) // 消息处理模块 @@ -38,11 +42,8 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { <-s.Ch } }() - // 去除上传数据的非法空字符 - // msg.Datas = strings.ReplaceAll(msg.Datas, "\\u0000", "") - switch msg.Type { - case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestFromDevice: + case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestFromDevice, message.UpEventMes: msgVals := make(map[string]interface{}) err := json.Unmarshal([]byte(msg.Datas), &msgVals) if err != nil { @@ -65,6 +66,29 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { global.Log.Error("规则链执行失败", err) return } + // 保存事件信息 + if msg.Type == message.UpEventMes { + tsl, err := services.ProductTemplateModelDao.FindOneByKey(msg.DeviceId, msg.Identifier) + if err != nil { + return + } + ci := &tdengine.Events{ + DeviceId: msg.DeviceId, + Name: msg.Identifier, + Type: tsl.Type, + Content: msg.Datas, + Ts: time.Now().Format("2006-01-02 15:04:05.000"), + } + data, err := kgo.KConv.Struct2Map(ci, "") + if err != nil { + global.Log.Error("事件格式转化错误") + return + } + err = global.TdDb.InsertEvent(data) + if err != nil { + global.Log.Error("事件添加错误", err) + } + } case message.DisConnectMes, message.ConnectMes: // 更改设备在线状态 if msg.Type == message.ConnectMes { diff --git a/iothub/netbase/hook_base.go b/iothub/netbase/hook_base.go index 2560ba4..29a9928 100644 --- a/iothub/netbase/hook_base.go +++ b/iothub/netbase/hook_base.go @@ -2,7 +2,6 @@ package netbase import ( "encoding/json" - "fmt" "pandax/apps/device/entity" "pandax/apps/device/services" exhook "pandax/iothub/server/emqxserver/protobuf" @@ -182,6 +181,15 @@ func SplitLwm2mClientID(lwm2mClientID string, index int) string { } func GetRequestIdFromTopic(reg, topic string) (requestId string) { + re := regexp.MustCompile(reg) + res := re.FindStringSubmatch(topic) + if len(res) > 2 { + return res[2] + } + return "" +} + +func GetEventFromTopic(reg, topic string) (identifier string) { re := regexp.MustCompile(reg) res := re.FindStringSubmatch(topic) if len(res) > 1 { @@ -190,13 +198,14 @@ func GetRequestIdFromTopic(reg, topic string) (requestId string) { return "" } -func CreateConnectionInfo(msgType, protocol, clientID, peerHost string, deviceAuth *model.DeviceAuth) *DeviceEventInfo { +// eventType 事件类型 info alarm +func CreateEvent(msgType, eventType, content string, deviceAuth *model.DeviceAuth) *DeviceEventInfo { ts := time.Now().Format("2006-01-02 15:04:05.000") ci := &tdengine.Events{ DeviceId: deviceAuth.DeviceId, Name: msgType, - Type: "info", - Content: fmt.Sprintf("设备%s, %s 事件", deviceAuth.Name, msgType), + Type: eventType, + Content: content, Ts: ts, } v, err := json.Marshal(*ci) diff --git a/iothub/netbase/iothub_session.go b/iothub/netbase/iothub_session.go index 566a080..4305a9b 100644 --- a/iothub/netbase/iothub_session.go +++ b/iothub/netbase/iothub_session.go @@ -10,7 +10,8 @@ type DeviceEventInfo struct { DeviceAuth *model.DeviceAuth `json:"deviceAuth"` Datas string `json:"datas"` Type string `json:"type"` - RequestId string `json:"requestId"` + RequestId string `json:"requestId"` // rpc 请求ID + Identifier string `json:"identifier"` //事件标识 } func (j *DeviceEventInfo) Bytes() []byte { diff --git a/iothub/server/emqxserver/const.go b/iothub/server/emqxserver/const.go index 5c485c8..7011162 100644 --- a/iothub/server/emqxserver/const.go +++ b/iothub/server/emqxserver/const.go @@ -16,7 +16,9 @@ const ( TelemetryGatewayTopic = "v1/gateway/telemetry" ConnectGatewayTopic = "v1/gateway/connect" - RpcReq = `v1/devices/me/rpc/request/(.*?)$` + RpcReq = `v1/devices/me/rpc/(.*?)/(.*?)$` + + EventReq = `v1/devices/event/(.*?)$` ) var IotHubTopic = NewIotHubTopic() @@ -39,8 +41,11 @@ func (iht TopicMeg) GetMessageType(topic string) string { if meg, ok := iht[topic]; ok { return meg } - if strings.Contains(topic, "v1/devices/me/rpc/request") { + if strings.Contains(topic, "v1/devices/me/rpc/request") || strings.Contains(topic, "v1/devices/me/rpc/response") { return message.RpcRequestFromDevice } + if strings.Contains(topic, "v1/devices/event") { + return message.UpEventMes + } return "" } diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index 6f1fdf4..a48aba3 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -88,7 +88,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli } //添加连接ID mqttclient.Session.Store(etoken.DeviceId, in.Clientinfo.Clientid) - data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) + data := netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("设备%s通过MQTT协议连接", etoken.Name), etoken) go s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil } @@ -103,7 +103,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2. } //删除连接ID mqttclient.Session.Delete(etoken.DeviceId) - data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) + data := netbase.CreateEvent(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken) go s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil } @@ -243,10 +243,10 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess if in.Message.Topic == ConnectGatewayTopic { if val, ok := value.(string); ok { if val == "online" { - data = netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth) + data = netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("子设备%s通过网关连接", etoken.Name), auth) } if val == "offline" { - data = netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Message.From, in.Message.Headers["peerhost"], auth) + data = netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("子设备设备%s通过网关连接", etoken.Name), auth) } // 子设备发送到队列里 go s.HookService.Queue.Queue(data) @@ -281,6 +281,10 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess // 获取请求id id := netbase.GetRequestIdFromTopic(RpcReq, in.Message.Topic) data.RequestId = id + case message.UpEventMes: + data.Type = message.UpEventMes + identifier := netbase.GetEventFromTopic(EventReq, in.Message.Topic) + data.Identifier = identifier } //将数据放到队列中 go s.HookService.Queue.Queue(data) diff --git a/iothub/server/httpserver/hook.go b/iothub/server/httpserver/hook.go index 1493cfe..6d56093 100644 --- a/iothub/server/httpserver/hook.go +++ b/iothub/server/httpserver/hook.go @@ -153,12 +153,12 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) { func (cm *ConnectionManager) AddConnection(addr string, etoken *model.DeviceAuth, service *hook_message_work.HookService) { cm.activeConnections.Store(addr, etoken) - data := netbase.CreateConnectionInfo(message.ConnectMes, "http", addr, addr, etoken) + data := netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("设备%s通过HTTP协议连接", etoken.Name), etoken) go service.Queue.Queue(data) } func (cm *ConnectionManager) RemoveConnection(addr string, etoken *model.DeviceAuth, service *hook_message_work.HookService) { - data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", addr, addr, etoken) + data := netbase.CreateEvent(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken) cm.activeConnections.Delete(addr) go service.Queue.Queue(data) } diff --git a/iothub/server/tcpserver/hook.go b/iothub/server/tcpserver/hook.go index 4abced9..3d9eaea 100644 --- a/iothub/server/tcpserver/hook.go +++ b/iothub/server/tcpserver/hook.go @@ -47,7 +47,7 @@ func handleConnection(conn *net.TCPConn, hs *hook_message_work.HookService) { defer func() { _ = conn.Close() if isAuth { - data := netbase.CreateConnectionInfo(message.DisConnectMes, "tcp", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken) + data := netbase.CreateEvent(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken) go hs.Queue.Queue(data) } tcpclient.TcpClient.Delete(etoken.DeviceId) @@ -67,7 +67,7 @@ func handleConnection(conn *net.TCPConn, hs *hook_message_work.HookService) { auth := netbase.Auth(token) if auth { global.Log.Infof("TCP协议 设备%s,认证成功", etoken.DeviceId) - data := netbase.CreateConnectionInfo(message.ConnectMes, "tcp", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken) + data := netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("设备%s通过TCP协议连接", etoken.Name), etoken) go hs.Queue.Queue(data) isAuth = true tcpclient.TcpClient.Store(etoken.DeviceId, conn) diff --git a/iothub/server/udpserver/hook.go b/iothub/server/udpserver/hook.go index 992c482..74c4f0b 100644 --- a/iothub/server/udpserver/hook.go +++ b/iothub/server/udpserver/hook.go @@ -7,7 +7,7 @@ import ( "net" "time" - udpclient "pandax/iothub/client/updclient" + udpclient "pandax/iothub/client/udpclient" "pandax/iothub/hook_message_work" "pandax/iothub/netbase" "pandax/pkg/global" @@ -44,7 +44,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { _ = server.listener.Close() if isAuth, ok := authMap[client.AddrPort().String()]; ok && isAuth { - data := netbase.CreateConnectionInfo(message.DisConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) + data := netbase.CreateEvent(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken) go hhs.HookService.Queue.Queue(data) } udpclient.UdpClient.Delete(etoken.DeviceId) @@ -72,7 +72,7 @@ func InitUdpHook(addr string, hs *hook_message_work.HookService) { if auth { global.Log.Infof("UDP协议 设备%s,认证成功", etoken.DeviceId) - data := netbase.CreateConnectionInfo(message.ConnectMes, "udp", client.IP.String(), client.AddrPort().String(), etoken) + data := netbase.CreateEvent(message.ConnectMes, "info", fmt.Sprintf("设备%s通过UDP协议连接", etoken.Name), etoken) go hhs.HookService.Queue.Queue(data) authMap[client.AddrPort().String()] = true @@ -112,4 +112,4 @@ func (hhs *HookUdpService) SendBytes(addr *net.UDPAddr, msg []byte) error { hhs.HookService.MessageCh <- data } return err -} \ No newline at end of file +} diff --git a/pkg/cache/product_rule.go b/pkg/cache/product_rule.go index e8ec8dd..37cc39c 100644 --- a/pkg/cache/product_rule.go +++ b/pkg/cache/product_rule.go @@ -11,6 +11,10 @@ func ComputeIfAbsentProductRule(key string, fun func(any) (any, error)) (any, er return ProductCache.ComputeIfAbsent(key, fun) } +func GetProductRule(key string) (any, bool) { + return ProductCache.Get(key) +} + func DelProductRule(key string) { ProductCache.Delete(key) } diff --git a/pkg/device_rpc/rpc.go b/pkg/device_rpc/rpc.go new file mode 100644 index 0000000..31af469 --- /dev/null +++ b/pkg/device_rpc/rpc.go @@ -0,0 +1,35 @@ +package devicerpc + +import ( + "errors" + "fmt" + "time" + + "github.com/kakuilan/kgo" +) + +type RpcPayload struct { + Method string `json:"method"` + Params any `json:"params"` +} + +func (rp *RpcPayload) ToMap() map[string]any { + data, err := kgo.KConv.Struct2Map(rp, "") + if err != nil { + return nil + } + return data +} + +// GetRequestResult 处理设备端请求服务端方法 +func (rpc RpcPayload) GetRequestResult() (string, error) { + //TODO 此处处理设备的请求参数逻辑 + //自己定义请求逻辑 + if rpc.Params == "getCurrentTime" { + unix := time.Now().Unix() + msg := fmt.Sprintf("%d", unix) + return msg, nil + } + // 获取属性 ... + return "", errors.New("未获取到请求方法") +} diff --git a/pkg/rule_engine/instance.go b/pkg/rule_engine/instance.go index b728aa3..ab4708a 100644 --- a/pkg/rule_engine/instance.go +++ b/pkg/rule_engine/instance.go @@ -19,17 +19,19 @@ type RuleChainInstance struct { } func NewRuleChainInstance(ruleID string, data []byte) (*RuleChainInstance, error) { + instance := &RuleChainInstance{} manifest, err := manifest.New(data) if err != nil { logrus.WithError(err).Errorf("invalid manifest file") return nil, err } - withManifest, err := newInstanceWithManifest(manifest) + instance, err = newInstanceWithManifest(manifest) if err != nil { return nil, err } - withManifest.ruleID = ruleID - return withManifest, nil + instance.ruleID = ruleID + + return instance, nil } func newInstanceWithManifest(m *manifest.Manifest) (*RuleChainInstance, error) { diff --git a/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go b/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go index c03a2bc..d3cf8a4 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_from_device_node.go @@ -2,16 +2,22 @@ package nodes import ( "errors" + "pandax/apps/device/services" "pandax/iothub/client/mqttclient" "pandax/iothub/client/tcpclient" + "pandax/iothub/client/udpclient" + "pandax/kit/utils" + devicerpc "pandax/pkg/device_rpc" "pandax/pkg/global" - "pandax/pkg/global/model" "pandax/pkg/rule_engine/message" + "time" + + "github.com/kakuilan/kgo" ) type rpcRequestFromDeviceNode struct { bareNode - RequestId int `json:"requestId"` + RequestId string `json:"requestId"` } type rpcRequestFromDeviceFactory struct{} @@ -34,42 +40,55 @@ func (n *rpcRequestFromDeviceNode) Handle(msg *message.Message) error { if msg.Msg.GetValue("method") == nil || msg.Msg.GetValue("params") == nil { return errors.New("指令请求格式错误") } - var rpcp = model.RpcPayload{ + + var rpcp = devicerpc.RpcPayload{ Method: msg.Msg.GetValue("method").(string), Params: msg.Msg.GetValue("params"), } - result, err := rpcp.GetRequestResult() - if err != nil { - if failureLableNode != nil { - n.Debug(msg, message.DEBUGOUT, err.Error()) - return failureLableNode.Handle(msg) - } else { - return err + var err error + // 指令下发响应 + if rpcp.Method == "cmdResp" { + if requestId, ok := msg.Metadata.GetValue("requestId").(string); ok { + services.DeviceCmdLogModelDao.UpdateResp(requestId, kgo.KConv.ToStr(rpcp.Params), time.Now().Format("2006-01-02 15:04:05")) } - } - // 判断设备协议,根据不通协议,发送不通内容 - deviceProtocol := global.MQTTProtocol - if msg.Metadata.GetValue("deviceProtocol") != nil && msg.Metadata.GetValue("deviceProtocol").(string) != "" { - deviceProtocol = msg.Metadata.GetValue("deviceProtocol").(string) - } - deviceId := msg.Metadata.GetValue("deviceId").(string) - if deviceProtocol == global.MQTTProtocol { - rpc := &mqttclient.RpcRequest{} - RequestId := n.RequestId - if RequestId == 0 { - if msg.Metadata.GetValue("requestId") == nil { - rpc.GetRequestId() + } else { + result, err := rpcp.GetRequestResult() + if err != nil { + if failureLableNode != nil { + n.Debug(msg, message.DEBUGOUT, err.Error()) + return failureLableNode.Handle(msg) } else { - RequestId = int(msg.Metadata.GetValue("requestId").(float64)) + return err } - } else { - rpc.RequestId = RequestId } - err = rpc.Pub(deviceId, result) - } - if deviceProtocol == global.TCPProtocol { - err = tcpclient.Send(deviceId, result) + // 判断设备协议,根据不通协议,发送不通内容 + deviceProtocol := global.MQTTProtocol + if msg.Metadata.GetValue("deviceProtocol") != nil && msg.Metadata.GetValue("deviceProtocol").(string) != "" { + deviceProtocol = msg.Metadata.GetValue("deviceProtocol").(string) + } + deviceId := msg.Metadata.GetValue("deviceId").(string) + if deviceProtocol == global.MQTTProtocol || deviceProtocol == global.CoAPProtocol || deviceProtocol == global.LwM2MProtocol { + rpc := &mqttclient.RpcRequest{} + RequestId := n.RequestId + if RequestId == "" { + if msg.Metadata.GetValue("requestId") == nil { + rpc.RequestId = utils.GenerateID() + } else { + rpc.RequestId = msg.Metadata.GetValue("requestId").(string) + } + } else { + rpc.RequestId = RequestId + } + err = rpc.Pub(deviceId, result) + } + if deviceProtocol == global.TCPProtocol { + err = tcpclient.Send(deviceId, result) + } + if deviceProtocol == global.UDPProtocol { + err = udpclient.Send(deviceId, result) + } } + if err != nil { n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLableNode != nil { diff --git a/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go b/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go index 72e5882..3aec63b 100644 --- a/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go +++ b/pkg/rule_engine/nodes/action_rpc_request_to_device_node.go @@ -3,11 +3,18 @@ package nodes import ( "encoding/json" "errors" + "pandax/apps/device/entity" + "pandax/apps/device/services" "pandax/iothub/client/mqttclient" "pandax/iothub/client/tcpclient" + "pandax/iothub/client/udpclient" + "pandax/kit/utils" "pandax/pkg/global" "pandax/pkg/global/model" "pandax/pkg/rule_engine/message" + "time" + + "github.com/kakuilan/kgo" ) type rpcRequestToDeviceNode struct { @@ -34,11 +41,24 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { if msg.Msg.GetValue("method") == nil || msg.Msg.GetValue("params") == nil { return errors.New("指令下发格式错误") } + deviceId := msg.Metadata.GetValue("deviceId").(string) + // 创建请求格式 var datas = model.RpcPayload{ Method: msg.Msg.GetValue("method").(string), Params: msg.Msg.GetValue("params"), } payload, _ := json.Marshal(datas) + + // 构建指令记录 + var data entity.DeviceCmdLog + data.Id = utils.GenerateID() + data.DeviceId = deviceId + data.CmdName = datas.Method + data.CmdContent = kgo.KConv.ToStr(datas.Params) + data.Mode = msg.Metadata.GetValue("mode").(string) + data.State = "2" + data.RequestTime = time.Now().Format("2006-01-02 15:04:05") + mode := mqttclient.SingleMode if n.Timeout > 0 { mode = mqttclient.DoubleMode @@ -49,16 +69,20 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { deviceProtocol = msg.Metadata.GetValue("deviceProtocol").(string) } var err error - deviceId := msg.Metadata.GetValue("deviceId").(string) - if deviceProtocol == global.MQTTProtocol { - var rpc = &mqttclient.RpcRequest{Mode: mode, Timeout: n.Timeout} - rpc.GetRequestId() + if deviceProtocol == global.MQTTProtocol || deviceProtocol == global.CoAPProtocol || deviceProtocol == global.LwM2MProtocol { + var rpc = &mqttclient.RpcRequest{Mode: mode, Timeout: n.Timeout, RequestId: data.Id} err = rpc.RequestCmd(deviceId, string(payload)) } if deviceProtocol == global.TCPProtocol { err = tcpclient.Send(deviceId, string(payload)) } + if deviceProtocol == global.UDPProtocol { + err = udpclient.Send(deviceId, string(payload)) + } + if err != nil { + data.State = "1" + services.DeviceCmdLogModelDao.Insert(data) n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLableNode != nil { return failureLableNode.Handle(msg) @@ -66,7 +90,10 @@ func (n *rpcRequestToDeviceNode) Handle(msg *message.Message) error { return err } } + if successLableNode != nil { + data.State = "0" + services.DeviceCmdLogModelDao.Insert(data) n.Debug(msg, message.DEBUGOUT, "") return successLableNode.Handle(msg) } diff --git a/pkg/tdengine/tdengine_event.go b/pkg/tdengine/tdengine_event.go index c902574..92c1de0 100644 --- a/pkg/tdengine/tdengine_event.go +++ b/pkg/tdengine/tdengine_event.go @@ -4,7 +4,7 @@ import ( "fmt" ) -const connectTableName = "events" +const connectTableName = "device_events" type Events struct { Ts string `json:"ts"` diff --git a/pkg/tdengine/tdengine_log.go b/pkg/tdengine/tdengine_log.go index f0cc380..8119303 100644 --- a/pkg/tdengine/tdengine_log.go +++ b/pkg/tdengine/tdengine_log.go @@ -7,7 +7,7 @@ import ( "github.com/kakuilan/kgo" ) -const logTableName = "logs" +const logTableName = "device_logs" // 日志 TDengine type TdLog struct { @@ -39,7 +39,7 @@ func (s *TdEngine) InsertLog(log *TdLog) (err error) { func (s *TdEngine) ClearLog() (err error) { ts := time.Now().Add(-7 * 24 * time.Hour).Format("2006-01-02") - sql := fmt.Sprintf("DELETE FROM %s WHERE ts < ?", logTableName) + sql := fmt.Sprintf("DELETE FROM %s.%s WHERE ts < ?", s.dbName, logTableName) _, err = s.db.Exec(sql, ts) return diff --git a/pkg/websocket/socket_server.go b/pkg/websocket/socket_server.go index 62f3772..9eb8403 100644 --- a/pkg/websocket/socket_server.go +++ b/pkg/websocket/socket_server.go @@ -3,12 +3,14 @@ package websocket import ( "encoding/json" "fmt" - "github.com/gorilla/websocket" "net/http" "pandax/apps/device/entity" "pandax/apps/device/util" + devicerpc "pandax/pkg/device_rpc" "pandax/pkg/global" "strings" + + "github.com/gorilla/websocket" ) var upGrader = websocket.Upgrader{ @@ -65,10 +67,11 @@ func OnMessage(ws *Websocket, msg string) { return } //2. 根据设备下发属性更改 - err = util.BuildRunDeviceRpc(vtsa.TwinId, "single", map[string]interface{}{ - "method": "setAttributes", - "params": vtsa.Attrs, - }) + rpc := devicerpc.RpcPayload{ + Method: "setAttributes", + Params: vtsa.Attrs, + } + err = util.BuildRunDeviceRpc(vtsa.TwinId, "single", rpc) if err != nil { global.Log.Error("命令发送失败", err) sendMessages("02", "命令发送失败", screenId)