mqtt.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package admin
  2. import (
  3. "errors"
  4. "fmt"
  5. "regexp"
  6. "runtime"
  7. "server/utils/logger"
  8. "server/utils/mqtt"
  9. "server/utils/protocol"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. func InitMqtt() {
  15. MqttService = GetHandler()
  16. MqttService.SubscribeTopics()
  17. go MqttService.Handler()
  18. }
  19. var MqttService *MqttHandler
  20. var timeoutReg = regexp.MustCompile("Client .* has exceeded timeout")
  21. var connectReg = regexp.MustCompile(`New client connected from .* as .*\(`)
  22. var disconnectReg = regexp.MustCompile("Client mqttx_893e4b7d disconnected")
  23. type MqttHandler struct {
  24. queue *mqtt.MlQueue
  25. }
  26. var _handlerOnce sync.Once
  27. var _handlerSingle *MqttHandler
  28. func GetHandler() *MqttHandler {
  29. _handlerOnce.Do(func() {
  30. _handlerSingle = &MqttHandler{
  31. queue: mqtt.NewQueue(10000),
  32. }
  33. })
  34. return _handlerSingle
  35. }
  36. func (o *MqttHandler) SubscribeTopics() {
  37. mqtt.GetMQTTMgr().Subscribe("mini/#", mqtt.AtLeastOnce, o.HandlerData)
  38. mqtt.GetMQTTMgr().Subscribe("$SYS/broker/log/N", mqtt.AtLeastOnce, o.HandlerData)
  39. }
  40. func (o *MqttHandler) HandlerData(m mqtt.Message) {
  41. for {
  42. ok, cnt := o.queue.Put(&m)
  43. if ok {
  44. break
  45. } else {
  46. logger.Get().Error("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  47. runtime.Gosched()
  48. }
  49. }
  50. }
  51. func (o *MqttHandler) Handler() interface{} {
  52. defer func() {
  53. if err := recover(); err != nil {
  54. go GetHandler().Handler()
  55. //logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  56. }
  57. }()
  58. for {
  59. msg, ok, quantity := o.queue.Get()
  60. if !ok {
  61. time.Sleep(10 * time.Millisecond)
  62. continue
  63. } else if quantity > 1000 {
  64. logger.Get().Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  65. }
  66. m, ok := msg.(*mqtt.Message)
  67. if !ok {
  68. continue
  69. }
  70. _, topic, err := parseTopic(m.Topic())
  71. if err != nil {
  72. //logger.Logger.Errorf("parseTopic err")
  73. continue
  74. }
  75. switch topic {
  76. case protocol.TopicSwitchControlAck:
  77. case protocol.TopicSwitchControlDg:
  78. case protocol.TopicIlluminance:
  79. logger.Get().Debugf("MqttHandler Illuminance = %s", m.PayloadString())
  80. //TODO: 更新光照
  81. //protocol.SetCurrentIlluminance(m.PayloadString())
  82. }
  83. }
  84. }
  85. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  86. return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
  87. }
  88. func (o *MqttHandler) GetTopic(deviceSn, protocol string) string {
  89. return fmt.Sprintf("mini/%s/%s", deviceSn, protocol)
  90. }
  91. // parseTopic 获取设备SN, topic
  92. // "mini/*****/switch_control/ack"
  93. func parseTopic(topic string) (string, string, error) {
  94. strList := strings.Split(topic, "/")
  95. if len(strList) < 4 {
  96. return "", "", errors.New("不支持的topic")
  97. }
  98. topic = strings.Join(strList[2:], "/")
  99. return strList[1], topic, nil
  100. }