mqtthandlerHL.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package controllers
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/astaxie/beego"
  7. "lc/common/mqtt"
  8. )
  9. //海蓝灯控的mqtt
  10. var _mqtthandleroncehl sync.Once
  11. var _mqtthandlersinglehl *MqttHandler
  12. func GetMqttHandlerHL() *MqttHandler {
  13. _mqtthandleroncehl.Do(func() {
  14. _mqtthandlersinglehl = NewMqttHandler(beego.AppConfig.String("hlmqtt.server"), beego.AppConfig.String("hlmqtt.user"),
  15. beego.AppConfig.String("hlmqtt.password"), 3000)
  16. })
  17. return _mqtthandlersinglehl
  18. }
  19. type MqttHandlerhl struct {
  20. mqtt *mqtt.Client
  21. mu sync.Mutex
  22. mapTopics map[string]mqtt.QOS
  23. Timeout uint //超时时间,毫秒为单位
  24. }
  25. func NewMqttHandlerhl(server, user, password string, timeout uint) *MqttHandlerhl {
  26. o := MqttHandlerhl{
  27. mapTopics: make(map[string]mqtt.QOS),
  28. Timeout: timeout,
  29. }
  30. client, err := mqtt.NewClient(mqtt.ClientOptions{
  31. Servers: []string{server},
  32. ClientID: beego.AppConfig.String("mqtt.id"),
  33. Username: user,
  34. Password: password,
  35. AutoReconnect: true,
  36. }, &o)
  37. if client == nil || err != nil {
  38. beego.Error("未配置 MQTT Server")
  39. panic("未配置 MQTT Server")
  40. return nil
  41. }
  42. o.mqtt = client
  43. err = client.Connect(o.Ctx())
  44. return &o
  45. }
  46. func (o *MqttHandlerhl) ConnectionLostHandler(err error) {
  47. beego.Error("MqttHandlerhl.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
  48. }
  49. func (o *MqttHandlerhl) OnConnectHandler() {
  50. beego.Debug("MqttHandlerhl.OnConnectHandler:MQTT连接成功")
  51. //连接成功则订阅主题
  52. for k, v := range o.mapTopics {
  53. o.Subscribe(k, v)
  54. }
  55. }
  56. func (o *MqttHandlerhl) GetWill() (string, string) {
  57. return "", ""
  58. }
  59. func (o *MqttHandlerhl) Connect() error {
  60. if !o.mqtt.IsConnected() {
  61. return o.mqtt.Connect(o.Ctx())
  62. }
  63. return nil
  64. }
  65. func (o *MqttHandlerhl) IsConnected() bool {
  66. return o.mqtt.IsConnected()
  67. }
  68. func (o *MqttHandlerhl) Publish(topic string, payload []byte, qos mqtt.QOS) error {
  69. return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
  70. }
  71. func (o *MqttHandlerhl) PublishString(topic string, payload string, qos mqtt.QOS) error {
  72. return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
  73. }
  74. func (o *MqttHandlerhl) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
  75. return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
  76. }
  77. func (o *MqttHandlerhl) Subscribe(topic string, qos mqtt.QOS) error {
  78. o.mu.Lock()
  79. defer o.mu.Unlock()
  80. if _, ok := o.mapTopics[topic]; !ok {
  81. o.mapTopics[topic] = qos
  82. }
  83. return o.mqtt.Subscribe(o.Ctx(), topic, qos)
  84. }
  85. func (o *MqttHandlerhl) Unsubscribe(ctx context.Context, topic string) error {
  86. o.mu.Lock()
  87. defer o.mu.Unlock()
  88. if _, ok := o.mapTopics[topic]; ok {
  89. delete(o.mapTopics, topic)
  90. }
  91. return o.mqtt.Unsubscribe(o.Ctx(), topic)
  92. }
  93. func (o *MqttHandlerhl) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
  94. return o.mqtt.Handle(topic, handler)
  95. }
  96. func (o *MqttHandlerhl) Ctx() context.Context {
  97. ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.Timeout))
  98. return ctx
  99. }