Files
PandaX/pkg/mqtt/mqtt.go
2023-08-22 15:17:14 +08:00

78 lines
1.9 KiB
Go

package mqtt
import (
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/sirupsen/logrus"
"time"
)
// DefaultDownStreamClientId is the MQTT client ID that GetMqttClinent sets on
// every client it creates.
// NOTE(review): the original author's note says iothub filters this ID out;
// confirm against the broker side.
const DefaultDownStreamClientId = "@panda.iothub.internal.clientId"
// IothubMqttClient wraps an MQTT client and carries the Sub, UnSub and Pub
// helper methods defined in this file.
type IothubMqttClient struct {
// Client is the underlying MQTT client; InitMqtt fills it with the client
// returned by GetMqttClinent.
Client mqtt.Client
}
// connectHandler is the OnConnect callback that GetMqttClinent installs on the
// client options. It ignores the client argument and only writes an
// info-level "Connected" log line.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	logrus.Info("Connected")
}
// connectLostHandler is the OnConnectionLost callback that GetMqttClinent
// installs on the client options. It ignores the client argument and logs the
// reported error at info level.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	logrus.Infof("Connect lost: %v", cause)
}
// InitMqtt builds an IothubMqttClient for the given broker.
//
// broker is prefixed with "tcp://" to form the server URL; username and
// password are forwarded unchanged to GetMqttClinent, whose returned client is
// stored in the Client field of the result.
func InitMqtt(broker, username, password string) *IothubMqttClient {
	wrapper := new(IothubMqttClient)
	wrapper.Client = GetMqttClinent(fmt.Sprintf("tcp://%s", broker), username, password)
	return wrapper
}
// GetMqttClinent builds an MQTT client for server, connects it, and returns
// the client.
//
// server is handed to AddBroker as-is (InitMqtt passes "tcp://<broker>").
// username and password are applied to the options only when non-empty. Every
// client is given DefaultDownStreamClientId as its client ID and has
// connectHandler and connectLostHandler installed as callbacks.
//
// The function waits on the connect token and panics with the token's error
// if the connection attempt fails.
func GetMqttClinent(server, username, password string) mqtt.Client {
	opts := mqtt.NewClientOptions().AddBroker(server)
	// Default downstream client ID; per the original author's note, iothub
	// filters out this ID.
	opts.SetClientID(DefaultDownStreamClientId)
	if username != "" {
		opts.SetUsername(username)
	}
	if password != "" {
		opts.SetPassword(password)
	}
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	return client
}
// Sub subscribes to topic at the given qos and registers handler with the
// underlying client for that subscription.
//
// It waits on the subscribe token. A failure is not returned to the caller; it
// is only logged, together with the topic.
func (imc *IothubMqttClient) Sub(topic string, qos byte, handler mqtt.MessageHandler) {
	if token := imc.Client.Subscribe(topic, qos, handler); token.Wait() && token.Error() != nil {
		// Pass the error as an argument, never as the format string itself:
		// a '%' in the error text would otherwise be treated as a verb and
		// garble the log line.
		logrus.Errorf("subscribe to topic %q failed: %v", topic, token.Error())
	}
}
// UnSub cancels the subscription to topic on the underlying client.
//
// It waits on the unsubscribe token. A failure is not returned to the caller;
// it is only logged, together with the topic.
func (imc *IothubMqttClient) UnSub(topic string) {
	if token := imc.Client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
		// Pass the error as an argument, never as the format string itself:
		// a '%' in the error text would otherwise be treated as a verb and
		// garble the log line.
		logrus.Errorf("unsubscribe from topic %q failed: %v", topic, token.Error())
	}
}
// Pub publishes payload to topic at the given qos, passing false as the third
// argument of Publish (the retained flag in the paho API).
//
// It waits up to two seconds on the publish token. If the wait times out it
// returns an error whose message is Chinese for "message push timed out";
// otherwise it returns the token's error, which is nil on success.
func (imc *IothubMqttClient) Pub(topic string, qos byte, payload string) error {
	pubToken := imc.Client.Publish(topic, qos, false, payload)
	if done := pubToken.WaitTimeout(2 * time.Second); !done {
		return errors.New("推送消息超时")
	}
	return pubToken.Error()
}