// Source file: mqtt_handle.go (2.7 KB)

  package service

  import (
  	"errors"
  	"fmt"
  	"runtime"
  	"runtime/debug"
  	"strings"
  	"sync"
  	"time"

  	"smartIntersection_edge/mqtt"
  	"smartIntersection_edge/util/logger"
  )
  13. func InitMqtt() {
  14. MqttService = GetHandler()
  15. MqttService.SubscribeTopics()
  16. go MqttService.Handler()
  17. }
  18. var MqttService *MqttHandler
  19. type MqttHandler struct {
  20. queue *mqtt.MlQueue
  21. }
  22. var _handlerOnce sync.Once
  23. var _handlerSingle *MqttHandler
  24. func GetHandler() *MqttHandler {
  25. _handlerOnce.Do(func() {
  26. _handlerSingle = &MqttHandler{
  27. queue: mqtt.NewQueue(10000),
  28. }
  29. })
  30. return _handlerSingle
  31. }
  32. func (o *MqttHandler) SubscribeTopics() {
  33. mqtt.GetMQTTMgr().Subscribe("smart_intersectionV2.0/#", mqtt.AtLeastOnce, o.HandlerData)
  34. }
  35. func (o *MqttHandler) HandlerData(m mqtt.Message) {
  36. for {
  37. ok, cnt := o.queue.Put(&m)
  38. if ok {
  39. break
  40. } else {
  41. logger.Logger.Errorf("HandlerData:查询队列失败,队列消息数量:%d", cnt)
  42. runtime.Gosched()
  43. }
  44. }
  45. }
  46. func (o *MqttHandler) Handler() interface{} {
  47. defer func() {
  48. if err := recover(); err != nil {
  49. go GetHandler().Handler()
  50. logger.Logger.Errorf("MqttHandler.Handler:发生异常:%s", string(debug.Stack()))
  51. }
  52. }()
  53. for {
  54. msg, ok, quantity := o.queue.Get()
  55. if !ok {
  56. time.Sleep(10 * time.Millisecond)
  57. continue
  58. } else if quantity > 1000 {
  59. logger.Logger.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  60. }
  61. m, ok := msg.(*mqtt.Message)
  62. if !ok {
  63. continue
  64. }
  65. sn, topic, err := ParseTopic(m.Topic())
  66. if err != nil {
  67. logger.Logger.Errorf("parseTopic err")
  68. continue
  69. }
  70. switch topic {
  71. case TopicSetControl:
  72. command, _ := UTF8ToGB2312(m.PayloadString())
  73. n, err := Devices[sn].Conn.Write(command)
  74. if n <= 0 && err != nil {
  75. logger.Logger.Errorf("设备:%v操作失败,错误:%v", sn, err)
  76. }
  77. case TopicOffline:
  78. //删除内存中的离线设备
  79. delete(Devices, m.PayloadString())
  80. }
  81. }
  82. }
  83. func (o *MqttHandler) Publish(topic string, data interface{}) error {
  84. return mqtt.GetMQTTMgr().Publish(topic, data, mqtt.AtLeastOnce)
  85. }
  86. func (o *MqttHandler) GetTopic(sn, operation string) string {
  87. return fmt.Sprintf("smart_intersectionV2.0/%s/%s", sn, operation)
  88. }
  // ParseTopic splits a topic of the form "<prefix>/<sn>/<operate>" on "/" and
  // returns its second and third segments.
  //
  // The topic must contain exactly three segments; otherwise empty strings and
  // a non-nil error are returned. The length is validated before any segment
  // is read, so short topics yield the error instead of an index-out-of-range
  // panic.
  func ParseTopic(topic string) (sn string, operate string, err error) {
  	parts := strings.Split(topic, "/")
  	if len(parts) != 3 {
  		return "", "", errors.New("不支持的topic")
  	}
  	return parts[1], parts[2], nil
  }
  // Operation names used as the last segment of
  // "smart_intersectionV2.0/<sn>/<operation>" topics (see GetTopic and
  // ParseTopic).
  const (
  	TopicChanStatus = "chanStatus" // status report
  	TopicHighSpeed  = "highSpeed"  // raised on overspeed
  	TopicLowSpeed   = "lowSpeed"   // raised on low speed
  	TopicSetControl = "setControl" // control command sent down to the device (original note: "云台下发控制" — PTZ or cloud platform; confirm)
  	TopicReportTime = "reportTime" // online-time report
  	TopicOffline    = "Offline"    // device offline message
  )