mqtt.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package admin
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "regexp"
  7. "runtime"
  8. "server/dao"
  9. "server/utils/logger"
  10. "server/utils/mqtt"
  11. "server/utils/protocol"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. func InitMqtt() {
  17. MqttService = GetHandler()
  18. MqttService.SubscribeTopics()
  19. go MqttService.Handler()
  20. }
  21. var MqttService *MqttHandler
  22. var timeoutReg = regexp.MustCompile("Client .* has exceeded timeout")
  23. var connectReg = regexp.MustCompile(`New client connected from .* as .*\(`)
  24. var disconnectReg = regexp.MustCompile("Client mqttx_893e4b7d disconnected")
  25. type MqttHandler struct {
  26. queue *mqtt.MlQueue
  27. }
  28. var _handlerOnce sync.Once
  29. var _handlerSingle *MqttHandler
  30. func GetHandler() *MqttHandler {
  31. _handlerOnce.Do(func() {
  32. _handlerSingle = &MqttHandler{
  33. queue: mqtt.NewQueue(10000),
  34. }
  35. })
  36. return _handlerSingle
  37. }
  38. func (o *MqttHandler) SubscribeTopics() {
  39. mqtt.GetMQTTMgr().Subscribe("smart_tunnel/#", mqtt.AtLeastOnce, o.HandlerData)
  40. }
  41. func (o *MqttHandler) HandlerData(m mqtt.Message) {
  42. for {
  43. ok, cnt := o.queue.Put(&m)
  44. if ok {
  45. break
  46. } else {
  47. logger.Get().Error("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  48. runtime.Gosched()
  49. }
  50. }
  51. }
  52. func (o *MqttHandler) Handler() interface{} {
  53. defer func() {
  54. if err := recover(); err != nil {
  55. go GetHandler().Handler()
  56. //logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  57. }
  58. }()
  59. for {
  60. msg, ok, quantity := o.queue.Get()
  61. if !ok {
  62. time.Sleep(10 * time.Millisecond)
  63. continue
  64. } else if quantity > 1000 {
  65. logger.Get().Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  66. }
  67. m, ok := msg.(*mqtt.Message)
  68. if !ok {
  69. continue
  70. }
  71. sn, operate, err := ParseTopic(m.Topic())
  72. if err != nil {
  73. //logger.Logger.Errorf("parseTopic err")
  74. continue
  75. }
  76. switch operate {
  77. case protocol.TopicGatherDataEnv:
  78. var data dao.EnvData
  79. if err := json.Unmarshal(m.Payload(), &data); err != nil {
  80. logger.Get().Errorf("Error unmarshalling JSON: %v", err)
  81. continue
  82. }
  83. data.TunnelSn = sn
  84. if data.Sn == "" {
  85. continue
  86. }
  87. err = data.CreateEnvData()
  88. if err != nil {
  89. logger.Get().Errorf("sn:%s device:%s 新增环境信息失败: %v", sn, data.Sn, err)
  90. continue
  91. }
  92. case protocol.TopicGatherDataOpt:
  93. var data dao.OpticalData
  94. if err := json.Unmarshal(m.Payload(), &data); err != nil {
  95. logger.Get().Errorf("Error unmarshalling JSON: %v", err)
  96. continue
  97. }
  98. data.TunnelSn = sn
  99. if data.Sn == "" {
  100. continue
  101. }
  102. err = data.CreateOpticalData()
  103. if err != nil {
  104. logger.Get().Errorf("sn:%s device:%s 新增光感信息失败: %v", sn, data.Sn, err)
  105. continue
  106. }
  107. }
  108. }
  109. }
  110. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  111. return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
  112. }
  113. func (o *MqttHandler) GetTopic(sn, protocol string) string {
  114. return fmt.Sprintf("smart_tunnel/%s/%s", sn, protocol)
  115. }
  116. // ParseTopic 获取设备SN, topic
  117. // "mini/*****/switch_control/ack"
  118. func ParseTopic(topic string) (sn string, operate string, err error) {
  119. strList := strings.Split(topic, "/")
  120. sn = strList[1]
  121. operate = strList[2]
  122. if len(strList) != 3 {
  123. return "", "", errors.New("不支持的topic")
  124. }
  125. return
  126. }