mqttclient.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "lc/common/mqtt"
  8. "github.com/sirupsen/logrus"
  9. )
  // BaseMqttOnline is the source of the presence messages used by MqttClient.
  // Each method returns a (topic, payload) pair.
  type BaseMqttOnline interface {
  	// GetOnlineMsg returns the topic and payload of the online message.
  	GetOnlineMsg() (topic, payload string)
  	// GetWillMsg returns the topic and payload of the will message.
  	GetWillMsg() (topic, payload string)
  }
  // EmptyMqttOnline has the BaseMqttOnline method set (on its pointer type) and
  // answers every request with empty strings.
  type EmptyMqttOnline struct{}

  // GetOnlineMsg returns an empty topic and an empty payload.
  func (*EmptyMqttOnline) GetOnlineMsg() (string, string) {
  	var topic, payload string
  	return topic, payload
  }

  // GetWillMsg returns an empty topic and an empty payload.
  func (*EmptyMqttOnline) GetWillMsg() (string, string) {
  	var topic, payload string
  	return topic, payload
  }
  22. type MqttClient struct {
  23. mqtt *mqtt.Client //
  24. mu sync.Mutex //保护mapTopics
  25. mapTopics map[string]mqtt.QOS //订阅的主题
  26. timeout uint //超时时间,毫秒为单位
  27. MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
  28. }
  29. func NewMqttClient(server, clientid, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MqttClient {
  30. o := MqttClient{
  31. mapTopics: make(map[string]mqtt.QOS),
  32. timeout: timeout,
  33. MqttOnline: mqttOnline,
  34. }
  35. client, err := mqtt.NewClient(mqtt.ClientOptions{
  36. Servers: []string{server},
  37. ClientID: clientid,
  38. Username: user,
  39. Password: password,
  40. AutoReconnect: true,
  41. }, &o)
  42. if err != nil {
  43. panic(fmt.Sprintf("MQTT错误:", err.Error()))
  44. return nil
  45. }
  46. o.mqtt = client
  47. err = client.Connect(o.Ctx())
  48. return &o
  49. }
  50. func (o *MqttClient) ConnectionLostHandler(err error) {
  51. logrus.Errorln("MqttClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
  52. }
  53. func (o *MqttClient) OnConnectHandler() {
  54. logrus.Infoln("MqttClient.OnConnectHandler:MQTT连接成功")
  55. //连接成功则订阅主题
  56. for k, v := range o.mapTopics {
  57. err := o.Subscribe(k, v)
  58. if err != nil {
  59. return
  60. }
  61. }
  62. topic, str := o.MqttOnline.GetOnlineMsg()
  63. if topic != "" {
  64. err := o.PublishString(topic, str, 0)
  65. if err != nil {
  66. return
  67. }
  68. }
  69. }
  70. func (o *MqttClient) GetWill() (topic string, payload string) {
  71. return o.MqttOnline.GetWillMsg()
  72. }
  73. func (o *MqttClient) Connect() error {
  74. return o.mqtt.Connect(o.Ctx())
  75. }
  76. func (o *MqttClient) IsConnected() bool {
  77. return o.mqtt.IsConnected()
  78. }
  79. func (o *MqttClient) Publish(topic string, payload []byte, qos mqtt.QOS) error {
  80. return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
  81. }
  82. func (o *MqttClient) PublishString(topic string, payload string, qos mqtt.QOS) error {
  83. return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
  84. }
  85. func (o *MqttClient) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
  86. return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
  87. }
  88. func (o *MqttClient) Subscribe(topic string, qos mqtt.QOS) error {
  89. o.mu.Lock()
  90. defer o.mu.Unlock()
  91. if _, ok := o.mapTopics[topic]; !ok {
  92. o.mapTopics[topic] = qos
  93. }
  94. return o.mqtt.Subscribe(o.Ctx(), topic, qos)
  95. }
  96. func (o *MqttClient) Unsubscribe(topic string) error {
  97. o.mu.Lock()
  98. defer o.mu.Unlock()
  99. if _, ok := o.mapTopics[topic]; ok {
  100. delete(o.mapTopics, topic)
  101. }
  102. return o.mqtt.Unsubscribe(o.Ctx(), topic)
  103. }
  104. func (o *MqttClient) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
  105. return o.mqtt.Handle(topic, handler)
  106. }
  107. func (o *MqttClient) Ctx() context.Context {
  108. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
  109. return ctx
  110. }