mqtt代码优化

Signed-off-by: lixxxww <941403820@qq.com>
This commit is contained in:
lixxxww
2024-01-22 10:41:59 +00:00
committed by Gitee
parent 14c4252420
commit b74f8369a0
2 changed files with 32 additions and 21 deletions

View File

@@ -11,25 +11,37 @@ import (
"sync"
)
// key 设备idvalue MQTT的clientID
var MqttClient sync.Map
type MqttClient struct {
sync.Map
}
const ClientsInfo string = "client"
const SubscribeTopicsInfo string = "subscribe"
type MqttConfig struct {
HttpBroker string
Username string
Password string
Qos int
}
const (
ClientsInfo = "client"
SubscribeTopicsInfo = "subscribe"
)
var Session MqttClient
// GetEmqInfo 获取emqx信息包括连接信息订阅信息
func GetEmqInfo(infoType string) ([]map[string]interface{}, error) {
var url string
if infoType == ClientsInfo {
switch infoType {
case ClientsInfo:
url = fmt.Sprintf("%s/v5/clients?_page=1&_limit=100000", global.Conf.Mqtt.HttpBroker)
} else if infoType == SubscribeTopicsInfo {
case SubscribeTopicsInfo:
url = fmt.Sprintf("%s/v5/subscriptions?_page=1&_limit=100000", global.Conf.Mqtt.HttpBroker)
} else {
default:
return nil, errors.New("invalid infoType")
}
req, err := http.NewRequest(http.MethodGet, url, nil)
if nil != err {
if err != nil {
return nil, err
}
req.SetBasicAuth(global.Conf.Mqtt.Username, global.Conf.Mqtt.Password)
@@ -38,31 +50,29 @@ func GetEmqInfo(infoType string) ([]map[string]interface{}, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
global.Log.Debug("receive resp, ", string(body))
if resp.StatusCode != 200 {
if resp.StatusCode != http.StatusOK {
return nil, errors.New(resp.Status)
}
var result interface{}
if err = json.Unmarshal(body, &result); nil != err {
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
global.Log.Error("body Unmarshal error", err)
return nil, err
}
res, ok := result.(map[string]interface{})
data, ok := result["data"].([]map[string]interface{})
if !ok {
return nil, errors.New("result error")
}
data := res["data"].([]map[string]interface{})
return data, nil
}
// Publish 推送信息
func Publish(topic, clientId string, payload interface{}) error {
if clientId == "" {
return errors.New("未获取到MQTT连接")
@@ -77,12 +87,12 @@ func Publish(topic, clientId string, payload interface{}) error {
"clientid": clientId,
}
data, err := json.Marshal(pubData)
if nil != err {
if err != nil {
global.Log.Error("error ", err)
return err
}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(data))
if nil != err {
if err != nil {
return err
}
req.SetBasicAuth(global.Conf.Mqtt.Username, global.Conf.Mqtt.Password)
@@ -94,13 +104,14 @@ func Publish(topic, clientId string, payload interface{}) error {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
global.Log.Error("error ReadAll", err)
return err
}
global.Log.Debug("receive resp, ", string(body))
if resp.StatusCode != 200 {
if resp.StatusCode != http.StatusOK {
global.Log.Error("bad status ", resp.StatusCode)
return errors.New(resp.Status)
}

View File

@@ -87,7 +87,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli
return nil, err
}
//添加连接ID
mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid)
mqttclient.Session.Store(etoken.DeviceId, in.Clientinfo.Clientid)
data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
go s.HookService.Queue.Queue(data)
return &exhook2.EmptySuccess{}, nil
@@ -102,7 +102,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.
return nil, err
}
//删除连接ID
mqttclient.MqttClient.Delete(etoken.DeviceId)
mqttclient.Session.Delete(etoken.DeviceId)
data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken)
go s.HookService.Queue.Queue(data)
return &exhook2.EmptySuccess{}, nil