mqttclient.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package mqtt
  2. import (
  3. "context"
  4. "fmt"
  5. "smartIntersection_edge/util/logger"
  6. "sync"
  7. "time"
  8. )
  9. type BaseMqttOnline interface {
  10. GetOnlineMsg() (string, string)
  11. GetWillMsg() (string, string)
  12. }
  13. type EmptyMqttOnline struct {
  14. }
  15. func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) {
  16. return "", ""
  17. }
  18. func (o *EmptyMqttOnline) GetWillMsg() (string, string) {
  19. return "", ""
  20. }
  21. type MClient struct {
  22. mqtt *Client
  23. mu sync.Mutex //保护mapTopics
  24. mapTopics map[string]QOS //订阅的主题
  25. timeout uint //超时时间,毫秒为单位
  26. MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
  27. }
  28. func NewMqttClient(server, clientId, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MClient {
  29. o := MClient{
  30. mapTopics: make(map[string]QOS),
  31. timeout: timeout,
  32. MqttOnline: mqttOnline,
  33. }
  34. client, err := NewClient(ClientOptions{
  35. Servers: []string{server},
  36. ClientID: clientId,
  37. Username: user,
  38. Password: password,
  39. AutoReconnect: true,
  40. }, &o)
  41. if err != nil {
  42. panic(fmt.Sprintf("MQTT错误: %s", err.Error()))
  43. return nil
  44. }
  45. o.mqtt = client
  46. err = client.Connect(o.Ctx())
  47. if err != nil {
  48. logger.Logger.Errorln("MClient.NewMqttClient: Connect err,原因:", err)
  49. return nil
  50. }
  51. return &o
  52. }
  53. func (o *MClient) ConnectionLostHandler(err error) {
  54. logger.Logger.Errorln("MClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
  55. }
  56. func (o *MClient) OnConnectHandler() {
  57. logger.Logger.Infoln("MClient.OnConnectHandler:MQTT连接成功")
  58. //连接成功则订阅主题
  59. for k, v := range o.mapTopics {
  60. err := o.Subscribe(k, v)
  61. if err != nil {
  62. return
  63. }
  64. }
  65. topic, str := o.MqttOnline.GetOnlineMsg()
  66. if topic != "" {
  67. err := o.PublishString(topic, str, 0)
  68. if err != nil {
  69. return
  70. }
  71. }
  72. }
  73. func (o *MClient) GetWill() (topic string, payload string) {
  74. return o.MqttOnline.GetWillMsg()
  75. }
  76. func (o *MClient) Connect() error {
  77. return o.mqtt.Connect(o.Ctx())
  78. }
  79. func (o *MClient) IsConnected() bool {
  80. return o.mqtt.IsConnected()
  81. }
  82. func (o *MClient) Publish(topic string, payload interface{}, qos QOS) error {
  83. return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
  84. }
  85. func (o *MClient) PublishString(topic string, payload string, qos QOS) error {
  86. return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
  87. }
  88. func (o *MClient) PublishJSON(topic string, payload interface{}, qos QOS) error {
  89. return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
  90. }
  91. func (o *MClient) Subscribe(topic string, qos QOS) error {
  92. o.mu.Lock()
  93. defer o.mu.Unlock()
  94. if _, ok := o.mapTopics[topic]; !ok {
  95. o.mapTopics[topic] = qos
  96. }
  97. return o.mqtt.Subscribe(o.Ctx(), topic, qos)
  98. }
  99. func (o *MClient) Unsubscribe(topic string) error {
  100. o.mu.Lock()
  101. defer o.mu.Unlock()
  102. if _, ok := o.mapTopics[topic]; ok {
  103. delete(o.mapTopics, topic)
  104. }
  105. return o.mqtt.Unsubscribe(o.Ctx(), topic)
  106. }
  107. func (o *MClient) Handle(topic string, handler MessageHandler) Route {
  108. return o.mqtt.Handle(topic, handler)
  109. }
  110. func (o *MClient) Ctx() context.Context {
  111. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
  112. return ctx
  113. }