mqttclient.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/sirupsen/logrus"
  8. "lc/common/mqtt"
  9. )
  // BaseMqttOnline supplies the presence messages used by MqttClient.
  // Both methods return a (topic, payload) pair.
  type BaseMqttOnline interface {
  	// GetOnlineMsg returns the topic and payload of the online message.
  	GetOnlineMsg() (topic, payload string)
  	// GetWillMsg returns the topic and payload of the will message.
  	GetWillMsg() (topic, payload string)
  }
  // EmptyMqttOnline is a stateless type whose methods always return empty
  // topic and payload strings.
  type EmptyMqttOnline struct{}

  // GetOnlineMsg always returns an empty topic and an empty payload.
  func (*EmptyMqttOnline) GetOnlineMsg() (topic, payload string) {
  	return "", ""
  }

  // GetWillMsg always returns an empty topic and an empty payload.
  func (*EmptyMqttOnline) GetWillMsg() (topic, payload string) {
  	return "", ""
  }
  22. type MqttClient struct {
  23. mqtt *mqtt.Client //
  24. mu sync.Mutex //保护mapTopics
  25. mapTopics map[string]mqtt.QOS //订阅的主题
  26. timeout uint //超时时间,毫秒为单位
  27. MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
  28. }
  29. func NewMqttClient(server, clientid, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MqttClient {
  30. o := MqttClient{
  31. mapTopics: make(map[string]mqtt.QOS),
  32. timeout: timeout,
  33. MqttOnline: mqttOnline,
  34. }
  35. client, err := mqtt.NewClient(mqtt.ClientOptions{
  36. Servers: []string{server},
  37. ClientID: clientid,
  38. Username: user,
  39. Password: password,
  40. AutoReconnect: true,
  41. }, &o)
  42. if err != nil {
  43. panic(fmt.Sprintln("MQTT错误:", err.Error()))
  44. return nil
  45. }
  46. o.mqtt = client
  47. err = client.Connect(o.Ctx())
  48. return &o
  49. }
  50. func (o *MqttClient) ConnectionLostHandler(err error) {
  51. logrus.Errorln("MqttClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
  52. }
  53. func (o *MqttClient) OnConnectHandler() {
  54. logrus.Infoln("MqttClient.OnConnectHandler:MQTT连接成功")
  55. //连接成功则订阅主题
  56. for k, v := range o.mapTopics {
  57. o.Subscribe(k, v)
  58. }
  59. topic, str := o.MqttOnline.GetOnlineMsg()
  60. if topic != "" {
  61. o.PublishString(topic, str, 0)
  62. }
  63. }
  64. func (o *MqttClient) GetWill() (topic string, payload string) {
  65. return o.MqttOnline.GetWillMsg()
  66. }
  67. func (o *MqttClient) Connect() error {
  68. if !o.mqtt.IsConnected() {
  69. return o.mqtt.Connect(o.Ctx())
  70. }
  71. return nil
  72. }
  73. func (o *MqttClient) IsConnected() bool {
  74. return o.mqtt.IsConnected()
  75. }
  76. func (o *MqttClient) Publish(topic string, payload []byte, qos mqtt.QOS) error {
  77. return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
  78. }
  79. func (o *MqttClient) PublishString(topic string, payload string, qos mqtt.QOS) error {
  80. return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
  81. }
  82. func (o *MqttClient) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
  83. return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
  84. }
  85. func (o *MqttClient) Subscribe(topic string, qos mqtt.QOS) error {
  86. o.mu.Lock()
  87. defer o.mu.Unlock()
  88. if _, ok := o.mapTopics[topic]; !ok {
  89. o.mapTopics[topic] = qos
  90. }
  91. return o.mqtt.Subscribe(o.Ctx(), topic, qos)
  92. }
  93. func (o *MqttClient) Unsubscribe(topic string) error {
  94. o.mu.Lock()
  95. defer o.mu.Unlock()
  96. if _, ok := o.mapTopics[topic]; ok {
  97. delete(o.mapTopics, topic)
  98. }
  99. return o.mqtt.Unsubscribe(o.Ctx(), topic)
  100. }
  101. func (o *MqttClient) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
  102. return o.mqtt.Handle(topic, handler)
  103. }
  104. func (o *MqttClient) Ctx() context.Context {
  105. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
  106. return ctx
  107. }