mqtthandler.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package controllers
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/astaxie/beego"
  7. "lc/common/mqtt"
  8. )
  9. var _mqtthandleronce sync.Once
  10. var _mqtthandlersingle *MqttHandler
  11. func GetMqttHandler() *MqttHandler {
  12. _mqtthandleronce.Do(func() {
  13. _mqtthandlersingle = NewMqttHandler(beego.AppConfig.String("mqtt.server"), beego.AppConfig.String("mqtt.user"),
  14. beego.AppConfig.String("mqtt.password"), 3000)
  15. })
  16. return _mqtthandlersingle
  17. }
  18. type MqttHandler struct {
  19. mqtt *mqtt.Client
  20. mu sync.Mutex
  21. mapTopics map[string]mqtt.QOS
  22. Timeout uint //超时时间,毫秒为单位
  23. }
  24. func NewMqttHandler(server, user, password string, timeout uint) *MqttHandler {
  25. o := MqttHandler{
  26. mapTopics: make(map[string]mqtt.QOS),
  27. Timeout: timeout,
  28. }
  29. client, err := mqtt.NewClient(mqtt.ClientOptions{
  30. Servers: []string{server},
  31. ClientID: beego.AppConfig.String("mqtt.id"),
  32. Username: user,
  33. Password: password,
  34. AutoReconnect: true,
  35. }, &o)
  36. if client == nil || err != nil {
  37. beego.Error("未配置 MQTT Server")
  38. panic("未配置 MQTT Server")
  39. return nil
  40. }
  41. o.mqtt = client
  42. err = client.Connect(o.Ctx())
  43. return &o
  44. }
  45. func (o *MqttHandler) ConnectionLostHandler(err error) {
  46. beego.Error("MqttHandler.ConnectionLostHandler:MQTT连接已经断开,原因:", err.Error())
  47. }
  48. func (o *MqttHandler) OnConnectHandler() {
  49. beego.Debug("MqttHandler.OnConnectHandler:MQTT连接成功")
  50. //连接成功则订阅主题
  51. for k, v := range o.mapTopics {
  52. o.Subscribe(k, v)
  53. }
  54. }
  55. func (o *MqttHandler) GetWill() (string, string) {
  56. return "", ""
  57. }
  58. func (o *MqttHandler) Connect() error {
  59. if !o.mqtt.IsConnected() {
  60. return o.mqtt.Connect(o.Ctx())
  61. }
  62. return nil
  63. }
  64. func (o *MqttHandler) IsConnected() bool {
  65. return o.mqtt.IsConnected()
  66. }
  67. func (o *MqttHandler) Publish(topic string, payload []byte, qos mqtt.QOS) error {
  68. return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
  69. }
  70. func (o *MqttHandler) PublishString(topic string, payload string, qos mqtt.QOS) error {
  71. return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
  72. }
  73. func (o *MqttHandler) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
  74. return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
  75. }
  76. func (o *MqttHandler) Subscribe(topic string, qos mqtt.QOS) error {
  77. o.mu.Lock()
  78. defer o.mu.Unlock()
  79. if _, ok := o.mapTopics[topic]; !ok {
  80. o.mapTopics[topic] = qos
  81. }
  82. return o.mqtt.Subscribe(o.Ctx(), topic, qos)
  83. }
  84. func (o *MqttHandler) Unsubscribe(ctx context.Context, topic string) error {
  85. o.mu.Lock()
  86. defer o.mu.Unlock()
  87. if _, ok := o.mapTopics[topic]; ok {
  88. delete(o.mapTopics, topic)
  89. }
  90. return o.mqtt.Unsubscribe(o.Ctx(), topic)
  91. }
  92. func (o *MqttHandler) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
  93. return o.mqtt.Handle(topic, handler)
  94. }
  95. func (o *MqttHandler) Ctx() context.Context {
  96. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.Timeout))
  97. return ctx
  98. }