diff --git a/iothub/client/mqttclient/mqtt_api.go b/iothub/client/mqttclient/mqtt_api.go index 3c00d36..c763cdb 100644 --- a/iothub/client/mqttclient/mqtt_api.go +++ b/iothub/client/mqttclient/mqtt_api.go @@ -11,25 +11,37 @@ import ( "sync" ) -// key 设备id,value MQTT的clientID -var MqttClient sync.Map +type MqttClient struct { + sync.Map +} -const ClientsInfo string = "client" -const SubscribeTopicsInfo string = "subscribe" +type MqttConfig struct { + HttpBroker string + Username string + Password string + Qos int +} + +const ( + ClientsInfo = "client" + SubscribeTopicsInfo = "subscribe" +) + +var Session MqttClient -// GetEmqInfo 获取emqx信息,包括连接信息,订阅信息 func GetEmqInfo(infoType string) ([]map[string]interface{}, error) { var url string - if infoType == ClientsInfo { + switch infoType { + case ClientsInfo: url = fmt.Sprintf("%s/v5/clients?_page=1&_limit=100000", global.Conf.Mqtt.HttpBroker) - } else if infoType == SubscribeTopicsInfo { + case SubscribeTopicsInfo: url = fmt.Sprintf("%s/v5/subscriptions?_page=1&_limit=100000", global.Conf.Mqtt.HttpBroker) - } else { + default: return nil, errors.New("invalid infoType") } req, err := http.NewRequest(http.MethodGet, url, nil) - if nil != err { + if err != nil { return nil, err } req.SetBasicAuth(global.Conf.Mqtt.Username, global.Conf.Mqtt.Password) @@ -38,31 +50,29 @@ func GetEmqInfo(infoType string) ([]map[string]interface{}, error) { if err != nil { return nil, err } - defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } global.Log.Debug("receive resp, ", string(body)) - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { return nil, errors.New(resp.Status) } - var result interface{} - if err = json.Unmarshal(body, &result); nil != err { + var result map[string]interface{} + if err := json.Unmarshal(body, &result); err != nil { global.Log.Error("body Unmarshal error", err) return nil, err } - res, ok := result.(map[string]interface{}) + data, ok := result["data"].([]map[string]interface{}) if !ok { return nil, errors.New("result error") } - data := res["data"].([]map[string]interface{}) return data, nil } -// Publish 推送信息 func Publish(topic, clientId string, payload interface{}) error { if clientId == "" { return errors.New("未获取到MQTT连接") @@ -77,12 +87,12 @@ func Publish(topic, clientId string, payload interface{}) error { "clientid": clientId, } data, err := json.Marshal(pubData) - if nil != err { + if err != nil { global.Log.Error("error ", err) return err } req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(data)) - if nil != err { + if err != nil { return err } req.SetBasicAuth(global.Conf.Mqtt.Username, global.Conf.Mqtt.Password) @@ -94,13 +104,14 @@ func Publish(topic, clientId string, payload interface{}) error { return err } defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) if err != nil { global.Log.Error("error ReadAll", err) return err } global.Log.Debug("receive resp, ", string(body)) - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { global.Log.Error("bad status ", resp.StatusCode) return errors.New(resp.Status) } diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index 43187b8..6f1fdf4 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -87,7 +87,7 @@ func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.Cli return nil, err } //添加连接ID - mqttclient.MqttClient.Store(etoken.DeviceId, in.Clientinfo.Clientid) + mqttclient.Session.Store(etoken.DeviceId, in.Clientinfo.Clientid) data := netbase.CreateConnectionInfo(message.ConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) go s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil @@ -102,7 +102,7 @@ func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2. return nil, err } //删除连接ID - mqttclient.MqttClient.Delete(etoken.DeviceId) + mqttclient.Session.Delete(etoken.DeviceId) data := netbase.CreateConnectionInfo(message.DisConnectMes, "mqtt", in.Clientinfo.Clientid, in.Clientinfo.Peerhost, etoken) go s.HookService.Queue.Queue(data) return &exhook2.EmptySuccess{}, nil