package main import ( "runtime" "runtime/debug" "sync" "time" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" "strings" "github.com/sirupsen/logrus" ) // 海蓝物联 Zigbee集控器 var _HlZigbeeConcentratorMgrOnce sync.Once var _HlZigbeeConcentratorMgrSingle *HlZigbeeConcentratorMgr func GetHlZigbeeConcentratorMgr() *HlZigbeeConcentratorMgr { _HlZigbeeConcentratorMgrOnce.Do(func() { _HlZigbeeConcentratorMgrSingle = &HlZigbeeConcentratorMgr{ queue: util.NewQueue(10000), mapHlZigbeeConcentrator: make(map[string]*HlZigbeeConcentrator), } }) return _HlZigbeeConcentratorMgrSingle } type HlZigbeeConcentratorMgr struct { queue *util.MlQueue mapHlZigbeeConcentrator map[string]*HlZigbeeConcentrator } func (o *HlZigbeeConcentratorMgr) SubscribeTopics() { GetHlMqttMgr().Subscribe(GetHLTopicUp(), mqtt.AtMostOnce, o.HandlerData) } func (o *HlZigbeeConcentratorMgr) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("HlZigbeeConcentratorMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } // Handler 解析mqtt消息,创建集控器并启用 func (o *HlZigbeeConcentratorMgr) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("HlZigbeeConcentratorMgr.Handler发生异常:%v", err) logrus.Errorf("HlZigbeeConcentratorMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack())) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { logrus.Warnf("HlZigbeeConcentratorMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { continue } Tenant, t, err := ParseTopicHL(m.Topic()) if err != nil { continue } if t != GetHLTopicUp() { logrus.Debugf("Topic不对:%s", t) continue } if strings.Contains(m.PayloadString(), "\"masterSn\"") { //带有集控器字样 var hqls protocol.HLWLZB_Pack //decode后取出masterSn if err := hqls.DeCode(m.PayloadString()); err == nil { pzl, ok := o.mapHlZigbeeConcentrator[hqls.MasterSn] if !ok { pzl = NewHlZigbeeConcentrator(Tenant, hqls.MasterSn) pzl.Start() o.mapHlZigbeeConcentrator[hqls.MasterSn] = pzl } pzl.PutMessage(m) } else { logrus.Debugf("HlZigbeeConcentratorMgr.Handler:topic不符合要求2:%s", m.PayloadString()) } } else { logrus.Debugf("HlZigbeeConcentratorMgr.Handler:topic不符合要求:%s", m.PayloadString()) } } }