mqtt.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package devices
  2. import (
  3. "errors"
  4. "fmt"
  5. "regexp"
  6. "runtime"
  7. "runtime/debug"
  8. "server/dao"
  9. "server/global"
  10. "server/utils/mqtt"
  11. "server/utils/protocol"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. func InitMqtt() {
  18. MqttService = GetHandler()
  19. MqttService.SubscribeTopics()
  20. go MqttService.Handler()
  21. }
  22. var MqttService *MqttHandler
  23. var timeoutReg = regexp.MustCompile("Client .* has exceeded timeout")
  24. var connectReg = regexp.MustCompile(`New client connected from .* as .*\(`)
  25. var disconnectReg = regexp.MustCompile("Client mqttx_893e4b7d disconnected")
  26. type MqttHandler struct {
  27. queue *mqtt.MlQueue
  28. }
  29. var _handlerOnce sync.Once
  30. var _handlerSingle *MqttHandler
  31. func GetHandler() *MqttHandler {
  32. _handlerOnce.Do(func() {
  33. _handlerSingle = &MqttHandler{
  34. queue: mqtt.NewQueue(10000),
  35. }
  36. })
  37. return _handlerSingle
  38. }
  39. func (o *MqttHandler) SubscribeTopics() {
  40. mqtt.GetMQTTMgr().Subscribe("smart_intersectionV2.0/#", mqtt.AtLeastOnce, o.HandlerData)
  41. }
  42. func (o *MqttHandler) HandlerData(m mqtt.Message) {
  43. for {
  44. ok, cnt := o.queue.Put(&m)
  45. if ok {
  46. break
  47. } else {
  48. global.GVA_LOG.Error(fmt.Sprintf("HandlerData:查询队列失败,队列消息数量:%d", cnt))
  49. runtime.Gosched()
  50. }
  51. }
  52. }
  53. func (o *MqttHandler) Handler() interface{} {
  54. defer func() {
  55. if err := recover(); err != nil {
  56. go GetHandler().Handler()
  57. global.GVA_LOG.Error(fmt.Sprintf("MqttHandler.Handler:发生异常:%s", string(debug.Stack())))
  58. }
  59. }()
  60. for {
  61. msg, ok, quantity := o.queue.Get()
  62. if !ok {
  63. time.Sleep(10 * time.Millisecond)
  64. continue
  65. } else if quantity > 1000 {
  66. global.GVA_LOG.Error(fmt.Sprintf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity))
  67. }
  68. m, ok := msg.(*mqtt.Message)
  69. if !ok {
  70. continue
  71. }
  72. sn, topic, err := parseTopic(m.Topic())
  73. if err != nil {
  74. global.GVA_LOG.Error("parseTopic err")
  75. continue
  76. }
  77. switch topic {
  78. case TopicHighSpeed: //存储超速事件
  79. case TopicLowSpeed: //存储低俗时间
  80. case TopicChanStatus: //修改屏幕状态
  81. status, _ := strconv.Atoi(m.PayloadString())
  82. err := dao.UpdateScreensStatusBySn(sn, status)
  83. if err != nil {
  84. global.GVA_LOG.Error(fmt.Sprintf("修改屏幕状态失败:%s", err.Error()))
  85. }
  86. }
  87. }
  88. }
  89. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  90. return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
  91. }
  92. func (o *MqttHandler) GetTopic(deviceSn, protocol string) string {
  93. return fmt.Sprintf("smart_intersectionV2.0/%s/%s", deviceSn, protocol)
  94. }
  95. // parseTopic 获取设备SN, topic
  96. // "mini/*****/switch_control/ack"
  97. func parseTopic(topic string) (string, string, error) {
  98. strList := strings.Split(topic, "/")
  99. if len(strList) < 4 {
  100. return "", "", errors.New("不支持的topic")
  101. }
  102. topic = strings.Join(strList[2:], "/")
  103. return strList[1], topic, nil
  104. }
  105. func Sending(sn, json string) error {
  106. err := MqttService.Publish(MqttService.GetTopic(sn, protocol.TopicChanStatus), []byte(json))
  107. if err != nil {
  108. return fmt.Errorf("error updating: %v", err)
  109. }
  110. return nil
  111. }
  112. const (
  113. TopicChanStatus = "chanStatus" //上报状态
  114. TopicHighSpeed = "highSpeed" //超速时
  115. TopicLowSpeed = "lowSpeed" //低速时
  116. TopicSetControl = "setControl" //云台下发控制
  117. )