| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- package main
- import (
- "errors"
- "io/ioutil"
- "os"
- "path/filepath"
- "runtime"
- "runtime/debug"
- "strings"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- var _smartXHandlerOnce sync.Once
- var _smartXHandlerSingle *smartXHandler
- func GetSmartXHandler() *smartXHandler {
- _smartXHandlerOnce.Do(func() {
- _smartXHandlerSingle = &smartXHandler{
- queue: util.NewQueue(10000),
- }
- })
- return _smartXHandlerSingle
- }
- type smartXHandler struct {
- queue *util.MlQueue
- }
- func (o *smartXHandler) SubscribeTopics() {
- GetMQTTMgr().Subscribe("smart_intersection/led/#/state", mqtt.AtMostOnce, o.HandlerData)
- }
- func (o *smartXHandler) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logrus.Errorf("smartXHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
// parseTopic extracts the device SN from a topic shaped like
// smart_intersection/led/<SN>/state.
//
// The topic must consist of exactly four "/"-separated segments; the third
// segment is returned as-is. Any other segment count yields "". The contents
// of the remaining segments are not checked.
func parseTopic(topic string) string {
	const (
		segmentCount = 4
		snPosition   = 2
	)
	if segments := strings.Split(topic, "/"); len(segments) == segmentCount {
		return segments[snPosition]
	}
	return ""
}
- func (o *smartXHandler) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("smartXHandler.Handler:%v发生异常:%s", args, string(debug.Stack()))
- }
- }()
- for {
- msg, ok, quantity := o.queue.Get()
- if !ok {
- time.Sleep(10 * time.Millisecond)
- continue
- } else if quantity > 1000 {
- logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- continue
- }
- sn := parseTopic(m.Topic())
- if sn != "" {
- continue
- }
- switch topic {
- case protocol.TP_smartX_ONLINE: //上线
- var obj protocol.Pack_IDObject
- if err := obj.DeCode(m.PayloadString()); err == nil { //网关在线
- cacheState(obj.Id, obj.Time, 0)
- GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: util.MlNow()})
- }
- case protocol.TP_smartX_WILL: //下线
- var obj protocol.Pack_IDObject
- if err := obj.DeCode(m.PayloadString()); err == nil { //网关离线
- cacheState(obj.Id, obj.Time, 1)
- GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: util.MlNow()})
- }
- default:
- logrus.Warnf("smartXHandler.Handler:收到暂不支持的主题:%s", topic)
- }
- }
- }
|