| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package main
- import (
- "runtime"
- "runtime/debug"
- "sync"
- "time"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- "strings"
- "github.com/sirupsen/logrus"
- )
- // 海蓝物联 Zigbee集控器
- var _HlZigbeeConcentratorMgrOnce sync.Once
- var _HlZigbeeConcentratorMgrSingle *HlZigbeeConcentratorMgr
- func GetHlZigbeeConcentratorMgr() *HlZigbeeConcentratorMgr {
- _HlZigbeeConcentratorMgrOnce.Do(func() {
- _HlZigbeeConcentratorMgrSingle = &HlZigbeeConcentratorMgr{
- queue: util.NewQueue(10000),
- mapHlZigbeeConcentrator: make(map[string]*HlZigbeeConcentrator),
- }
- })
- return _HlZigbeeConcentratorMgrSingle
- }
- type HlZigbeeConcentratorMgr struct {
- queue *util.MlQueue
- mapHlZigbeeConcentrator map[string]*HlZigbeeConcentrator
- }
- func (o *HlZigbeeConcentratorMgr) SubscribeTopics() {
- GetHlMqttMgr().Subscribe(GetHLTopicUp(), mqtt.AtMostOnce, o.HandlerData)
- }
- func (o *HlZigbeeConcentratorMgr) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logrus.Errorf("HlZigbeeConcentratorMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- // Handler 解析mqtt消息,创建集控器并启用
- func (o *HlZigbeeConcentratorMgr) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("HlZigbeeConcentratorMgr.Handler发生异常:%v", err)
- logrus.Errorf("HlZigbeeConcentratorMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack()))
- }
- }()
- for {
- msg, ok, quantity := o.queue.Get()
- if !ok {
- time.Sleep(10 * time.Millisecond)
- continue
- } else if quantity > 1000 {
- logrus.Warnf("HlZigbeeConcentratorMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- continue
- }
- Tenant, t, err := ParseTopicHL(m.Topic())
- if err != nil {
- continue
- }
- if t != GetHLTopicUp() {
- logrus.Debugf("Topic不对:%s", t)
- continue
- }
- if strings.Contains(m.PayloadString(), "\"masterSn\"") { //带有集控器字样
- var hqls protocol.HLWLZB_Pack //decode后取出masterSn
- if err := hqls.DeCode(m.PayloadString()); err == nil {
- pzl, ok := o.mapHlZigbeeConcentrator[hqls.MasterSn]
- if !ok {
- pzl = NewHlZigbeeConcentrator(Tenant, hqls.MasterSn)
- pzl.Start()
- o.mapHlZigbeeConcentrator[hqls.MasterSn] = pzl
- }
- pzl.PutMessage(m)
- } else {
- logrus.Debugf("HlZigbeeConcentratorMgr.Handler:topic不符合要求2:%s", m.PayloadString())
- }
- } else {
- logrus.Debugf("HlZigbeeConcentratorMgr.Handler:topic不符合要求:%s", m.PayloadString())
- }
- }
- }
|