// mqttclient.go (3.0 KB)
  1. package mqtt
  2. import (
  3. "context"
  4. "fmt"
  5. "server/utils/logger"
  6. "sync"
  7. "time"
  8. )
  // BaseMqttOnline supplies the presence messages used by MClient.
  // Both methods return a (topic, payload) pair.
  type BaseMqttOnline interface {
  	// GetOnlineMsg returns the topic and payload that MClient.OnConnectHandler
  	// publishes; an empty topic makes the handler skip the publish.
  	GetOnlineMsg() (topic, payload string)
  	// GetWillMsg returns the topic and payload that MClient.GetWill hands back
  	// (presumably the MQTT last-will message; confirm against the Client type).
  	GetWillMsg() (topic, payload string)
  }
  // EmptyMqttOnline is a BaseMqttOnline implementation that announces nothing:
  // both of its methods return an empty topic and an empty payload.
  type EmptyMqttOnline struct{}

  // GetOnlineMsg returns an empty topic and an empty payload.
  func (o *EmptyMqttOnline) GetOnlineMsg() (topic, payload string) {
  	// Named results start out as "", which is exactly what is reported.
  	return topic, payload
  }

  // GetWillMsg returns an empty topic and an empty payload.
  func (o *EmptyMqttOnline) GetWillMsg() (topic, payload string) {
  	// Named results start out as "", which is exactly what is reported.
  	return topic, payload
  }
  21. type MClient struct {
  22. mqtt *Client
  23. mu sync.Mutex //保护mapTopics
  24. mapTopics map[string]QOS //订阅的主题
  25. timeout uint //超时时间,毫秒为单位
  26. MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
  27. }
  28. func NewMqttClient(server, clientId, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MClient {
  29. o := MClient{
  30. mapTopics: make(map[string]QOS),
  31. timeout: timeout,
  32. MqttOnline: mqttOnline,
  33. }
  34. client, err := NewClient(ClientOptions{
  35. Servers: []string{server},
  36. ClientID: clientId,
  37. Username: user,
  38. Password: password,
  39. AutoReconnect: true,
  40. }, &o)
  41. if err != nil {
  42. panic(fmt.Sprintf("MQTT错误: %s", err.Error()))
  43. return nil
  44. }
  45. o.mqtt = client
  46. err = client.Connect(o.Ctx())
  47. return &o
  48. }
  49. func (o *MClient) ConnectionLostHandler(err error) {
  50. logger.Get().Errorf("MClient.ConnectionLostHandler:MQTT连接已经断开,原因:%s", err)
  51. }
  52. func (o *MClient) OnConnectHandler() {
  53. logger.Get().Infoln("MClient.OnConnectHandler:MQTT连接成功")
  54. //连接成功则订阅主题
  55. for k, v := range o.mapTopics {
  56. err := o.Subscribe(k, v)
  57. if err != nil {
  58. return
  59. }
  60. }
  61. topic, str := o.MqttOnline.GetOnlineMsg()
  62. if topic != "" {
  63. err := o.PublishString(topic, str, 0)
  64. if err != nil {
  65. return
  66. }
  67. }
  68. }
  69. func (o *MClient) GetWill() (topic string, payload string) {
  70. return o.MqttOnline.GetWillMsg()
  71. }
  72. func (o *MClient) Connect() error {
  73. return o.mqtt.Connect(o.Ctx())
  74. }
  75. func (o *MClient) IsConnected() bool {
  76. return o.mqtt.IsConnected()
  77. }
  78. func (o *MClient) Publish(topic string, payload interface{}, qos QOS) error {
  79. return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
  80. }
  81. func (o *MClient) PublishString(topic string, payload string, qos QOS) error {
  82. return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
  83. }
  84. func (o *MClient) PublishJSON(topic string, payload interface{}, qos QOS) error {
  85. return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
  86. }
  87. func (o *MClient) Subscribe(topic string, qos QOS) error {
  88. o.mu.Lock()
  89. defer o.mu.Unlock()
  90. if _, ok := o.mapTopics[topic]; !ok {
  91. o.mapTopics[topic] = qos
  92. }
  93. return o.mqtt.Subscribe(o.Ctx(), topic, qos)
  94. }
  95. func (o *MClient) Unsubscribe(topic string) error {
  96. o.mu.Lock()
  97. defer o.mu.Unlock()
  98. if _, ok := o.mapTopics[topic]; ok {
  99. delete(o.mapTopics, topic)
  100. }
  101. return o.mqtt.Unsubscribe(o.Ctx(), topic)
  102. }
  103. func (o *MClient) Handle(topic string, handler MessageHandler) Route {
  104. return o.mqtt.Handle(topic, handler)
  105. }
  106. func (o *MClient) Ctx() context.Context {
  107. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
  108. return ctx
  109. }