radarhandler.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package main
  2. import (
  3. "runtime"
  4. "runtime/debug"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/sirupsen/logrus"
  9. "lc/common/models"
  10. "lc/common/mqtt"
  11. "lc/common/protocol"
  12. "lc/common/util"
  13. )
  14. var _radarHandlerOnce sync.Once
  15. var _radarHandlerSingle *radarHandler
  16. func GetRadarHandler() *radarHandler {
  17. _radarHandlerOnce.Do(func() {
  18. _radarHandlerSingle = &radarHandler{
  19. queue: util.NewQueue(10000),
  20. }
  21. })
  22. return _radarHandlerSingle
  23. }
  24. type radarHandler struct {
  25. queue *util.MlQueue
  26. }
  27. func (o *radarHandler) SubscribeTopics() {
  28. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_Radar, protocol.TP_RADAR_DATA), mqtt.AtMostOnce, o.HandlerData)
  29. }
  30. func (o *radarHandler) HandlerData(m mqtt.Message) {
  31. for {
  32. ok, cnt := o.queue.Put(&m)
  33. if ok {
  34. break
  35. } else {
  36. logrus.Errorf("radarHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  37. runtime.Gosched()
  38. }
  39. }
  40. }
  41. func (o *radarHandler) Handler(args ...interface{}) interface{} {
  42. defer func() {
  43. if err := recover(); err != nil {
  44. gopool.Add(o.Handler, args)
  45. logrus.Errorf("radarHandler.Handler:%v发生异常:%s", args, string(debug.Stack()))
  46. }
  47. }()
  48. for {
  49. msg, ok, quantity := o.queue.Get()
  50. if !ok {
  51. time.Sleep(10 * time.Millisecond)
  52. continue
  53. } else if quantity > 1000 {
  54. logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  55. }
  56. m, ok := msg.(*mqtt.Message)
  57. if !ok {
  58. continue
  59. }
  60. _, _, GID, topic, err := ParseTopic(m.Topic())
  61. if err != nil {
  62. continue
  63. }
  64. switch topic {
  65. case protocol.TP_RADAR_DATA:
  66. var ret protocol.Pack_UploadData
  67. if err := ret.DeCode(m.PayloadString()); err == nil {
  68. if ret.Data.State == protocol.FAILED || len(ret.Data.Data) != 5 || ret.Data.Data[3] < 0 {
  69. logrus.Warningf("雷达数据不正确 %+v", ret)
  70. continue
  71. }
  72. t, _ := util.MlParseTime(ret.Time)
  73. ipPort := strings.Split(ret.Id, ":")
  74. o := models.RadarData{
  75. GID: GID,
  76. DID: ret.Id,
  77. Time: t,
  78. UID: uint16(ret.Data.Data[0]),
  79. RadarID: uint64(ret.Data.Data[1]),
  80. Type: uint8(ret.Data.Data[2]),
  81. Speed: ret.Data.Data[3],
  82. Dist: ret.Data.Data[4],
  83. }
  84. if len(ipPort) == 2 {
  85. o.IP = ipPort[0]
  86. o.Port = ipPort[1]
  87. }
  88. if err := models.G_db.Save(&o).Error; err != nil {
  89. logrus.Errorf("插入数据库失败:%s", err.Error())
  90. }
  91. logrus.Debugf("雷达数据插入成功, 数据: %+v", ret)
  92. }
  93. default:
  94. logrus.Warnf("radarHandler.Handler:收到暂不支持的主题:%s", topic)
  95. }
  96. }
  97. }