package main import ( "runtime" "runtime/debug" "strings" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) var _radarHandlerOnce sync.Once var _radarHandlerSingle *radarHandler func GetRadarHandler() *radarHandler { _radarHandlerOnce.Do(func() { _radarHandlerSingle = &radarHandler{ queue: util.NewQueue(10000), } }) return _radarHandlerSingle } type radarHandler struct { queue *util.MlQueue } func (o *radarHandler) SubscribeTopics() { GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_Radar, protocol.TP_RADAR_DATA), mqtt.AtMostOnce, o.HandlerData) } func (o *radarHandler) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("radarHandler.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *radarHandler) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("radarHandler.Handler:%v发生异常:%s", args, string(debug.Stack())) } }() for { msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(10 * time.Millisecond) continue } else if quantity > 1000 { logrus.Warnf("数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { continue } _, _, GID, topic, err := ParseTopic(m.Topic()) if err != nil { continue } switch topic { case protocol.TP_RADAR_DATA: var ret protocol.Pack_UploadData if err := ret.DeCode(m.PayloadString()); err == nil { if ret.Data.State == protocol.FAILED || len(ret.Data.Data) != 5 || ret.Data.Data[3] < 0 { logrus.Warningf("雷达数据不正确 %+v", ret) continue } t, _ := util.MlParseTime(ret.Time) ipPort := strings.Split(ret.Id, ":") o := models.RadarData{ GID: GID, DID: ret.Id, Time: t, UID: uint16(ret.Data.Data[0]), RadarID: uint64(ret.Data.Data[1]), Type: uint8(ret.Data.Data[2]), Speed: ret.Data.Data[3], Dist: ret.Data.Data[4], } if len(ipPort) == 2 { o.IP = ipPort[0] o.Port = ipPort[1] } if err := models.G_db.Save(&o).Error; err != nil { logrus.Errorf("插入数据库失败:%s", err.Error()) } logrus.Debugf("雷达数据插入成功, 数据: %+v", ret) } default: logrus.Warnf("radarHandler.Handler:收到暂不支持的主题:%s", topic) } } }