mqtt_handle.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package mqtt
  2. import (
  3. "errors"
  4. "fmt"
  5. "runtime"
  6. "runtime/debug"
  7. Dev "server/dao/devices"
  8. "server/utils/logger"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. func InitMqtt() {
  15. MqttService = GetHandler()
  16. MqttService.SubscribeTopics()
  17. go MqttService.Handler()
  18. }
  19. var MqttService *MqttHandler
  20. type MqttHandler struct {
  21. queue *MlQueue
  22. }
  23. var _handlerOnce sync.Once
  24. var _handlerSingle *MqttHandler
  25. func GetHandler() *MqttHandler {
  26. _handlerOnce.Do(func() {
  27. _handlerSingle = &MqttHandler{
  28. queue: NewQueue(10000),
  29. }
  30. })
  31. return _handlerSingle
  32. }
  33. func (o *MqttHandler) SubscribeTopics() {
  34. GetMQTTMgr().Subscribe("smartIntersection/#", AtLeastOnce, o.HandlerData)
  35. }
  36. func (o *MqttHandler) HandlerData(m Message) {
  37. for {
  38. ok, cnt := o.queue.Put(&m)
  39. if ok {
  40. break
  41. } else {
  42. logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  43. runtime.Gosched()
  44. }
  45. }
  46. }
  47. func (o *MqttHandler) Handler() interface{} {
  48. defer func() {
  49. if err := recover(); err != nil {
  50. go GetHandler().Handler()
  51. logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  52. }
  53. }()
  54. for {
  55. msg, ok, quantity := o.queue.Get()
  56. if !ok {
  57. time.Sleep(10 * time.Millisecond)
  58. continue
  59. } else if quantity > 1000 {
  60. logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  61. }
  62. m, ok := msg.(*Message)
  63. if !ok {
  64. continue
  65. }
  66. deviceCode, topic, err := parseTopic(m.Topic())
  67. if err != nil {
  68. logger.Logger.Errorf("parseTopic err")
  69. continue
  70. }
  71. switch topic {
  72. case TopicDeviceCamera:
  73. //根据编码修改摄像头状态
  74. status, _ := strconv.Atoi(m.PayloadString())
  75. Dev.UpdateCameraStatus(deviceCode, status)
  76. }
  77. }
  78. }
  79. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  80. return GetMQTTMgr().Publish(topic, data, AtLeastOnce)
  81. }
  82. func (o *MqttHandler) GetTopic(deviceCode, protocol string) string {
  83. return fmt.Sprintf("smartIntersection/%s/%s", deviceCode, protocol)
  84. }
  85. func parseTopic(topic string) (string, string, error) {
  86. strList := strings.Split(topic, "/")
  87. if len(strList) < 4 {
  88. return "", "", errors.New("不支持的topic")
  89. }
  90. topic = strings.Join(strList[2:], "/")
  91. return strList[1], topic, nil
  92. }
  93. const (
  94. TopicDeviceCamera = "device/camera"
  95. )