http接口代码优化

Signed-off-by: lixxxww <941403820@qq.com>
This commit is contained in:
lixxxww
2024-01-22 14:26:11 +00:00
committed by Gitee
parent b74f8369a0
commit ff44179fe1

View File

@@ -4,32 +4,47 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/emicklei/go-restful/v3"
"io" "io"
"net" "net"
"net/http" "net/http"
"sync"
"time"
"pandax/iothub/hook_message_work" "pandax/iothub/hook_message_work"
"pandax/iothub/netbase" "pandax/iothub/netbase"
"pandax/pkg/global" "pandax/pkg/global"
"pandax/pkg/global/model" "pandax/pkg/global/model"
"pandax/pkg/rule_engine/message" "pandax/pkg/rule_engine/message"
"sync"
"time" "github.com/emicklei/go-restful/v3"
) )
type HookHttpService struct { type HookHttpService struct {
HookService *hook_message_work.HookService HookService *hook_message_work.HookService
} }
var ( type ConnectionManager struct {
activeConnections sync.Map activeConnections sync.Map
}
type PathType string
const (
AEPTYPE PathType = "AEP"
ONENETTYPE PathType = "ONENET"
) )
const ( const (
AEPTYPE = "AEP" AuthError = "认证错误"
ONENETTYPE = "ONENET" UnknownPath = "路径未识别token或上报类型"
ParseParamError = "解析上报参数失败"
ParseTelemetryErr = "解析遥测失败"
ParseAttrErr = "解析属性失败"
UnknownPathType = "路径上报类型错误"
) )
var connectionManager ConnectionManager
func InitHttpHook(addr string, hs *hook_message_work.HookService) { func InitHttpHook(addr string, hs *hook_message_work.HookService) {
server := NewHttpServer(addr) server := NewHttpServer(addr)
service := &HookHttpService{ service := &HookHttpService{
@@ -41,15 +56,17 @@ func InitHttpHook(addr string, hs *hook_message_work.HookService) {
ws.Route(ws.POST("/{token}/{pathType}").Filter(basicAuthenticate).To(service.hook)) ws.Route(ws.POST("/{token}/{pathType}").Filter(basicAuthenticate).To(service.hook))
container.Add(ws) container.Add(ws)
connectionManager = ConnectionManager{
activeConnections: sync.Map{},
}
server.srv.ConnState = func(conn net.Conn, state http.ConnState) { server.srv.ConnState = func(conn net.Conn, state http.ConnState) {
// 断开连接 // 断开连接
switch state { switch state {
case http.StateHijacked, http.StateClosed: case http.StateHijacked, http.StateClosed:
etoken, _ := activeConnections.Load(conn.RemoteAddr().String()) etoken, _ := connectionManager.activeConnections.Load(conn.RemoteAddr().String())
if etoken != nil { if etoken != nil {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", conn.RemoteAddr().String(), conn.RemoteAddr().String(), etoken.(*model.DeviceAuth)) connectionManager.RemoveConnection(conn.RemoteAddr().String(), etoken.(*model.DeviceAuth), service.HookService)
activeConnections.Delete(conn.RemoteAddr().String())
service.HookService.Queue.Queue(data)
} }
} }
} }
@@ -66,7 +83,7 @@ func basicAuthenticate(req *restful.Request, resp *restful.Response, chain *rest
token := req.PathParameter("token") token := req.PathParameter("token")
auth := netbase.Auth(token) auth := netbase.Auth(token)
if !auth { if !auth {
resp.Write([]byte("认证错误")) resp.Write([]byte(AuthError))
return return
} }
chain.ProcessFilter(req, resp) chain.ProcessFilter(req, resp)
@@ -77,26 +94,24 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
pathType := req.PathParameter("pathType") pathType := req.PathParameter("pathType")
platformType := req.QueryParameter("platformType") platformType := req.QueryParameter("platformType")
if token == "" || pathType == "" { if token == "" || pathType == "" {
resp.Write([]byte("路径未识别token或上报类型")) resp.Write([]byte(UnknownPath))
return return
} }
var upData map[string]interface{} var upData map[string]interface{}
err := req.ReadEntity(&upData) err := req.ReadEntity(&upData)
if err != nil { if err != nil {
resp.Write([]byte("解析上报参数失败")) resp.Write([]byte(ParseParamError))
return return
} }
etoken := &model.DeviceAuth{} etoken := &model.DeviceAuth{}
etoken.GetDeviceToken(token) etoken.GetDeviceToken(token)
_, ok := activeConnections.Load(req.Request.RemoteAddr) _, ok := connectionManager.activeConnections.Load(req.Request.RemoteAddr)
// 是否需要添加设备上线通知 // 是否需要添加设备上线通知
if !ok { if !ok {
activeConnections.Store(req.Request.RemoteAddr, etoken) connectionManager.AddConnection(req.Request.RemoteAddr, etoken, hhs.HookService)
data := netbase.CreateConnectionInfo(message.ConnectMes, "http", req.Request.RemoteAddr, req.Request.RemoteAddr, etoken)
go hhs.HookService.Queue.Queue(data)
} }
var data *netbase.DeviceEventInfo var data *netbase.DeviceEventInfo
if platformType == AEPTYPE { if platformType == string(AEPTYPE) {
} }
marshal, _ := json.Marshal(upData) marshal, _ := json.Marshal(upData)
@@ -106,32 +121,44 @@ func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
DeviceId: etoken.DeviceId, DeviceId: etoken.DeviceId,
} }
switch pathType { switch pathType {
case Row: case "row":
ts := time.Now().Format("2006-01-02 15:04:05.000") ts := time.Now().Format("2006-01-02 15:04:05.000")
data.Type = message.RowMes data.Type = message.RowMes
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas) data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas)
case Telemetry: case "telemetry":
telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas) telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas)
if telemetryData == nil { if telemetryData == nil {
resp.Write([]byte("解析遥测失败")) resp.Write([]byte(ParseTelemetryErr))
return return
} }
bytes, _ := json.Marshal(telemetryData) bytes, _ := json.Marshal(telemetryData)
data.Type = message.TelemetryMes data.Type = message.TelemetryMes
data.Datas = string(bytes) data.Datas = string(bytes)
case Attributes: case "attributes":
attributesData := netbase.UpdateDeviceAttributesData(data.Datas) attributesData := netbase.UpdateDeviceAttributesData(data.Datas)
if attributesData == nil { if attributesData == nil {
resp.Write([]byte("解析属性失败")) resp.Write([]byte(ParseAttrErr))
return return
} }
bytes, _ := json.Marshal(attributesData) bytes, _ := json.Marshal(attributesData)
data.Datas = string(bytes) data.Datas = string(bytes)
data.Type = message.AttributesMes data.Type = message.AttributesMes
default: default:
resp.Write([]byte("路径上报类型错误")) resp.Write([]byte(UnknownPathType))
return return
} }
go hhs.HookService.Queue.Queue(data) go hhs.HookService.Queue.Queue(data)
io.WriteString(resp, "ok") io.WriteString(resp, "ok")
} }
func (cm *ConnectionManager) AddConnection(addr string, etoken *model.DeviceAuth, service *hook_message_work.HookService) {
cm.activeConnections.Store(addr, etoken)
data := netbase.CreateConnectionInfo(message.ConnectMes, "http", addr, addr, etoken)
go service.Queue.Queue(data)
}
func (cm *ConnectionManager) RemoveConnection(addr string, etoken *model.DeviceAuth, service *hook_message_work.HookService) {
data := netbase.CreateConnectionInfo(message.DisConnectMes, "http", addr, addr, etoken)
cm.activeConnections.Delete(addr)
go service.Queue.Queue(data)
}