package main

import (
	"runtime"
	"runtime/debug"
	"sync"
	"time"

	"github.com/sirupsen/logrus"

	"lc/common/models"
	"lc/common/mqtt"
	"lc/common/protocol"
	"lc/common/util"
)

var _onceItsMgr sync.Once
var _singleItsMgr *ItsMgr

// GetItsMgr returns the process-wide ItsMgr instance, creating it on the
// first call. Creation is guarded by sync.Once, so concurrent first calls
// still yield a single instance.
func GetItsMgr() *ItsMgr {
	_onceItsMgr.Do(func() {
		_singleItsMgr = &ItsMgr{
			// NOTE(review): 1000 is presumably the queue capacity — confirm
			// against util.NewQueue.
			queue:       util.NewQueue(1000),
			mapItsState: make(map[string]*StateInfo),
		}
	})
	return _singleItsMgr
}

// ItsMgr buffers incoming ITS MQTT messages in a queue and processes them in
// Handler. It also keeps the last known state per device ID.
type ItsMgr struct {
	queue *util.MlQueue
	// mapItsState maps a device ID to its last known state
	// (0 = online, 1 = offline).
	// NOTE(review): the map is not protected by a lock; this is only safe if
	// a single Handler worker runs at a time — confirm against the caller.
	mapItsState map[string]*StateInfo
}

// SubscribeTopics subscribes HandlerData to the ITS vehicle-statistics topic
// and the ITS device-state topic, both at QoS "at most once".
func (o *ItsMgr) SubscribeTopics() {
	GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ITS, protocol.TP_ITS_VEHICLESTATIC), mqtt.AtMostOnce, o.HandlerData)
	GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ITS, protocol.TP_ITSDEV_STATE), mqtt.AtMostOnce, o.HandlerData)
}

// HandlerData is the MQTT callback. It pushes the message onto the internal
// queue, retrying until the queue accepts it. Every failed attempt is logged
// and the goroutine yields before trying again.
func (o *ItsMgr) HandlerData(m mqtt.Message) {
	for {
		ok, cnt := o.queue.Put(&m)
		if ok {
			return
		}
		logrus.Errorf("ItsMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
		runtime.Gosched()
	}
}

// HandlerTpItsVehicleStatic gunzips and decodes a vehicle-statistics message
// and stores its per-direction, per-province, per-city, per-vehicle-type and
// per-vehicle-speed records through the models package.
//
// A decode or time-parse failure aborts the whole message. A storage failure
// in one category is logged and the remaining categories are still stored.
func (o *ItsMgr) HandlerTpItsVehicleStatic(m *mqtt.Message) {
	buf, err0 := util.GzipDecode(m.Payload())
	if err0 != nil {
		logrus.Errorf("GipDecode失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err0.Error())
		return
	}

	var obj protocol.Pack_VehicleStatic
	if err := obj.DeCode(string(buf)); err != nil {
		logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), string(buf), err.Error())
		return
	}

	t, err := util.MlParseTime(obj.Time)
	if err != nil {
		logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
		return
	}
	logrus.Debugln(obj)

	// Vehicle counts by direction.
	var directionList []models.ItsVehicleDirection
	for k, v := range obj.Data.MapDirection {
		directionList = append(directionList, models.ItsVehicleDirection{
			DID:       obj.Id,
			Time:      t,
			Direction: int(k),
			Total:     int(v),
		})
	}
	err = models.MultiItsVehicleDirection(directionList)
	if err != nil {
		logrus.Errorf("方向数据存储失败,失败原因:%s", err.Error())
	}

	// Vehicle counts by province.
	var provinceList []models.ItsVehicleProvince
	for k, v := range obj.Data.MapProvince {
		provinceList = append(provinceList, models.ItsVehicleProvince{
			DID:      obj.Id,
			Time:     t,
			Province: k,
			Total:    int(v),
		})
	}
	err = models.MultiItsVehicleProvince(provinceList)
	if err != nil {
		logrus.Errorf("省份数据存储失败,失败原因:%s", err.Error())
	}

	// Vehicle counts by province/city.
	var provinceCityList []models.ItsVehicleProvinceCity
	for k, v := range obj.Data.MapProvinceCity {
		provinceCityList = append(provinceCityList, models.ItsVehicleProvinceCity{
			DID:          obj.Id,
			Time:         t,
			ProvinceCity: k,
			Total:        int(v),
		})
	}
	err = models.MultiItsVehicleProvinceCity(provinceCityList)
	if err != nil {
		logrus.Errorf("城市数据存储失败,失败原因:%s", err.Error())
	}

	// Vehicle counts by vehicle type.
	var vehicleTypeList []models.ItsVehicleType
	for k, v := range obj.Data.MapVehicleType {
		vehicleTypeList = append(vehicleTypeList, models.ItsVehicleType{
			DID:   obj.Id,
			Time:  t,
			Vtype: int(k),
			Total: int(v),
		})
	}
	err = models.MultiItsVehicleType(vehicleTypeList)
	if err != nil {
		logrus.Errorf("车辆类型数据存储失败,失败原因:%s", err.Error())
	}

	// Per-vehicle speed samples; each sample carries its own timestamp
	// rather than the message time.
	var speedList []models.ItsVehicleSpeed
	for _, v := range obj.Data.SliceVehicleSpeed {
		speedList = append(speedList, models.ItsVehicleSpeed{
			DID:   obj.Id,
			Plate: v.Plate,
			Time:  time.Time(v.Time),
			Vtype: int(v.Type),
			Speed: v.Speed,
		})
	}
	err = models.MultiItsVehicleSpeed(speedList)
	if err != nil {
		logrus.Errorf("车辆速度数据存储失败,失败原因:%s", err.Error())
	}
}

