mqtt.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package devices
  2. import (
  3. "errors"
  4. "fmt"
  5. "regexp"
  6. "runtime"
  7. "runtime/debug"
  8. "server/dao"
  9. "server/global"
  10. "server/utils/mqtt"
  11. "server/utils/protocol"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. func InitMqtt() {
  18. MqttService = GetHandler()
  19. MqttService.SubscribeTopics()
  20. go MqttService.Handler()
  21. }
  22. var MqttService *MqttHandler
  23. var timeoutReg = regexp.MustCompile("Client .* has exceeded timeout")
  24. var connectReg = regexp.MustCompile(`New client connected from .* as .*\(`)
  25. var disconnectReg = regexp.MustCompile("Client mqttx_893e4b7d disconnected")
  26. var DeviceOnlineTimes = make(map[string]*time.Time)
  27. type MqttHandler struct {
  28. queue *mqtt.MlQueue
  29. }
  30. var _handlerOnce sync.Once
  31. var _handlerSingle *MqttHandler
  32. func GetHandler() *MqttHandler {
  33. _handlerOnce.Do(func() {
  34. _handlerSingle = &MqttHandler{
  35. queue: mqtt.NewQueue(10000),
  36. }
  37. })
  38. return _handlerSingle
  39. }
  40. func (o *MqttHandler) SubscribeTopics() {
  41. mqtt.GetMQTTMgr().Subscribe("smart_intersectionV2.0/#", mqtt.AtLeastOnce, o.HandlerData)
  42. }
  43. func (o *MqttHandler) HandlerData(m mqtt.Message) {
  44. for {
  45. ok, cnt := o.queue.Put(&m)
  46. if ok {
  47. break
  48. } else {
  49. global.GVA_LOG.Error(fmt.Sprintf("HandlerData:查询队列失败,队列消息数量:%d", cnt))
  50. runtime.Gosched()
  51. }
  52. }
  53. }
  54. func (o *MqttHandler) Handler() interface{} {
  55. defer func() {
  56. if err := recover(); err != nil {
  57. go GetHandler().Handler()
  58. global.GVA_LOG.Error(fmt.Sprintf("MqttHandler.Handler:发生异常:%s", string(debug.Stack())))
  59. }
  60. }()
  61. for {
  62. msg, ok, quantity := o.queue.Get()
  63. if !ok {
  64. time.Sleep(10 * time.Millisecond)
  65. continue
  66. } else if quantity > 1000 {
  67. global.GVA_LOG.Error(fmt.Sprintf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity))
  68. }
  69. m, ok := msg.(*mqtt.Message)
  70. if !ok {
  71. continue
  72. }
  73. sn, topic, err := parseTopic(m.Topic())
  74. if err != nil {
  75. global.GVA_LOG.Error("parseTopic err")
  76. continue
  77. }
  78. global.GVA_LOG.Info(fmt.Sprintf("mqtt Handler sn :%s", sn))
  79. global.GVA_LOG.Info(fmt.Sprintf("mqtt Handler topic :%s", topic))
  80. switch topic {
  81. case protocol.TopicHighSpeed: //存储超速事件
  82. event := dao.Event{Sn: sn, Type: "超速", Time: m.PayloadString()}
  83. err := event.AddEvent()
  84. if err != nil {
  85. global.GVA_LOG.Error(fmt.Sprintf("添加事件失败:%s", err.Error()))
  86. }
  87. case protocol.TopicLowSpeed: //存储低速时间
  88. event := dao.Event{Sn: sn, Type: "低速", Time: m.PayloadString()}
  89. err := event.AddEvent()
  90. if err != nil {
  91. global.GVA_LOG.Error(fmt.Sprintf("添加事件失败:%s", err.Error()))
  92. }
  93. case protocol.TopicChanStatus: //修改屏幕状态
  94. status, _ := strconv.Atoi(m.PayloadString())
  95. err := dao.UpdateScreensStatusBySn(sn, time.Now().Format("2006-1-2 15:04:05"), status)
  96. if err != nil {
  97. global.GVA_LOG.Error(fmt.Sprintf("修改屏幕状态失败:%s", err.Error()))
  98. }
  99. case protocol.TopicReportTime:
  100. parts := strings.Split(m.PayloadString(), " m=")
  101. cleanTimeStr := parts[0]
  102. // 时间格式
  103. layout := "2006-01-02 15:04:05.999999999 +0000 UTC"
  104. // 解析时间
  105. parsedTime, _ := time.Parse(layout, cleanTimeStr)
  106. //fmt.Println("转换时间:", parsedTime)
  107. DeviceOnlineTimes[sn] = &parsedTime
  108. dao.UpdateScreensStatusBySn(sn, parsedTime.Format("2006-1-2 15:04:05"), 1)
  109. }
  110. }
  111. }
  112. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  113. return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
  114. }
  115. func (o *MqttHandler) GetTopic(deviceSn, protocol string) string {
  116. return fmt.Sprintf("smart_intersectionV2.0/%s/%s", deviceSn, protocol)
  117. }
  118. // parseTopic 获取设备SN, topic
  119. // "mini/*****/switch_control/ack"
  120. func parseTopic(topic string) (string, string, error) {
  121. strList := strings.Split(topic, "/")
  122. if len(strList) < 3 {
  123. return "", "", errors.New("不支持的topic")
  124. }
  125. topic = strings.Join(strList[2:], "/")
  126. return strList[1], topic, nil
  127. }
  128. func Sending(sn, json string) error {
  129. err := MqttService.Publish(MqttService.GetTopic(sn, protocol.TopicSetControl), []byte(json))
  130. if err != nil {
  131. return fmt.Errorf("error updating: %v", err)
  132. }
  133. return nil
  134. }