【feat】完成http数据上报

This commit is contained in:
XM-GO
2023-09-26 18:05:47 +08:00
parent 4c7bacad97
commit 950e8d8814
13 changed files with 1269 additions and 78 deletions

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/PandaXGO/PandaKit/biz"
"log"
"pandax/apps/device/entity"
"pandax/apps/device/services"
ruleEntity "pandax/apps/rule/entity"
@@ -24,6 +25,7 @@ func (s *HookService) MessageWork() {
for {
select {
case msg := <-s.MessageCh:
log.Println("一条记录", msg)
s.handleOne(msg) // 处理消息
}
}

View File

@@ -4,16 +4,17 @@ import (
"pandax/iothub/hook_message_work"
"pandax/iothub/server/emqxserver"
"pandax/iothub/server/httpserver"
"pandax/iothub/server/tcpserver"
)
func InitIothub() {
service := hook_message_work.NewHookService()
// 初始化EMQX
emqxserver.InitEmqxHook("", service)
go emqxserver.InitEmqxHook("", service)
// 初始化HTTP
httpserver.InitHttpHook("", service)
go httpserver.InitHttpHook("", service)
//初始化TCP
go tcpserver.InitTcpHook("", service)
// 开启线程处理消息
go service.MessageWork()
}

View File

@@ -2,6 +2,7 @@ package netbase
import (
"encoding/json"
"log"
"pandax/apps/device/services"
"pandax/iothub/server/emqxserver/protobuf"
"pandax/pkg/global"
@@ -11,39 +12,36 @@ import (
"time"
)
func Auth(username, password string) bool {
func Auth(authToken string) bool {
// 根据token去查设备Id以及设备类型
if username == "pandax" && password == "pandax" {
if authToken == "pandax" {
return true
}
etoken := &tool.DeviceAuth{}
// redis 中有就查询,没有就添加
exists, err := global.RedisDb.Exists(global.RedisDb.Context(), username).Result()
exists, err := global.RedisDb.Exists(global.RedisDb.Context(), authToken).Result()
if exists == 1 {
err = global.RedisDb.Get(username, etoken)
err = global.RedisDb.Get(authToken, etoken)
} else {
device, err := services.DeviceModelDao.FindOneByToken(password)
device, err := services.DeviceModelDao.FindOneByToken(authToken)
log.Println(err)
if err != nil {
global.Log.Infof("设备 %s 不存在", username)
global.Log.Infof("设备token %s 不存在", authToken)
return false
}
etoken, err = services.GetDeviceToken(device)
if err != nil {
global.Log.Infof("设备%s添加缓存失败", username)
global.Log.Infof("设备TOKEN %s添加缓存失败", authToken)
return false
}
}
if err != nil {
global.Log.Infof("invalid username %s", username)
global.Log.Infof("invalid authToken %s", authToken)
return false
}
// 判断token是否过期了, 设备过期
if etoken.ExpiredAt < time.Now().Unix() {
global.Log.Infof("设备%s: Token失效", username)
return false
}
if etoken.Token != password {
global.Log.Infof("invalid password %s", password)
global.Log.Infof("设备authToken %s 失效", authToken)
return false
}
return true
@@ -107,17 +105,6 @@ func SplitLwm2mClientID(lwm2mClientID string, index int) string {
return idArray[index]
}
func GetPassword(Clientinfo *exhook.ClientInfo) string {
protocol := Clientinfo.GetProtocol()
var pw string
if protocol == "lwm2m" {
pw = SplitLwm2mClientID(Clientinfo.GetClientid(), 1)
} else {
pw = Clientinfo.GetPassword()
}
return pw
}
// encode data
func EncodeData(jsonData interface{}) ([]byte, error) {
byteData, err := json.Marshal(jsonData)

View File

@@ -85,10 +85,12 @@ func (s *HookGrpcService) OnClientConnack(ctx context.Context, in *exhook2.Clien
func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.ClientConnectedRequest) (*exhook2.EmptySuccess, error) {
global.Log.Info(fmt.Sprintf("Client %s Connected ", in.Clientinfo.GetNode()))
ts := time.Now().Format("2006-01-02 15:04:05.000")
username := netbase.GetUserName(in.Clientinfo)
token := netbase.GetUserName(in.Clientinfo)
etoken := &tool.DeviceAuth{}
etoken.GetDeviceToken(token)
ci := &tdengine.ConnectInfo{
ClientID: in.Clientinfo.Clientid,
DeviceId: username,
DeviceId: etoken.DeviceId,
PeerHost: in.Clientinfo.Peerhost,
Protocol: in.Clientinfo.Protocol,
SocketPort: strconv.Itoa(int(in.Clientinfo.Sockport)),
@@ -101,7 +103,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli
}
// 添加设备上线记录
data := &netbase.DeviceEventInfo{
DeviceId: username,
DeviceId: etoken.DeviceId,
Datas: string(v),
Type: message.ConnectMes,
}
@@ -112,12 +114,17 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli
func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.ClientDisconnectedRequest) (*exhook2.EmptySuccess, error) {
global.Log.Info(fmt.Sprintf("%s断开连接", in.Clientinfo.Username))
devicename := netbase.GetUserName(in.Clientinfo)
token := netbase.GetUserName(in.Clientinfo)
etoken := &tool.DeviceAuth{}
err := etoken.GetDeviceToken(token)
if err != nil {
return nil, err
}
ts := time.Now().Format("2006-01-02 15:04:05.000")
ci := &tdengine.ConnectInfo{
ClientID: in.Clientinfo.Clientid,
DeviceId: devicename,
DeviceId: etoken.DeviceId,
PeerHost: in.Clientinfo.Peerhost,
Protocol: in.Clientinfo.Protocol,
SocketPort: strconv.Itoa(int(in.Clientinfo.Sockport)),
@@ -131,7 +138,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.
// 添加设备下线记录
data := &netbase.DeviceEventInfo{
DeviceId: devicename,
DeviceId: etoken.DeviceId,
Datas: string(v),
Type: message.DisConnectMes,
}
@@ -140,18 +147,17 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.
}
func (s *HookGrpcService) OnClientAuthenticate(ctx context.Context, in *exhook2.ClientAuthenticateRequest) (*exhook2.ValuedResponse, error) {
global.Log.Info(fmt.Sprintf("账号%s,密码%s,开始认证", in.Clientinfo.Username, in.Clientinfo.Password))
global.Log.Info(fmt.Sprintf("账号%s,开始认证", in.Clientinfo.Username))
res := &exhook2.ValuedResponse{}
res.Type = exhook2.ValuedResponse_STOP_AND_RETURN
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false}
username := netbase.GetUserName(in.Clientinfo)
pw := netbase.GetPassword(in.Clientinfo)
if username == "" || pw == "" {
global.Log.Warn(fmt.Sprintf("invalid username %s or password %s", username, pw))
token := netbase.GetUserName(in.Clientinfo)
if token == "" {
global.Log.Warn(fmt.Sprintf("invalid username %s", token))
return res, nil
}
authRes := netbase.Auth(username, pw)
authRes := netbase.Auth(token)
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: authRes}
return res, nil
@@ -212,6 +218,8 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true}
return res, nil
}
etoken := &tool.DeviceAuth{}
etoken.GetDeviceToken(in.Message.Headers["username"])
// 获取topic类型
ts := time.Now().Format("2006-01-02 15:04:05.000")
eventType := IotHubTopic.GetMessageType(in.Message.Topic)
@@ -219,7 +227,7 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
data := &netbase.DeviceEventInfo{
Type: eventType,
Datas: datas,
DeviceId: in.Message.Headers["username"],
DeviceId: etoken.DeviceId,
}
// 如果是网关子设备单独处理
if eventType == message.GATEWAY {

View File

@@ -0,0 +1,8 @@
package httpserver
const (
Row = `row`
Telemetry = `telemetry`
Attributes = `attributes`
Rpc = `rpc`
)

View File

@@ -2,23 +2,35 @@ package httpserver
import (
"context"
"encoding/json"
"fmt"
"github.com/emicklei/go-restful/v3"
"io"
"log"
"net"
"net/http"
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
"pandax/pkg/global"
"strings"
"pandax/pkg/rule_engine/message"
"pandax/pkg/tdengine"
"pandax/pkg/tool"
"sync"
"time"
)
type HookHttpService struct {
HookService *hook_message_work.HookService
}
var (
activeConnections sync.Map
)
func InitHttpHook(addr string, hs *hook_message_work.HookService) {
server := NewHttpServer(addr)
service := NewHookHttpService(hs)
service := &HookHttpService{
HookService: hs,
}
container := server.Container
ws := new(restful.WebService)
ws.Path("/api/v1").Produces(restful.MIME_JSON)
@@ -26,41 +38,123 @@ func InitHttpHook(addr string, hs *hook_message_work.HookService) {
container.Add(ws)
server.srv.ConnState = func(conn net.Conn, state http.ConnState) {
// 断开连接
switch state {
case http.StateNew:
log.Println("New connection", conn.RemoteAddr())
case http.StateActive:
log.Println("Connection active", conn.RemoteAddr())
case http.StateIdle:
log.Println("Connection idle", conn.RemoteAddr())
case http.StateHijacked, http.StateClosed:
log.Println("Connection closed", conn.RemoteAddr())
ts := time.Now().Format("2006-01-02 15:04:05.000")
deviceId, _ := activeConnections.Load(conn.RemoteAddr())
ci := &tdengine.ConnectInfo{
ClientID: conn.RemoteAddr().String(),
DeviceId: deviceId.(string),
PeerHost: conn.RemoteAddr().String(),
Protocol: "http",
Type: message.ConnectMes,
Ts: ts,
}
v, err := netbase.EncodeData(*ci)
if err != nil {
return
}
// 添加设备上线记录
data := &netbase.DeviceEventInfo{
DeviceId: deviceId.(string),
Datas: string(v),
Type: message.ConnectMes,
}
service.HookService.MessageCh <- data
activeConnections.Delete(conn.RemoteAddr())
}
}
err := server.Start(context.TODO())
if err != nil {
global.Log.Error("IOTHUB HTTP服务启动错误", err)
} else {
global.Log.Infof("IOTHUB HOOK Start SUCCESS,Grpc Server listen: %s", addr)
global.Log.Infof("HTTP IOTHUB HOOK Start SUCCESS,Server listen: %s", addr)
}
}
func NewHookHttpService(hs *hook_message_work.HookService) *HookHttpService {
hhs := &HookHttpService{
HookService: hs,
}
return hhs
}
// 获取token进行认证
func basicAuthenticate(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
path := req.Request.URL.Path
log.Println(path)
split := strings.Split(path, "/")
log.Println(split)
token := req.PathParameter("token")
auth := netbase.Auth(token)
if !auth {
resp.Write([]byte("认证错误"))
return
}
chain.ProcessFilter(req, resp)
}
func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) {
io.WriteString(resp, "42")
token := req.PathParameter("token")
pathType := req.PathParameter("pathType")
if token == "" || pathType == "" {
resp.Write([]byte("路径未识别token或上报类型"))
return
}
var upData map[string]interface{}
err := req.ReadEntity(&upData)
if err != nil {
resp.Write([]byte("解析上报参数失败"))
return
}
etoken := &tool.DeviceAuth{}
etoken.GetDeviceToken(token)
ts := time.Now().Format("2006-01-02 15:04:05.000")
_, ok := activeConnections.Load(req.Request.RemoteAddr)
// 是否需要添加设备上线通知
if !ok {
activeConnections.Store(req.Request.RemoteAddr, etoken.DeviceId)
ci := &tdengine.ConnectInfo{
ClientID: req.Request.RemoteAddr,
DeviceId: etoken.DeviceId,
PeerHost: req.Request.RemoteAddr,
Protocol: "http",
Type: message.ConnectMes,
Ts: ts,
}
v, err := netbase.EncodeData(*ci)
if err != nil {
return
}
// 添加设备上线记录
data := &netbase.DeviceEventInfo{
DeviceId: etoken.DeviceId,
Datas: string(v),
Type: message.ConnectMes,
}
hhs.HookService.MessageCh <- data
}
marshal, _ := json.Marshal(upData)
data := &netbase.DeviceEventInfo{
Datas: string(marshal),
DeviceId: etoken.DeviceId,
}
switch pathType {
case Row:
data.Type = message.RowMes
data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas)
case Telemetry:
telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas)
if telemetryData == nil {
resp.Write([]byte("解析遥测失败"))
return
}
bytes, _ := json.Marshal(telemetryData)
data.Type = message.TelemetryMes
data.Datas = string(bytes)
case Attributes:
attributesData := netbase.UpdateDeviceAttributesData(data.Datas)
if attributesData == nil {
resp.Write([]byte("解析属性失败"))
return
}
bytes, _ := json.Marshal(attributesData)
data.Datas = string(bytes)
data.Type = message.AttributesMes
default:
resp.Write([]byte("路径上报类型错误"))
return
}
hhs.HookService.MessageCh <- data
io.WriteString(resp, "ok")
}

View File

@@ -40,19 +40,15 @@ func (s *HttpServer) Type() string {
}
func (s *HttpServer) Start(ctx context.Context) error {
go func() {
if global.Conf.Server.Tls.Enable {
global.Log.Infof("HTTPS Server listen: %s", s.Addr)
if err := s.srv.ListenAndServeTLS(global.Conf.Server.Tls.CertFile, global.Conf.Server.Tls.KeyFile); err != nil {
global.Log.Errorf("error http serve: %s", err)
}
} else {
global.Log.Infof("HTTP Server listen: %s", s.Addr)
if err := s.srv.ListenAndServe(); err != nil {
global.Log.Errorf("error http serve: %s", err)
}
if global.Conf.Server.Tls.Enable {
if err := s.srv.ListenAndServeTLS(global.Conf.Server.Tls.CertFile, global.Conf.Server.Tls.KeyFile); err != nil {
global.Log.Errorf("error http serve: %s", err)
}
}()
} else {
if err := s.srv.ListenAndServe(); err != nil {
global.Log.Errorf("error http serve: %s", err)
}
}
return nil
}

View File

@@ -0,0 +1,109 @@
package tcpserver
import (
"context"
"encoding/hex"
"github.com/emicklei/go-restful/v3"
"log"
"net"
"pandax/iothub/hook_message_work"
"pandax/iothub/netbase"
"pandax/pkg/global"
"pandax/pkg/rule_engine/message"
"strings"
"time"
)
type HookTcpService struct {
HookService *hook_message_work.HookService
keepAlive int64
conn *net.TCPConn
}
func InitTcpHook(addr string, hs *hook_message_work.HookService) {
hhs := &HookTcpService{
HookService: hs,
keepAlive: 20,
}
server := NewTcpServer(addr)
err := server.Start(context.TODO())
if err != nil {
global.Log.Error("IOTHUB HTTP服务启动错误", err)
return
} else {
global.Log.Infof("TCP IOTHUB HOOK Start SUCCESS, Server listen: %s", addr)
}
go func() {
for {
conn, err := server.listener.AcceptTCP()
if err != nil {
global.Log.Error("Error accepting connection:", err)
continue
}
conn.SetReadDeadline(time.Now().Add(20 * time.Second))
hhs.conn = conn
go hhs.hook()
}
}()
}
// 获取token进行认证
func basicAuthenticate(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
path := req.Request.URL.Path
log.Println(path)
split := strings.Split(path, "/")
log.Println(split)
chain.ProcessFilter(req, resp)
}
func (hhs *HookTcpService) hook() {
isAuth := false
for {
buf := make([]byte, 128)
n := 0
n, err := hhs.conn.Read(buf)
if err != nil {
// 断开连接 掉线
log.Println("断开连接")
_ = hhs.conn.Close()
isAuth = false
return
}
if !isAuth {
token := string(buf[:n])
log.Println(token)
isAuth = true
} else {
hexData := hex.EncodeToString(buf[:n])
log.Println(hexData)
}
}
}
func (hhs *HookTcpService) Send(message string) error {
return hhs.SendBytes([]byte(message))
}
func (hhs *HookTcpService) SendHex(msg string) error {
b, err := hex.DecodeString(msg)
if err != nil {
return err
}
return hhs.SendBytes(b)
}
func (hhs *HookTcpService) SendBytes(msg []byte) error {
_, err := hhs.conn.Write(msg)
if err != nil {
hhs.conn.Close()
data := &netbase.DeviceEventInfo{
DeviceId: "",
Datas: "",
Type: message.ConnectMes,
}
hhs.HookService.MessageCh <- data
}
return err
}

View File

@@ -0,0 +1,64 @@
package tcpserver
import (
"context"
"crypto/tls"
"fmt"
"net"
"pandax/pkg/global"
)
const DefaultPort = ":9003"
type TcpServer struct {
Addr string
listener *net.TCPListener
}
func NewTcpServer(addr string) *TcpServer {
if addr == "" {
addr = DefaultPort
}
return &TcpServer{
Addr: addr,
}
}
func (s *TcpServer) GetServe() *net.TCPListener {
return s.listener
}
func (s *TcpServer) Type() string {
return "TCP"
}
func (s *TcpServer) Start(ctx context.Context) error {
addr, _ := net.ResolveTCPAddr("tcp", s.Addr)
listener, err := net.ListenTCP("tcp", addr)
if err != nil {
global.Log.Errorf("error http serve: %s", err)
return err
}
s.listener = listener
return nil
}
func (s *TcpServer) Stop(ctx context.Context) error {
s.listener.Close()
return nil
}
func (s *TcpServer) TlsConfig() (*tls.Config, error) {
var certificates []tls.Certificate
cert, err := tls.LoadX509KeyPair(global.Conf.Server.Tls.CertFile, global.Conf.Server.Tls.KeyFile)
if err != nil {
return nil, fmt.Errorf("generate x509 key pair failed: %s ", err)
}
certificates = append(certificates, cert)
if len(certificates) == 0 {
return nil, fmt.Errorf("none valid certs and secret")
}
return &tls.Config{Certificates: certificates}, nil
}