mqtt_handle.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package mqtt
  2. import (
  3. "errors"
  4. "fmt"
  5. "runtime"
  6. "runtime/debug"
  7. "server/utils/logger"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. func InitMqtt() {
  13. MqttService = GetHandler()
  14. MqttService.SubscribeTopics()
  15. go MqttService.Handler()
  16. }
  17. var MqttService *MqttHandler
  18. type MqttHandler struct {
  19. queue *MlQueue
  20. }
  21. var _handlerOnce sync.Once
  22. var _handlerSingle *MqttHandler
  23. func GetHandler() *MqttHandler {
  24. _handlerOnce.Do(func() {
  25. _handlerSingle = &MqttHandler{
  26. queue: NewQueue(10000),
  27. }
  28. })
  29. return _handlerSingle
  30. }
  31. func (o *MqttHandler) SubscribeTopics() {
  32. GetMQTTMgr().Subscribe("smartIntersection/#", AtLeastOnce, o.HandlerData)
  33. }
  34. func (o *MqttHandler) HandlerData(m Message) {
  35. for {
  36. ok, cnt := o.queue.Put(&m)
  37. if ok {
  38. break
  39. } else {
  40. logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  41. runtime.Gosched()
  42. }
  43. }
  44. }
  45. func (o *MqttHandler) Handler() interface{} {
  46. defer func() {
  47. if err := recover(); err != nil {
  48. go GetHandler().Handler()
  49. logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  50. }
  51. }()
  52. for {
  53. msg, ok, quantity := o.queue.Get()
  54. if !ok {
  55. time.Sleep(10 * time.Millisecond)
  56. continue
  57. } else if quantity > 1000 {
  58. logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  59. }
  60. m, ok := msg.(*Message)
  61. if !ok {
  62. continue
  63. }
  64. deviceCode, topic, err := parseTopic(m.Topic())
  65. if err != nil {
  66. logger.Logger.Errorf("parseTopic err")
  67. continue
  68. }
  69. switch topic {
  70. case TopicDeviceCamera:
  71. fmt.Println("code...", deviceCode)
  72. fmt.Println("数据...", m.PayloadString())
  73. //TODO 待更新数据库
  74. }
  75. }
  76. }
  77. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  78. return GetMQTTMgr().Publish(topic, data, AtLeastOnce)
  79. }
  80. func (o *MqttHandler) GetTopic(deviceCode, protocol string) string {
  81. return fmt.Sprintf("smartIntersection/%s/%s", deviceCode, protocol)
  82. }
  83. func parseTopic(topic string) (string, string, error) {
  84. strList := strings.Split(topic, "/")
  85. if len(strList) < 4 {
  86. return "", "", errors.New("不支持的topic")
  87. }
  88. topic = strings.Join(strList[2:], "/")
  89. return strList[1], topic, nil
  90. }
  91. const (
  92. TopicDeviceCamera = "device/camera"
  93. )