package lc import ( "context" "fmt" "lc-smartX/util/mqtt" "sync" "time" "github.com/sirupsen/logrus" ) type BaseMqttOnline interface { GetOnlineMsg() (string, string) GetWillMsg() (string, string) } type EmptyMqttOnline struct { } func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) { return "", "" } func (o *EmptyMqttOnline) GetWillMsg() (string, string) { return "", "" } type MqttClient struct { mqtt *mqtt.Client // mu sync.Mutex //保护mapTopics mapTopics map[string]mqtt.QOS //订阅的主题 timeout uint //超时时间,毫秒为单位 MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息 } func NewMqttClient(server, clientid, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MqttClient { o := MqttClient{ mapTopics: make(map[string]mqtt.QOS), timeout: timeout, MqttOnline: mqttOnline, } client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{server}, ClientID: clientid, Username: user, Password: password, AutoReconnect: true, }, &o) if err != nil { panic(fmt.Sprintf("MQTT错误:", err.Error())) return nil } o.mqtt = client err = client.Connect(o.Ctx()) return &o } func (o *MqttClient) ConnectionLostHandler(err error) { logrus.Errorln("MqttClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err) } func (o *MqttClient) OnConnectHandler() { logrus.Infoln("MqttClient.OnConnectHandler:MQTT连接成功") //连接成功则订阅主题 for k, v := range o.mapTopics { o.Subscribe(k, v) } topic, str := o.MqttOnline.GetOnlineMsg() if topic != "" { o.PublishString(topic, str, 0) } o.Publish("000000/cltled/LED20230408/down/switch", []byte("路口new-54"), mqtt.AtMostOnce) } func (o *MqttClient) GetWill() (topic string, payload string) { return o.MqttOnline.GetWillMsg() } func (o *MqttClient) Connect() error { if !o.mqtt.IsConnected() { return o.mqtt.Connect(o.Ctx()) } return nil } func (o *MqttClient) IsConnected() bool { return o.mqtt.IsConnected() } func (o *MqttClient) Publish(topic string, payload []byte, qos mqtt.QOS) error { return o.mqtt.Publish(o.Ctx(), topic, payload, qos) } func (o *MqttClient) PublishString(topic string, payload string, qos mqtt.QOS) error { return o.mqtt.PublishString(o.Ctx(), topic, payload, qos) } func (o *MqttClient) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error { return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos) } func (o *MqttClient) Subscribe(topic string, qos mqtt.QOS) error { o.mu.Lock() defer o.mu.Unlock() if _, ok := o.mapTopics[topic]; !ok { o.mapTopics[topic] = qos } return o.mqtt.Subscribe(o.Ctx(), topic, qos) } func (o *MqttClient) Unsubscribe(topic string) error { o.mu.Lock() defer o.mu.Unlock() if _, ok := o.mapTopics[topic]; ok { delete(o.mapTopics, topic) } return o.mqtt.Unsubscribe(o.Ctx(), topic) } func (o *MqttClient) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route { return o.mqtt.Handle(topic, handler) } func (o *MqttClient) Ctx() context.Context { ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout)) return ctx }