// Handler is the worker loop. It pulls messages from the queue and dispatches
// them by topic to HandlerTpItsVehicleStatic or HandlerState. When the queue
// yields nothing it sleeps for one second before polling again.
//
// The loop never returns normally. If a panic escapes, the deferred function
// recovers it, resubmits Handler through gopool.Add, and logs the panic value
// and stack trace.
func (o *ItsMgr) Handler(args ...interface{}) interface{} {
	defer func() {
		if err := recover(); err != nil {
			// Resubmit first so processing resumes even if logging is slow.
			gopool.Add(o.Handler, args)
			logrus.Errorf("ItsMgr.Handler发生异常:%v", err)
			logrus.Errorf("ItsMgr.Handler发生异常:%s", string(debug.Stack()))
		}
	}()

	for {
		msg, ok, _ := o.queue.Get()
		if !ok {
			time.Sleep(1 * time.Second)
			continue
		}

		// Entries that are not *mqtt.Message are dropped silently.
		m, ok := msg.(*mqtt.Message)
		if !ok {
			continue
		}

		// Messages whose topic cannot be parsed are dropped silently.
		_, _, _, topic, err := ParseTopic(m.Topic())
		if err != nil {
			continue
		}

		switch topic {
		case protocol.TP_ITS_VEHICLESTATIC:
			o.HandlerTpItsVehicleStatic(m)
		case protocol.TP_ITSDEV_STATE:
			o.HandlerState(m)
		default:
			logrus.Warnf("ItsMgr.Handler:收到暂不支持的主题:%s", topic)
		}
	}
}

// HandlerState decodes a device-state message, pushes an online/offline event
// when the state differs from the last known one, and then updates both the
// persistent cache (cacheState) and the in-memory map.
//
// For a device not yet in the in-memory map, the last state is loaded with
// getState. If that fails, the reported state is recorded as the baseline and
// no event is pushed.
func (o *ItsMgr) HandlerState(m *mqtt.Message) {
	var obj protocol.Pack_IPCState
	if err := obj.DeCode(m.PayloadString()); err != nil {
		logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
		return
	}

	t, err := util.MlParseTime(obj.Time)
	if err != nil {
		logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
		return
	}

	// State values: 0 = online, 1 = offline.
	si, ok := o.mapItsState[obj.Id]
	if !ok {
		// Use distinct names so the message time t is not shadowed: on a
		// getState failure the baseline must carry the message time, matching
		// what cacheState stores, not a value from the failed lookup.
		cachedTime, cachedState, err := getState(obj.Id)
		if err != nil {
			cacheState(obj.Id, obj.Time, obj.Data.State)
			o.mapItsState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State}
			return
		}
		si = &StateInfo{Time: cachedTime, State: cachedState}
		o.mapItsState[obj.Id] = si
	}

	if si.State != obj.Data.State {
		if obj.Data.State == 1 {
			// Online -> offline.
			GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t})
		} else {
			// Offline -> online.
			GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t})
		}
	}

	cacheState(obj.Id, obj.Time, obj.Data.State)
	o.mapItsState[obj.Id].State = obj.Data.State
	o.mapItsState[obj.Id].Time = t
}