mqtt_handle.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package mqtt
  2. import (
  3. "errors"
  4. "fmt"
  5. jsoniter "github.com/json-iterator/go"
  6. "go.uber.org/zap"
  7. "runtime"
  8. "runtime/debug"
  9. Dev "server/dao/devices"
  10. "server/model/common/request"
  11. "server/utils/logger"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. func InitMqtt() {
  18. MqttService = GetHandler()
  19. MqttService.SubscribeTopics()
  20. go MqttService.Handler()
  21. }
  22. var MqttService *MqttHandler
  23. var jsoni = jsoniter.ConfigFastest
  24. type MqttHandler struct {
  25. queue *MlQueue
  26. }
  27. var _handlerOnce sync.Once
  28. var _handlerSingle *MqttHandler
  29. func GetHandler() *MqttHandler {
  30. _handlerOnce.Do(func() {
  31. _handlerSingle = &MqttHandler{
  32. queue: NewQueue(10000),
  33. }
  34. })
  35. return _handlerSingle
  36. }
  37. func (o *MqttHandler) SubscribeTopics() {
  38. GetMQTTMgr().Subscribe("smart_intersection/#", AtLeastOnce, o.HandlerData)
  39. }
  40. func (o *MqttHandler) HandlerData(m Message) {
  41. for {
  42. ok, cnt := o.queue.Put(&m)
  43. if ok {
  44. break
  45. } else {
  46. logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  47. runtime.Gosched()
  48. }
  49. }
  50. }
  51. func (o *MqttHandler) Handler() interface{} {
  52. defer func() {
  53. if err := recover(); err != nil {
  54. go GetHandler().Handler()
  55. logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  56. }
  57. }()
  58. for {
  59. msg, ok, quantity := o.queue.Get()
  60. if !ok {
  61. time.Sleep(10 * time.Millisecond)
  62. continue
  63. } else if quantity > 1000 {
  64. logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  65. }
  66. m, ok := msg.(*Message)
  67. if !ok {
  68. continue
  69. }
  70. deviceCode, topic, err := parseTopic(m.Topic())
  71. if err != nil {
  72. logger.Logger.Errorf("parseTopic err")
  73. continue
  74. }
  75. switch topic {
  76. case TopicDeviceGateway:
  77. time, _ := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", m.PayloadString())
  78. err := Dev.UpdateGatewayRecentOnline(deviceCode, time)
  79. if err != nil {
  80. logger.Logger.Error("UpdateGatewayRecentOnline err", zap.Error(err))
  81. }
  82. case TopicDeviceCamera:
  83. //根据编码修改摄像头状态
  84. status, _ := strconv.Atoi(m.PayloadString())
  85. err := Dev.UpdateCameraStatus(deviceCode, status)
  86. if err != nil {
  87. logger.Logger.Error("UpdateCameraStatus err", zap.Error(err))
  88. }
  89. case TopicDeviceScreens:
  90. var stat request.DeviceStatus
  91. jsoni.Unmarshal([]byte(m.PayloadString()), &stat)
  92. err = Dev.UpdateScreensStatus(deviceCode, stat)
  93. if err != nil {
  94. logger.Logger.Error("UpdateScreensStatus err", zap.Error(err))
  95. }
  96. default:
  97. fmt.Println("我是主题:::", topic)
  98. }
  99. }
  100. }
  101. func parseTopic(topic string) (string, string, error) {
  102. strList := strings.Split(topic, "/")
  103. if len(strList) < 4 {
  104. return "", "", errors.New("不支持的topic")
  105. }
  106. topic = strings.Join(strList[2:], "/")
  107. return strList[1], topic, nil
  108. }
  109. const (
  110. TopicDeviceGateway = "device/gateway"
  111. TopicDeviceCamera = "device/camera"
  112. TopicDeviceScreens = "device/screens"
  113. )