Files
PandaX/iothub/server/httpserver/hook.go
2024-08-22 17:43:38 +08:00

165 lines
4.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package httpserver
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"sync"
"time"
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
"pandax/pkg/global"
"pandax/pkg/global/model"
"pandax/pkg/rule_engine/message"
"github.com/emicklei/go-restful/v3"
)
type HookHttpService struct {
HookService *hook_message_work.HookService
}
type ConnectionManager struct {
activeConnections sync.Map
}
type PathType string
const (
AEPTYPE PathType = "AEP"
ONENETTYPE PathType = "ONENET"
)
const (
AuthError = "认证错误"
UnknownPath = "路径未识别token或上报类型"
ParseParamError = "解析上报参数失败"
ParseTelemetryErr = "解析遥测失败"
ParseAttrErr = "解析属性失败"
UnknownPathType = "路径上报类型错误"
)
var connectionManager ConnectionManager
func InitHttpHook(addr string, hs *hook_message_work.HookService) {
server := NewHttpServer(addr)
service := &HookHttpService{
HookService: hs,
}
container := server.Container
ws := new(restful.WebService)
ws.Path("/api/v1").Produces(restful.MIME_JSON)
ws.Route(ws.POST("/{token}/{pathType}").Filter(basicAuthenticate).To(service.hook))
container.Add(ws)
connectionManager = ConnectionManager{
activeConnections: sync.Map{},
}
server.srv.ConnState = func(conn net.Conn, state http.ConnState) {
// 断开连接
switch state {
case http.StateHijacked, http.StateClosed:
etoken, _ := connectionManager.activeConnections.Load(conn.RemoteAddr().String())
if etoken != nil {
connectionManager.RemoveConnection(conn.RemoteAddr().String(), etoken.(*model.DeviceAuth), service.HookService)
}
}
}
err := server.Start(context.TODO())
if err != nil {
global.Log.Error("IOTHUB HTTP服务启动错误", err)
} else {
global.Log.Infof("HTTP IOTHUB HOOK Start SUCCESS,Server listen: %s", addr)
}
}
// 获取token进行认证
func basicAuthenticate(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
token := req.PathParameter("token")
auth := netbase.Auth(token)
if !auth {
resp.Write([]byte(AuthError))
return
}
chain.ProcessFilter(req, resp)
}
func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
token := req.PathParameter("token")
pathType := req.PathParameter("pathType")
platformType := req.QueryParameter("platformType")
if token == "" || pathType == "" {
resp.Write([]byte(UnknownPath))
return
}
var upData map[string]interface{}
err := req.ReadEntity(&upData)
if err != nil {
resp.Write([]byte(ParseParamError))
return
}
etoken := &model.DeviceAuth{}
etoken.GetDeviceToken(token)
_, ok := connectionManager.activeConnections.Load(req.Request.RemoteAddr)
// 是否需要添加设备上线通知
if !ok {
connectionManager.AddConnection(req.Request.RemoteAddr, etoken, hhs.HookService)
}
var data *netbase.DeviceEventInfo
if platformType == string(AEPTYPE) {
}
marshal, _ := json.Marshal(upData)
data = &netbase.DeviceEventInfo{
Datas: string(marshal),
DeviceAuth: etoken,
DeviceId: etoken.DeviceId,
}
switch pathType {
case "row":
ts := time.Now().Format("2006-01-02 15:04:05.000")
data.Type = message.RowMes
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas)
case "telemetry":
telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas)
if telemetryData == nil {
resp.Write([]byte(ParseTelemetryErr))
return
}
bytes, _ := json.Marshal(telemetryData)
data.Type = message.TelemetryMes
data.Datas = string(bytes)
case "attributes":
attributesData := netbase.UpdateDeviceAttributesData(data.Datas)
if attributesData == nil {
resp.Write([]byte(ParseAttrErr))
return
}
bytes, _ := json.Marshal(attributesData)
data.Datas = string(bytes)
data.Type = message.AttributesMes
default:
resp.Write([]byte(UnknownPathType))
return
}
go hhs.HookService.Queue.Queue(data)
io.WriteString(resp, "ok")
}
func (cm *ConnectionManager) AddConnection(addr string, etoken *model.DeviceAuth, service *hook_message_work.HookService) {
cm.activeConnections.Store(addr, etoken)
data := netbase.CreateEventInfo(message.ConnectMes, "info", fmt.Sprintf("设备%s通过HTTP协议连接", etoken.Name), etoken)
go service.Queue.Queue(data)
}
func (cm *ConnectionManager) RemoveConnection(addr string, etoken *model.DeviceAuth, service *hook_message_work.HookService) {
data := netbase.CreateEventInfo(message.DisConnectMes, "info", fmt.Sprintf("设备%s断开连接", etoken.Name), etoken)
cm.activeConnections.Delete(addr)
go service.Queue.Queue(data)
}