package main import ( "errors" "io/ioutil" "os" "path/filepath" "runtime" "runtime/debug" "strings" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) var _smartXHandlerOnce sync.Once var _smartXHandlerSingle *smartXHandler func GetSmartXHandler() *smartXHandler { _smartXHandlerOnce.Do(func() { _smartXHandlerSingle = &smartXHandler{ queue: util.NewQueue(10000), } }) return _smartXHandlerSingle } type smartXHandler struct { queue *util.MlQueue } func (o *smartXHandler) SubscribeTopics() { GetMQTTMgr().Subscribe("smart_intersection/led/#/state", mqtt.AtMostOnce, o.HandlerData) } func (o *smartXHandler) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("smartXHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } //smart_intersection/led/"+s.info.SN+"/state" func parseTopic(topic string) string { strList := strings.Split(topic, "/") if len(strList) != 4 { return "" } return strList[2] } func (o *smartXHandler) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("smartXHandler.Handler:%v发生异常:%s", args, string(debug.Stack())) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { continue } sn := parseTopic(m.Topic()) if sn != "" { continue } switch topic { case protocol.TP_smartX_ONLINE: //上线 var obj protocol.Pack_IDObject if err := obj.DeCode(m.PayloadString()); err == nil { //网关在线 cacheState(obj.Id, obj.Time, 0) GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: util.MlNow()}) } case protocol.TP_smartX_WILL: //下线 var obj protocol.Pack_IDObject if err := obj.DeCode(m.PayloadString()); err == nil { //网关离线 cacheState(obj.Id, obj.Time, 1) GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: util.MlNow()}) } default: logrus.Warnf("smartXHandler.Handler:收到暂不支持的主题:%s", topic) } } }