hlzbconcentratormgr.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package main
  2. import (
  3. "runtime"
  4. "runtime/debug"
  5. "sync"
  6. "time"
  7. "lc/common/mqtt"
  8. "lc/common/protocol"
  9. "lc/common/util"
  10. "strings"
  11. "github.com/sirupsen/logrus"
  12. )
  13. // 海蓝物联 Zigbee集控器
  14. var _HlZigbeeConcentratorMgrOnce sync.Once
  15. var _HlZigbeeConcentratorMgrSingle *HlZigbeeConcentratorMgr
  16. func GetHlZigbeeConcentratorMgr() *HlZigbeeConcentratorMgr {
  17. _HlZigbeeConcentratorMgrOnce.Do(func() {
  18. _HlZigbeeConcentratorMgrSingle = &HlZigbeeConcentratorMgr{
  19. queue: util.NewQueue(10000),
  20. mapHlZigbeeConcentrator: make(map[string]*HlZigbeeConcentrator),
  21. }
  22. })
  23. return _HlZigbeeConcentratorMgrSingle
  24. }
  25. type HlZigbeeConcentratorMgr struct {
  26. queue *util.MlQueue
  27. mapHlZigbeeConcentrator map[string]*HlZigbeeConcentrator
  28. }
  29. func (o *HlZigbeeConcentratorMgr) SubscribeTopics() {
  30. GetHlMqttMgr().Subscribe(GetHLTopicUp(), mqtt.AtMostOnce, o.HandlerData)
  31. }
  32. func (o *HlZigbeeConcentratorMgr) HandlerData(m mqtt.Message) {
  33. for {
  34. ok, cnt := o.queue.Put(&m)
  35. if ok {
  36. break
  37. } else {
  38. logrus.Errorf("HlZigbeeConcentratorMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  39. runtime.Gosched()
  40. }
  41. }
  42. }
  43. // Handler 解析mqtt消息,创建集控器并启用
  44. func (o *HlZigbeeConcentratorMgr) Handler(args ...interface{}) interface{} {
  45. defer func() {
  46. if err := recover(); err != nil {
  47. gopool.Add(o.Handler, args)
  48. logrus.Errorf("HlZigbeeConcentratorMgr.Handler发生异常:%v", err)
  49. logrus.Errorf("HlZigbeeConcentratorMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack()))
  50. }
  51. }()
  52. for {
  53. msg, ok, quantity := o.queue.Get()
  54. if !ok {
  55. time.Sleep(10 * time.Millisecond)
  56. continue
  57. } else if quantity > 1000 {
  58. logrus.Warnf("HlZigbeeConcentratorMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  59. }
  60. m, ok := msg.(*mqtt.Message)
  61. if !ok {
  62. continue
  63. }
  64. Tenant, t, err := ParseTopicHL(m.Topic())
  65. if err != nil {
  66. continue
  67. }
  68. if t != GetHLTopicUp() {
  69. logrus.Debugf("Topic不对:%s", t)
  70. continue
  71. }
  72. if strings.Contains(m.PayloadString(), "\"masterSn\"") { //带有集控器字样
  73. var hqls protocol.HLWLZB_Pack //decode后取出masterSn
  74. if err := hqls.DeCode(m.PayloadString()); err == nil {
  75. pzl, ok := o.mapHlZigbeeConcentrator[hqls.MasterSn]
  76. if !ok {
  77. pzl = NewHlZigbeeConcentrator(Tenant, hqls.MasterSn)
  78. pzl.Start()
  79. o.mapHlZigbeeConcentrator[hqls.MasterSn] = pzl
  80. }
  81. pzl.PutMessage(m)
  82. } else {
  83. logrus.Debugf("HlZigbeeConcentratorMgr.Handler:topic不符合要求2:%s", m.PayloadString())
  84. }
  85. } else {
  86. logrus.Debugf("HlZigbeeConcentratorMgr.Handler:topic不符合要求:%s", m.PayloadString())
  87. }
  88. }
  89. }