package main import ( "github.com/sirupsen/logrus" "lc/common/models" "runtime" "runtime/debug" "sync" "time" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) var _CltLedMgrOnce sync.Once var _CltLedMgrSingle *CltLedMgr func GetCltLedMgr() *CltLedMgr { _CltLedMgrOnce.Do(func() { _CltLedMgrSingle = &CltLedMgr{ queue: util.NewQueue(1000), mapCltledState: make(map[string]*StateInfo), } }) return _CltLedMgrSingle } type CltLedMgr struct { queue *util.MlQueue mapCltledState map[string]*StateInfo //0在线,1离线 } func (o *CltLedMgr) SubscribeTopics() { GetMQTTMgr().Subscribe(GetCltledTopic(protocol.TP_LED_DATA), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetCltledTopic(protocol.TP_LED_STATE), mqtt.AtMostOnce, o.HandlerData) } func (o *CltLedMgr) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("CltLedMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *CltLedMgr) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("CltLedMgr.Handler发生异常:%v", err) logrus.Errorf("CltLedMgr.Handler发生异常,堆栈信息:%s", string(debug.Stack())) } }() exit := false timer := time.NewTicker(1 * time.Minute) for { select { case <-timer.C: //每隔1分钟执行一次 //状态,防止无数据状态不更新 o.UpdateState() default: if o.handleQueue() == 0 { if exit { return 0 } time.Sleep(100 * time.Millisecond) } } } } func (o *CltLedMgr) handleQueue() uint32 { msg, ok, quantity := o.queue.Get() if !ok { return quantity } else if quantity > 1000 { logrus.Warnf("CltLedMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { return quantity } //tenant, _, sn, topic, _, err := ParseTopicCltLed(m.Topic()) //fmt.Printf("Topic=%v \ttenant = %v sn=%v topic=%v \n", m.Topic(), tenant, sn, topic) _, _, _, topic, _, err := ParseTopicCltLed(m.Topic()) if err != nil { return quantity } switch topic { case protocol.TP_LED_STATE: //在线状态 o.HandlerState(m) case protocol.TP_LED_DATA: //详情信息 o.HandlerLedData(m) } return quantity } func (o *CltLedMgr) UpdateState() { t := util.MlNow() for k, v := range o.mapCltledState { if v.State == 0 && t.Sub(v.Time).Minutes() > OfflineInterval { //只检查当前还在线的 GetEventMgr().PushEvent(&EventObject{ID: k, EventType: models.ET_OFFLINE, Time: t}) cacheState(k, t.Format("2006-01-02 15:04:05"), 1) o.mapCltledState[k].State = 1 o.mapCltledState[k].Time = t } } } func (o *CltLedMgr) HandlerState(m *mqtt.Message) { var obj protocol.Pack_LedState if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error()) return } //fmt.Printf("obj = %v \n", obj) t, err := util.MlParseTime(obj.Time) if err != nil { logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error()) return } //0在线,1离线 si, ok := o.mapCltledState[obj.Id] if !ok { t, s, err := getState(obj.Id) if err != nil { cacheState(obj.Id, obj.Time, obj.Data.State) o.mapCltledState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State} return } else { si = &StateInfo{Time: t, State: s} o.mapCltledState[obj.Id] = si } } if si.State != obj.Data.State { if obj.Data.State == 1 { //在线到离线 GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t}) } else { //离线到在线 GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t}) } } cacheState(obj.Id, obj.Time, obj.Data.State) o.mapCltledState[obj.Id].State = obj.Data.State o.mapCltledState[obj.Id].Time = t } func (o *CltLedMgr) HandlerLedData(m *mqtt.Message) { var obj protocol.Pack_LedCltledData if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error()) return } marshalToString, err := json.MarshalToString(obj.Data) if err != nil { logrus.Errorf("解析数据1出错%v", err.Error()) return } dataMap := make(map[string]interface{}) err = json.Unmarshal([]byte(marshalToString), &dataMap) if err != nil { logrus.Errorf("解析数2据出错%v", err.Error()) return } t, err := util.MlParseTime(obj.Time) if err != nil { logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error()) return } cacheLedData(obj.Id, t, dataMap) }