package main import ( "runtime" "runtime/debug" "sync" "time" "github.com/sirupsen/logrus" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) var _ChzbConcentratorMgrOnce sync.Once var _ChzbConcentratorMgrSingle *ChzbConcentratorMgr func GetChzbConcentratorMgr() *ChzbConcentratorMgr { _ChzbConcentratorMgrOnce.Do(func() { _ChzbConcentratorMgrSingle = &ChzbConcentratorMgr{ queue: util.NewQueue(10000), mapChzbConcentrator: make(map[string]*ChZigbeeConcentrator), } }) return _ChzbConcentratorMgrSingle } type ChzbConcentratorMgr struct { queue *util.MlQueue mapChzbConcentrator map[string]*ChZigbeeConcentrator } func (o *ChzbConcentratorMgr) SubscribeTopics() { GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_DATA), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_WAITTIME_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_SWITCH_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_RECOVERY_AUTO_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_ONOFFTIME_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_QUERY_ONOFFTIME_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_UPDATE_LAMP_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_QUERY_LAMP), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_QUERY_TIME_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_SET_BROADCASTTIME_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_CONCENTRATOR, protocol.TP_CHZB_ALARM), mqtt.AtMostOnce, o.HandlerData) } func (o *ChzbConcentratorMgr) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("ChzbConcentratorMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *ChzbConcentratorMgr) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("ChzbConcentratorMgr.Handler发生异常:%v", err) logrus.Errorf("ChzbConcentratorMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack())) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { logrus.Warnf("ChzbConcentratorMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { continue } Tenant, _, DID, _, err := ParseTopic(m.Topic()) if err != nil { continue } pzl, ok := o.mapChzbConcentrator[DID] if !ok { pzl = NewChZigbeeConcentrator(Tenant, DID) pzl.Start() o.mapChzbConcentrator[DID] = pzl } pzl.PutMessage(m) } }