mqtt_handle.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "runtime"
  6. "runtime/debug"
  7. "smart_tunnel_edge/mqtt"
  8. "smart_tunnel_edge/util/logger"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. func InitMqtt() {
  14. MqttService = GetHandler()
  15. MqttService.SubscribeTopics()
  16. go MqttService.Handler()
  17. }
  18. var MqttService *MqttHandler
  19. type MqttHandler struct {
  20. queue *mqtt.MlQueue
  21. }
  22. var _handlerOnce sync.Once
  23. var _handlerSingle *MqttHandler
  24. func GetHandler() *MqttHandler {
  25. _handlerOnce.Do(func() {
  26. _handlerSingle = &MqttHandler{
  27. queue: mqtt.NewQueue(10000),
  28. }
  29. })
  30. return _handlerSingle
  31. }
  32. func (o *MqttHandler) SubscribeTopics() {
  33. mqtt.GetMQTTMgr().Subscribe("smart_intersection/#", mqtt.AtLeastOnce, o.HandlerData)
  34. }
  35. func (o *MqttHandler) HandlerData(m mqtt.Message) {
  36. for {
  37. ok, cnt := o.queue.Put(&m)
  38. if ok {
  39. break
  40. } else {
  41. logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  42. runtime.Gosched()
  43. }
  44. }
  45. }
  46. func (o *MqttHandler) Handler() interface{} {
  47. defer func() {
  48. if err := recover(); err != nil {
  49. go GetHandler().Handler()
  50. logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  51. }
  52. }()
  53. for {
  54. msg, ok, quantity := o.queue.Get()
  55. if !ok {
  56. time.Sleep(10 * time.Millisecond)
  57. continue
  58. } else if quantity > 1000 {
  59. logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  60. }
  61. m, ok := msg.(*mqtt.Message)
  62. if !ok {
  63. continue
  64. }
  65. _, topic, err := parseTopic(m.Topic())
  66. if err != nil {
  67. logger.Logger.Errorf("parseTopic err")
  68. continue
  69. }
  70. switch topic {
  71. case TopicDeviceGateway:
  72. case TopicDeviceCamera:
  73. default:
  74. fmt.Println("我是主题:::", topic)
  75. }
  76. }
  77. }
  78. func parseTopic(topic string) (string, string, error) {
  79. strList := strings.Split(topic, "/")
  80. if len(strList) < 4 {
  81. return "", "", errors.New("不支持的topic")
  82. }
  83. topic = strings.Join(strList[2:], "/")
  84. return strList[1], topic, nil
  85. }
  86. const (
  87. TopicDeviceGateway = "device/gateway"
  88. TopicDeviceCamera = "device/camera"
  89. TopicDeviceScreens = "device/screens"
  90. )