smartXHandler.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package main
  2. import (
  3. "errors"
  4. "io/ioutil"
  5. "os"
  6. "path/filepath"
  7. "runtime"
  8. "runtime/debug"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/sirupsen/logrus"
  13. "lc/common/models"
  14. "lc/common/mqtt"
  15. "lc/common/protocol"
  16. "lc/common/util"
  17. )
  18. var _smartXHandlerOnce sync.Once
  19. var _smartXHandlerSingle *smartXHandler
  20. func GetSmartXHandler() *smartXHandler {
  21. _smartXHandlerOnce.Do(func() {
  22. _smartXHandlerSingle = &smartXHandler{
  23. queue: util.NewQueue(10000),
  24. }
  25. })
  26. return _smartXHandlerSingle
  27. }
  28. type smartXHandler struct {
  29. queue *util.MlQueue
  30. }
  31. func (o *smartXHandler) SubscribeTopics() {
  32. GetMQTTMgr().Subscribe("smart_intersection/led/#/state", mqtt.AtMostOnce, o.HandlerData)
  33. }
  34. func (o *smartXHandler) HandlerData(m mqtt.Message) {
  35. for {
  36. ok, cnt := o.queue.Put(&m)
  37. if ok {
  38. break
  39. } else {
  40. logrus.Errorf("smartXHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  41. runtime.Gosched()
  42. }
  43. }
  44. }
  45. //smart_intersection/led/"+s.info.SN+"/state"
  46. func parseTopic(topic string) string {
  47. strList := strings.Split(topic, "/")
  48. if len(strList) != 4 {
  49. return ""
  50. }
  51. return strList[2]
  52. }
  53. func (o *smartXHandler) Handler(args ...interface{}) interface{} {
  54. defer func() {
  55. if err := recover(); err != nil {
  56. gopool.Add(o.Handler, args)
  57. logrus.Errorf("smartXHandler.Handler:%v发生异常:%s", args, string(debug.Stack()))
  58. }
  59. }()
  60. for {
  61. msg, ok, quantity := o.queue.Get()
  62. if !ok {
  63. time.Sleep(10 * time.Millisecond)
  64. continue
  65. } else if quantity > 1000 {
  66. logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  67. }
  68. m, ok := msg.(*mqtt.Message)
  69. if !ok {
  70. continue
  71. }
  72. sn := parseTopic(m.Topic())
  73. if sn != "" {
  74. continue
  75. }
  76. switch topic {
  77. case protocol.TP_smartX_ONLINE: //上线
  78. var obj protocol.Pack_IDObject
  79. if err := obj.DeCode(m.PayloadString()); err == nil { //网关在线
  80. cacheState(obj.Id, obj.Time, 0)
  81. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: util.MlNow()})
  82. }
  83. case protocol.TP_smartX_WILL: //下线
  84. var obj protocol.Pack_IDObject
  85. if err := obj.DeCode(m.PayloadString()); err == nil { //网关离线
  86. cacheState(obj.Id, obj.Time, 1)
  87. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: util.MlNow()})
  88. }
  89. default:
  90. logrus.Warnf("smartXHandler.Handler:收到暂不支持的主题:%s", topic)
  91. }
  92. }
  93. }