itsmgr.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package main
  2. import (
  3. "runtime"
  4. "runtime/debug"
  5. "sync"
  6. "time"
  7. "github.com/sirupsen/logrus"
  8. "lc/common/models"
  9. "lc/common/mqtt"
  10. "lc/common/protocol"
  11. "lc/common/util"
  12. )
  13. var _onceItsMgr sync.Once
  14. var _singleItsMgr *ItsMgr
  15. func GetItsMgr() *ItsMgr {
  16. _onceItsMgr.Do(func() {
  17. _singleItsMgr = &ItsMgr{
  18. queue: util.NewQueue(1000),
  19. mapItsState: make(map[string]*StateInfo),
  20. }
  21. })
  22. return _singleItsMgr
  23. }
// ItsMgr buffers incoming ITS MQTT messages in a queue and keeps the last
// known state of each device.
type ItsMgr struct {
	// queue receives *mqtt.Message values from HandlerData; Handler takes
	// them out again for processing.
	queue *util.MlQueue
	// mapItsState holds the last known StateInfo per device, keyed by the Id
	// field of the state report.
	mapItsState map[string]*StateInfo // state values: 0 = online, 1 = offline
}
  28. func (o *ItsMgr) SubscribeTopics() {
  29. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ITS, protocol.TP_ITS_VEHICLESTATIC), mqtt.AtMostOnce, o.HandlerData)
  30. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_ITS, protocol.TP_ITSDEV_STATE), mqtt.AtMostOnce, o.HandlerData)
  31. }
  32. func (o *ItsMgr) HandlerData(m mqtt.Message) {
  33. for {
  34. ok, cnt := o.queue.Put(&m)
  35. if ok {
  36. break
  37. } else {
  38. logrus.Errorf("ItsMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  39. runtime.Gosched()
  40. }
  41. }
  42. }
  43. func (o *ItsMgr) HandlerTpItsVehicleStatic(m *mqtt.Message) {
  44. buf, err0 := util.GzipDecode(m.Payload())
  45. if err0 != nil {
  46. logrus.Errorf("GipDecode失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err0.Error())
  47. return
  48. }
  49. var obj protocol.Pack_VehicleStatic
  50. if err := obj.DeCode(string(buf)); err != nil {
  51. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), string(buf), err.Error())
  52. return
  53. }
  54. t, err := util.MlParseTime(obj.Time)
  55. if err != nil {
  56. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  57. return
  58. }
  59. logrus.Debugln(obj)
  60. //Vehicle Direction
  61. var DirectionList []models.ItsVehicleDirection
  62. for k, v := range obj.Data.MapDirection {
  63. oo := models.ItsVehicleDirection{DID: obj.Id, Time: t, Direction: int(k), Total: int(v)}
  64. DirectionList = append(DirectionList, oo)
  65. }
  66. err = models.MultiItsVehicleDirection(DirectionList)
  67. if err != nil {
  68. logrus.Errorf("方向数据存储失败,失败原因:%s", err.Error())
  69. }
  70. //Vehicle Province
  71. var ProvinceList []models.ItsVehicleProvince
  72. for k, v := range obj.Data.MapProvince {
  73. oo := models.ItsVehicleProvince{DID: obj.Id, Time: t, Province: k, Total: int(v)}
  74. ProvinceList = append(ProvinceList, oo)
  75. }
  76. err = models.MultiItsVehicleProvince(ProvinceList)
  77. if err != nil {
  78. logrus.Errorf("省份数据存储失败,失败原因:%s", err.Error())
  79. }
  80. //Province City
  81. var ProvinceCityList []models.ItsVehicleProvinceCity
  82. for k, v := range obj.Data.MapProvinceCity {
  83. oo := models.ItsVehicleProvinceCity{DID: obj.Id, Time: t, ProvinceCity: k, Total: int(v)}
  84. ProvinceCityList = append(ProvinceCityList, oo)
  85. }
  86. err = models.MultiItsVehicleProvinceCity(ProvinceCityList)
  87. if err != nil {
  88. logrus.Errorf("城市数据存储失败,失败原因:%s", err.Error())
  89. }
  90. //Vehicle Type
  91. var VehicleTypeList []models.ItsVehicleType
  92. for k, v := range obj.Data.MapVehicleType {
  93. oo := models.ItsVehicleType{DID: obj.Id, Time: t, Vtype: int(k), Total: int(v)}
  94. VehicleTypeList = append(VehicleTypeList, oo)
  95. }
  96. err = models.MultiItsVehicleType(VehicleTypeList)
  97. if err != nil {
  98. logrus.Errorf("车辆类型数据存储失败,失败原因:%s", err.Error())
  99. }
  100. var ItsVehicleSpeedList []models.ItsVehicleSpeed
  101. for _, v := range obj.Data.SliceVehicleSpeed {
  102. oo := models.ItsVehicleSpeed{DID: obj.Id, Plate: v.Plate, Time: time.Time(v.Time), Vtype: int(v.Type), Speed: v.Speed}
  103. ItsVehicleSpeedList = append(ItsVehicleSpeedList, oo)
  104. }
  105. err = models.MultiItsVehicleSpeed(ItsVehicleSpeedList)
  106. if err != nil {
  107. logrus.Errorf("车辆速度数据存储失败,失败原因:%s", err.Error())
  108. }
  109. }
  110. func (o *ItsMgr) Handler(args ...interface{}) interface{} {
  111. defer func() {
  112. if err := recover(); err != nil {
  113. gopool.Add(o.Handler, args)
  114. logrus.Errorf("ItsMgr.Handler发生异常:%v", err)
  115. logrus.Errorf("ItsMgr.Handler发生异常:%s", string(debug.Stack()))
  116. }
  117. }()
  118. for {
  119. msg, ok, _ := o.queue.Get()
  120. if !ok {
  121. time.Sleep(1 * time.Second)
  122. continue
  123. }
  124. m, ok := msg.(*mqtt.Message)
  125. if !ok {
  126. continue
  127. }
  128. _, _, _, topic, err := ParseTopic(m.Topic())
  129. if err != nil {
  130. continue
  131. }
  132. switch topic {
  133. case protocol.TP_ITS_VEHICLESTATIC:
  134. o.HandlerTpItsVehicleStatic(m)
  135. case protocol.TP_ITSDEV_STATE:
  136. o.HandlerState(m)
  137. default:
  138. logrus.Warnf("ItsMgr.Handler:收到暂不支持的主题:%s", topic)
  139. }
  140. }
  141. }
  142. func (o *ItsMgr) HandlerState(m *mqtt.Message) {
  143. var obj protocol.Pack_IPCState
  144. if err := obj.DeCode(m.PayloadString()); err != nil {
  145. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  146. return
  147. }
  148. t, err := util.MlParseTime(obj.Time)
  149. if err != nil {
  150. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  151. return
  152. }
  153. //0在线,1离线
  154. si, ok := o.mapItsState[obj.Id]
  155. if !ok {
  156. t, s, err := getState(obj.Id)
  157. if err != nil {
  158. cacheState(obj.Id, obj.Time, obj.Data.State)
  159. o.mapItsState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State}
  160. return
  161. } else {
  162. si = &StateInfo{Time: t, State: s}
  163. o.mapItsState[obj.Id] = si
  164. }
  165. }
  166. if si.State != obj.Data.State {
  167. if obj.Data.State == 1 { //在线到离线
  168. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t})
  169. } else { //离线到在线
  170. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t})
  171. }
  172. }
  173. cacheState(obj.Id, obj.Time, obj.Data.State)
  174. o.mapItsState[obj.Id].State = obj.Data.State
  175. o.mapItsState[obj.Id].Time = t
  176. }