|
@@ -1,125 +0,0 @@
|
|
|
-package lc
|
|
|
-
|
|
|
-import (
|
|
|
- "context"
|
|
|
- "fmt"
|
|
|
- "lc-smartX/util/mqtt"
|
|
|
- "sync"
|
|
|
- "time"
|
|
|
-
|
|
|
- "github.com/sirupsen/logrus"
|
|
|
-)
|
|
|
-
|
|
|
-type BaseMqttOnline interface {
|
|
|
- GetOnlineMsg() (string, string)
|
|
|
- GetWillMsg() (string, string)
|
|
|
-}
|
|
|
-
|
|
|
-type EmptyMqttOnline struct {
|
|
|
-}
|
|
|
-
|
|
|
-func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) {
|
|
|
- return "", ""
|
|
|
-}
|
|
|
-func (o *EmptyMqttOnline) GetWillMsg() (string, string) {
|
|
|
- return "", ""
|
|
|
-}
|
|
|
-
|
|
|
-type MqttClient struct {
|
|
|
- mqtt *mqtt.Client //
|
|
|
- mu sync.Mutex //保护mapTopics
|
|
|
- mapTopics map[string]mqtt.QOS //订阅的主题
|
|
|
- timeout uint //超时时间,毫秒为单位
|
|
|
- MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
|
|
|
-}
|
|
|
-
|
|
|
-func NewMqttClient(server, clientid, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MqttClient {
|
|
|
- o := MqttClient{
|
|
|
- mapTopics: make(map[string]mqtt.QOS),
|
|
|
- timeout: timeout,
|
|
|
- MqttOnline: mqttOnline,
|
|
|
- }
|
|
|
- client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
|
- Servers: []string{server},
|
|
|
- ClientID: clientid,
|
|
|
- Username: user,
|
|
|
- Password: password,
|
|
|
- AutoReconnect: true,
|
|
|
- }, &o)
|
|
|
- if err != nil {
|
|
|
- panic(fmt.Sprintf("MQTT错误:", err.Error()))
|
|
|
- return nil
|
|
|
- }
|
|
|
- o.mqtt = client
|
|
|
- err = client.Connect(o.Ctx())
|
|
|
- return &o
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) ConnectionLostHandler(err error) {
|
|
|
- logrus.Errorln("MqttClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) OnConnectHandler() {
|
|
|
- logrus.Infoln("MqttClient.OnConnectHandler:MQTT连接成功")
|
|
|
- //连接成功则订阅主题
|
|
|
- for k, v := range o.mapTopics {
|
|
|
- o.Subscribe(k, v)
|
|
|
- }
|
|
|
- topic, str := o.MqttOnline.GetOnlineMsg()
|
|
|
- if topic != "" {
|
|
|
- o.PublishString(topic, str, 0)
|
|
|
- }
|
|
|
- o.Publish("000000/cltled/LED20230408/down/switch", []byte("路口new-54"), mqtt.AtMostOnce)
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) GetWill() (topic string, payload string) {
|
|
|
- return o.MqttOnline.GetWillMsg()
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) Connect() error {
|
|
|
- if !o.mqtt.IsConnected() {
|
|
|
- return o.mqtt.Connect(o.Ctx())
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) IsConnected() bool {
|
|
|
- return o.mqtt.IsConnected()
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) Publish(topic string, payload []byte, qos mqtt.QOS) error {
|
|
|
- return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
|
|
|
-}
|
|
|
-func (o *MqttClient) PublishString(topic string, payload string, qos mqtt.QOS) error {
|
|
|
- return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
|
|
|
-}
|
|
|
-func (o *MqttClient) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
|
|
|
- return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) Subscribe(topic string, qos mqtt.QOS) error {
|
|
|
- o.mu.Lock()
|
|
|
- defer o.mu.Unlock()
|
|
|
- if _, ok := o.mapTopics[topic]; !ok {
|
|
|
- o.mapTopics[topic] = qos
|
|
|
- }
|
|
|
- return o.mqtt.Subscribe(o.Ctx(), topic, qos)
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) Unsubscribe(topic string) error {
|
|
|
- o.mu.Lock()
|
|
|
- defer o.mu.Unlock()
|
|
|
- if _, ok := o.mapTopics[topic]; ok {
|
|
|
- delete(o.mapTopics, topic)
|
|
|
- }
|
|
|
- return o.mqtt.Unsubscribe(o.Ctx(), topic)
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
|
|
|
- return o.mqtt.Handle(topic, handler)
|
|
|
-}
|
|
|
-
|
|
|
-func (o *MqttClient) Ctx() context.Context {
|
|
|
- ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
|
|
|
- return ctx
|
|
|
-}
|