| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package main
- import (
- "runtime"
- "runtime/debug"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- var _onceItsMgr sync.Once
- var _singleItsMgr *ItsMgr
- func GetItsMgr() *ItsMgr {
- _onceItsMgr.Do(func() {
- _singleItsMgr = &ItsMgr{
- queue: util.NewQueue(1000),
- mapItsState: make(map[string]*StateInfo),
- }
- })
- return _singleItsMgr
- }
- type ItsMgr struct {
- queue *util.MlQueue
- mapItsState map[string]*StateInfo //0在线,1离线
- }
- func (o *ItsMgr) SubscribeTopics() {
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ITS, protocol.TP_ITS_VEHICLESTATIC), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ITS, protocol.TP_ITSDEV_STATE), mqtt.AtMostOnce, o.HandlerData)
- }
- func (o *ItsMgr) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logrus.Errorf("ItsMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- func (o *ItsMgr) HandlerTpItsVehicleStatic(m *mqtt.Message) {
- buf, err0 := util.GzipDecode(m.Payload())
- if err0 != nil {
- logrus.Errorf("GipDecode失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err0.Error())
- return
- }
- var obj protocol.Pack_VehicleStatic
- if err := obj.DeCode(string(buf)); err != nil {
- logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), string(buf), err.Error())
- return
- }
- t, err := util.MlParseTime(obj.Time)
- if err != nil {
- logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
- return
- }
- logrus.Debugln(obj)
- //Vehicle Direction
- var DirectionList []models.ItsVehicleDirection
- for k, v := range obj.Data.MapDirection {
- oo := models.ItsVehicleDirection{DID: obj.Id, Time: t, Direction: int(k), Total: int(v)}
- DirectionList = append(DirectionList, oo)
- }
- err = models.MultiItsVehicleDirection(DirectionList)
- if err != nil {
- logrus.Errorf("方向数据存储失败,失败原因:%s", err.Error())
- }
- //Vehicle Province
- var ProvinceList []models.ItsVehicleProvince
- for k, v := range obj.Data.MapProvince {
- oo := models.ItsVehicleProvince{DID: obj.Id, Time: t, Province: k, Total: int(v)}
- ProvinceList = append(ProvinceList, oo)
- }
- err = models.MultiItsVehicleProvince(ProvinceList)
- if err != nil {
- logrus.Errorf("省份数据存储失败,失败原因:%s", err.Error())
- }
- //Province City
- var ProvinceCityList []models.ItsVehicleProvinceCity
- for k, v := range obj.Data.MapProvinceCity {
- oo := models.ItsVehicleProvinceCity{DID: obj.Id, Time: t, ProvinceCity: k, Total: int(v)}
- ProvinceCityList = append(ProvinceCityList, oo)
- }
- err = models.MultiItsVehicleProvinceCity(ProvinceCityList)
- if err != nil {
- logrus.Errorf("城市数据存储失败,失败原因:%s", err.Error())
- }
- //Vehicle Type
- var VehicleTypeList []models.ItsVehicleType
- for k, v := range obj.Data.MapVehicleType {
- oo := models.ItsVehicleType{DID: obj.Id, Time: t, Vtype: int(k), Total: int(v)}
- VehicleTypeList = append(VehicleTypeList, oo)
- }
- err = models.MultiItsVehicleType(VehicleTypeList)
- if err != nil {
- logrus.Errorf("车辆类型数据存储失败,失败原因:%s", err.Error())
- }
- var ItsVehicleSpeedList []models.ItsVehicleSpeed
- for _, v := range obj.Data.SliceVehicleSpeed {
- oo := models.ItsVehicleSpeed{DID: obj.Id, Plate: v.Plate, Time: time.Time(v.Time), Vtype: int(v.Type), Speed: v.Speed}
- ItsVehicleSpeedList = append(ItsVehicleSpeedList, oo)
- }
- err = models.MultiItsVehicleSpeed(ItsVehicleSpeedList)
- if err != nil {
- logrus.Errorf("车辆速度数据存储失败,失败原因:%s", err.Error())
- }
- }
- func (o *ItsMgr) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("ItsMgr.Handler发生异常:%v", err)
- logrus.Errorf("ItsMgr.Handler发生异常:%s", string(debug.Stack()))
- }
- }()
- for {
- msg, ok, _ := o.queue.Get()
- if !ok {
- time.Sleep(1 * time.Second)
- continue
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- continue
- }
- _, _, _, topic, err := ParseTopic(m.Topic())
- if err != nil {
- continue
- }
- switch topic {
- case protocol.TP_ITS_VEHICLESTATIC:
- o.HandlerTpItsVehicleStatic(m)
- case protocol.TP_ITSDEV_STATE:
- o.HandlerState(m)
- default:
- logrus.Warnf("ItsMgr.Handler:收到暂不支持的主题:%s", topic)
- }
- }
- }
- func (o *ItsMgr) HandlerState(m *mqtt.Message) {
- var obj protocol.Pack_IPCState
- if err := obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
- return
- }
- t, err := util.MlParseTime(obj.Time)
- if err != nil {
- logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
- return
- }
- //0在线,1离线
- si, ok := o.mapItsState[obj.Id]
- if !ok {
- t, s, err := getState(obj.Id)
- if err != nil {
- cacheState(obj.Id, obj.Time, obj.Data.State)
- o.mapItsState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State}
- return
- } else {
- si = &StateInfo{Time: t, State: s}
- o.mapItsState[obj.Id] = si
- }
- }
- if si.State != obj.Data.State {
- if obj.Data.State == 1 { //在线到离线
- GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t})
- } else { //离线到在线
- GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t})
- }
- }
- cacheState(obj.Id, obj.Time, obj.Data.State)
- o.mapItsState[obj.Id].State = obj.Data.State
- o.mapItsState[obj.Id].Time = t
- }
|