chzbconcentratormgr.go 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package main
  2. import (
  3. "runtime"
  4. "runtime/debug"
  5. "sync"
  6. "time"
  7. "github.com/sirupsen/logrus"
  8. "lc/common/mqtt"
  9. "lc/common/protocol"
  10. "lc/common/util"
  11. )
  12. var _ChzbConcentratorMgrOnce sync.Once
  13. var _ChzbConcentratorMgrSingle *ChzbConcentratorMgr
  14. func GetChzbConcentratorMgr() *ChzbConcentratorMgr {
  15. _ChzbConcentratorMgrOnce.Do(func() {
  16. _ChzbConcentratorMgrSingle = &ChzbConcentratorMgr{
  17. queue: util.NewQueue(10000),
  18. mapChzbConcentrator: make(map[string]*ChZigbeeConcentrator),
  19. }
  20. })
  21. return _ChzbConcentratorMgrSingle
  22. }
  23. type ChzbConcentratorMgr struct {
  24. queue *util.MlQueue
  25. mapChzbConcentrator map[string]*ChZigbeeConcentrator
  26. }
  27. func (o *ChzbConcentratorMgr) SubscribeTopics() {
  28. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_DATA), mqtt.AtMostOnce, o.HandlerData)
  29. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_WAITTIME_ACK), mqtt.AtMostOnce, o.HandlerData)
  30. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_SWITCH_ACK), mqtt.AtMostOnce, o.HandlerData)
  31. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_RECOVERY_AUTO_ACK), mqtt.AtMostOnce, o.HandlerData)
  32. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_ONOFFTIME_ACK), mqtt.AtMostOnce, o.HandlerData)
  33. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_QUERY_ONOFFTIME_ACK), mqtt.AtMostOnce, o.HandlerData)
  34. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_UPDATE_LAMP_ACK), mqtt.AtMostOnce, o.HandlerData)
  35. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_QUERY_LAMP), mqtt.AtMostOnce, o.HandlerData)
  36. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_QUERY_TIME_ACK), mqtt.AtMostOnce, o.HandlerData)
  37. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_BROADCASTTIME_ACK), mqtt.AtMostOnce, o.HandlerData)
  38. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_ALARM), mqtt.AtMostOnce, o.HandlerData)
  39. }
  40. func (o *ChzbConcentratorMgr) HandlerData(m mqtt.Message) {
  41. for {
  42. ok, cnt := o.queue.Put(&m)
  43. if ok {
  44. break
  45. } else {
  46. logrus.Errorf("ChzbConcentratorMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  47. runtime.Gosched()
  48. }
  49. }
  50. }
  51. func (o *ChzbConcentratorMgr) Handler(args ...interface{}) interface{} {
  52. defer func() {
  53. if err := recover(); err != nil {
  54. gopool.Add(o.Handler, args)
  55. logrus.Errorf("ChzbConcentratorMgr.Handler发生异常:%v", err)
  56. logrus.Errorf("ChzbConcentratorMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack()))
  57. }
  58. }()
  59. for {
  60. msg, ok, quantity := o.queue.Get()
  61. if !ok {
  62. time.Sleep(10 * time.Millisecond)
  63. continue
  64. } else if quantity > 1000 {
  65. logrus.Warnf("ChzbConcentratorMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  66. }
  67. m, ok := msg.(*mqtt.Message)
  68. if !ok {
  69. continue
  70. }
  71. Tenant, _, DID, _, err := ParseTopic(m.Topic())
  72. if err != nil {
  73. continue
  74. }
  75. pzl, ok := o.mapChzbConcentrator[DID]
  76. if !ok {
  77. pzl = NewChZigbeeConcentrator(Tenant, DID)
  78. pzl.Start()
  79. o.mapChzbConcentrator[DID] = pzl
  80. }
  81. pzl.PutMessage(m)
  82. }
  83. }