mqttclient.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package lc
  2. import (
  3. "context"
  4. "fmt"
  5. "lc-smartX/util/mqtt"
  6. "sync"
  7. "time"
  8. "github.com/sirupsen/logrus"
  9. )
  10. type BaseMqttOnline interface {
  11. GetOnlineMsg() (string, string)
  12. GetWillMsg() (string, string)
  13. }
  14. type EmptyMqttOnline struct {
  15. }
  16. func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) {
  17. return "", ""
  18. }
  19. func (o *EmptyMqttOnline) GetWillMsg() (string, string) {
  20. return "", ""
  21. }
  22. type MqttClient struct {
  23. mqtt *mqtt.Client //
  24. mu sync.Mutex //保护mapTopics
  25. mapTopics map[string]mqtt.QOS //订阅的主题
  26. timeout uint //超时时间,毫秒为单位
  27. MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
  28. }
  29. func NewMqttClient(server, clientid, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MqttClient {
  30. o := MqttClient{
  31. mapTopics: make(map[string]mqtt.QOS),
  32. timeout: timeout,
  33. MqttOnline: mqttOnline,
  34. }
  35. client, err := mqtt.NewClient(mqtt.ClientOptions{
  36. Servers: []string{server},
  37. ClientID: clientid,
  38. Username: user,
  39. Password: password,
  40. AutoReconnect: true,
  41. }, &o)
  42. if err != nil {
  43. panic(fmt.Sprintf("MQTT错误:", err.Error()))
  44. return nil
  45. }
  46. o.mqtt = client
  47. err = client.Connect(o.Ctx())
  48. return &o
  49. }
  50. func (o *MqttClient) ConnectionLostHandler(err error) {
  51. logrus.Errorln("MqttClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
  52. }
  53. func (o *MqttClient) OnConnectHandler() {
  54. logrus.Infoln("MqttClient.OnConnectHandler:MQTT连接成功")
  55. //连接成功则订阅主题
  56. for k, v := range o.mapTopics {
  57. o.Subscribe(k, v)
  58. }
  59. topic, str := o.MqttOnline.GetOnlineMsg()
  60. if topic != "" {
  61. o.PublishString(topic, str, 0)
  62. }
  63. o.Publish("000000/cltled/LED20230408/down/switch", []byte("路口new-54"), mqtt.AtMostOnce)
  64. }
  65. func (o *MqttClient) GetWill() (topic string, payload string) {
  66. return o.MqttOnline.GetWillMsg()
  67. }
  68. func (o *MqttClient) Connect() error {
  69. if !o.mqtt.IsConnected() {
  70. return o.mqtt.Connect(o.Ctx())
  71. }
  72. return nil
  73. }
  74. func (o *MqttClient) IsConnected() bool {
  75. return o.mqtt.IsConnected()
  76. }
  77. func (o *MqttClient) Publish(topic string, payload []byte, qos mqtt.QOS) error {
  78. return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
  79. }
  80. func (o *MqttClient) PublishString(topic string, payload string, qos mqtt.QOS) error {
  81. return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
  82. }
  83. func (o *MqttClient) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
  84. return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
  85. }
  86. func (o *MqttClient) Subscribe(topic string, qos mqtt.QOS) error {
  87. o.mu.Lock()
  88. defer o.mu.Unlock()
  89. if _, ok := o.mapTopics[topic]; !ok {
  90. o.mapTopics[topic] = qos
  91. }
  92. return o.mqtt.Subscribe(o.Ctx(), topic, qos)
  93. }
  94. func (o *MqttClient) Unsubscribe(topic string) error {
  95. o.mu.Lock()
  96. defer o.mu.Unlock()
  97. if _, ok := o.mapTopics[topic]; ok {
  98. delete(o.mapTopics, topic)
  99. }
  100. return o.mqtt.Unsubscribe(o.Ctx(), topic)
  101. }
  102. func (o *MqttClient) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
  103. return o.mqtt.Handle(topic, handler)
  104. }
  105. func (o *MqttClient) Ctx() context.Context {
  106. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
  107. return ctx
  108. }