mqttclient.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "lc/common/mqtt"
  8. )
  9. type BaseMqttOnline interface {
  10. GetOnlineMsg() (string, string)
  11. GetWillMsg() (string, string)
  12. }
  13. type EmptyMqttOnline struct {
  14. }
  15. func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) {
  16. return "", ""
  17. }
  18. func (o *EmptyMqttOnline) GetWillMsg() (string, string) {
  19. return "", ""
  20. }
  21. type MqttClient struct {
  22. mqtt *mqtt.Client //
  23. mu sync.Mutex //保护mapTopics
  24. mapTopics map[string]mqtt.QOS //订阅的主题
  25. timeout uint //超时时间,毫秒为单位
  26. MqttOnline BaseMqttOnline //是否发布上线消息&遗嘱消息
  27. }
  28. func NewMqttClient(server, clientid, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MqttClient {
  29. o := MqttClient{
  30. mapTopics: make(map[string]mqtt.QOS),
  31. timeout: timeout,
  32. MqttOnline: mqttOnline,
  33. }
  34. client, err := mqtt.NewClient(mqtt.ClientOptions{
  35. Servers: []string{server},
  36. ClientID: clientid,
  37. Username: user,
  38. Password: password,
  39. AutoReconnect: true,
  40. }, &o)
  41. if err != nil {
  42. panic(fmt.Sprintf("MQTT错误:", err.Error()))
  43. return nil
  44. }
  45. o.mqtt = client
  46. err = client.Connect(o.Ctx())
  47. return &o
  48. }
  49. func (o *MqttClient) ConnectionLostHandler(err error) {
  50. }
  51. func (o *MqttClient) OnConnectHandler() {
  52. //连接成功则订阅主题
  53. for k, v := range o.mapTopics {
  54. o.Subscribe(k, v)
  55. }
  56. topic, str := o.MqttOnline.GetOnlineMsg()
  57. if topic != "" {
  58. o.PublishString(topic, str, 0)
  59. }
  60. }
  61. func (o *MqttClient) GetWill() (topic string, payload string) {
  62. return o.MqttOnline.GetWillMsg()
  63. }
  64. func (o *MqttClient) Connect() error {
  65. if !o.mqtt.IsConnected() {
  66. return o.mqtt.Connect(o.Ctx())
  67. }
  68. return nil
  69. }
  70. func (o *MqttClient) IsConnected() bool {
  71. return o.mqtt.IsConnected()
  72. }
  73. func (o *MqttClient) Publish(topic string, payload []byte, qos mqtt.QOS) error {
  74. return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
  75. }
  76. func (o *MqttClient) PublishString(topic string, payload string, qos mqtt.QOS) error {
  77. return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
  78. }
  79. func (o *MqttClient) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
  80. return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
  81. }
  82. func (o *MqttClient) Subscribe(topic string, qos mqtt.QOS) error {
  83. o.mu.Lock()
  84. defer o.mu.Unlock()
  85. if _, ok := o.mapTopics[topic]; !ok {
  86. o.mapTopics[topic] = qos
  87. }
  88. return o.mqtt.Subscribe(o.Ctx(), topic, qos)
  89. }
  90. func (o *MqttClient) Unsubscribe(topic string) error {
  91. o.mu.Lock()
  92. defer o.mu.Unlock()
  93. if _, ok := o.mapTopics[topic]; ok {
  94. delete(o.mapTopics, topic)
  95. }
  96. return o.mqtt.Unsubscribe(o.Ctx(), topic)
  97. }
  98. func (o *MqttClient) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
  99. return o.mqtt.Handle(topic, handler)
  100. }
  101. func (o *MqttClient) Ctx() context.Context {
  102. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.timeout))
  103. return ctx
  104. }