ipcmgr.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package main
  2. import (
  3. "runtime"
  4. "runtime/debug"
  5. "sync"
  6. "time"
  7. "github.com/sirupsen/logrus"
  8. "lc/common/models"
  9. "lc/common/mqtt"
  10. "lc/common/protocol"
  11. "lc/common/util"
  12. )
  13. var _IpcMgrOnce sync.Once
  14. var _IpcMgrSingle *IPCMgr
  15. func GetIPCMgr() *IPCMgr {
  16. _IpcMgrOnce.Do(func() {
  17. _IpcMgrSingle = &IPCMgr{
  18. queue: util.NewQueue(10000),
  19. //mapSosAlarm: make(map[string]int64),
  20. mapIpcState: make(map[string]*StateInfo),
  21. }
  22. })
  23. return _IpcMgrSingle
  24. }
  // StateInfo is the last known state of one IPC device together with the time
  // that state was recorded.
  type StateInfo struct {
  	// Time is the timestamp associated with State.
  	Time time.Time
  	// State is the device state: 0 = online, 1 = offline.
  	State uint8
  }
  29. type IPCMgr struct {
  30. queue *util.MlQueue
  31. //mapSosAlarm map[string]int64 //主题到数据库表记录id
  32. mapIpcState map[string]*StateInfo ////0在线,1离线
  33. }
  34. func (o *IPCMgr) SubscribeTopics() {
  35. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_ALARM), mqtt.AtMostOnce, o.HandlerData)
  36. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_STATE), mqtt.AtMostOnce, o.HandlerData)
  37. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_PRESETS_ACK), mqtt.AtMostOnce, o.HandlerData)
  38. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_PRESET_ACK), mqtt.AtMostOnce, o.HandlerData)
  39. }
  40. func (o *IPCMgr) Handler(args ...interface{}) interface{} {
  41. defer func() {
  42. if err := recover(); err != nil {
  43. gopool.Add(o.Handler, args)
  44. logrus.Errorf("IPCMgr.Handler发生异常:%v", err)
  45. logrus.Errorf("IPCMgr.Handler发生异常:%s", string(debug.Stack()))
  46. }
  47. }()
  48. timer := time.NewTicker(time.Duration(CheckOfflineInterval) * time.Minute)
  49. for {
  50. select {
  51. case <-timer.C: //每隔5分钟执行一次
  52. o.UpdateState()
  53. default:
  54. msg, ok, quantity := o.queue.Get()
  55. if !ok {
  56. time.Sleep(100 * time.Millisecond)
  57. continue
  58. } else if quantity > 1000 {
  59. logrus.Warnf("IPCMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  60. }
  61. m, ok := msg.(*mqtt.Message)
  62. if !ok {
  63. continue
  64. }
  65. _, _, _, topic, err := ParseTopic(m.Topic())
  66. if err != nil {
  67. continue
  68. }
  69. switch topic {
  70. case protocol.TP_ONVIF_STATE:
  71. o.HandlerState(m)
  72. case protocol.TP_ONVIF_ALARM:
  73. o.HandlerAlarm(m)
  74. case protocol.TP_ONVIF_PRESETS_ACK:
  75. o.HandlerGetPreset(m)
  76. case protocol.TP_ONVIF_PRESET_ACK:
  77. o.HandlerSetPreset(m)
  78. default:
  79. logrus.Warnf("IPCMgr.Handler:收到暂不支持的主题:%s", topic)
  80. }
  81. }
  82. }
  83. }
  84. func (o *IPCMgr) HandlerData(m mqtt.Message) {
  85. for {
  86. ok, cnt := o.queue.Put(&m)
  87. if ok {
  88. break
  89. } else {
  90. logrus.Errorf("IPCMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  91. runtime.Gosched()
  92. }
  93. }
  94. }
  95. func (o *IPCMgr) HandlerAlarm(m *mqtt.Message) {
  96. var obj protocol.Pack_OnvifAlarm
  97. if err := obj.DeCode(m.PayloadString()); err != nil {
  98. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  99. return
  100. }
  101. t, err := util.MlParseTime(obj.Time)
  102. if err != nil {
  103. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  104. return
  105. }
  106. if obj.Data.Alarm.AlarmTopic == "tns1:RuleEngine/LineDetector/Crossed" {
  107. oo := models.IPCAlarm{
  108. DID: obj.Id,
  109. TStart: t,
  110. TEnd: t,
  111. AType: "Crossed",
  112. Content: "遮挡告警",
  113. }
  114. if err := models.G_db.Create(&oo).Error; err != nil {
  115. logrus.Errorf("遮挡告警数据:%v", obj)
  116. logrus.Errorf("遮挡告警数据入库失败:%s", err.Error())
  117. }
  118. }
  119. }
  120. func (o *IPCMgr) HandlerState(m *mqtt.Message) {
  121. var obj protocol.Pack_IPCState
  122. if err := obj.DeCode(m.PayloadString()); err != nil {
  123. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  124. return
  125. }
  126. t, err := util.MlParseTime(obj.Time)
  127. if err != nil {
  128. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  129. return
  130. }
  131. //0在线,1离线
  132. si, ok := o.mapIpcState[obj.Id]
  133. if !ok {
  134. t, s, err := getState(obj.Id)
  135. if err != nil {
  136. cacheState(obj.Id, obj.Time, obj.Data.State)
  137. o.mapIpcState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State}
  138. return
  139. } else {
  140. si = &StateInfo{Time: t, State: s}
  141. o.mapIpcState[obj.Id] = si
  142. }
  143. }
  144. if si.State != obj.Data.State {
  145. if obj.Data.State == 1 { //在线到离线
  146. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t})
  147. } else { //离线到在线
  148. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t})
  149. }
  150. }
  151. cacheState(obj.Id, obj.Time, obj.Data.State)
  152. o.mapIpcState[obj.Id].State = obj.Data.State
  153. o.mapIpcState[obj.Id].Time = t
  154. }
  155. func (o *IPCMgr) UpdateState() {
  156. t := util.MlNow()
  157. for k, v := range o.mapIpcState {
  158. if v.State == 0 && t.Sub(v.Time).Minutes() > OfflineInterval { //只检查当前还在线的
  159. GetEventMgr().PushEvent(&EventObject{ID: k, EventType: models.ET_OFFLINE, Time: t})
  160. cacheState(k, t.Format("2006-01-02 15:04:05"), 1)
  161. o.mapIpcState[k].State = 1
  162. o.mapIpcState[k].Time = t
  163. }
  164. }
  165. }
  166. func (o *IPCMgr) HandlerGetPreset(m *mqtt.Message) {
  167. var obj protocol.Pack_PresetInfo
  168. if err := obj.DeCode(m.PayloadString()); err != nil {
  169. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  170. return
  171. }
  172. if obj.Data.State == protocol.FAILED {
  173. logrus.Errorf("读取设备[%s]预置点失败:%s", obj.Id, obj.Data.Error)
  174. return
  175. }
  176. //所有预置点不存库,用于调试
  177. if obj.Data.Flag == 6 {
  178. logrus.Debugf("设备[%s]预置点:%v", obj.Id, obj.Data.Presets)
  179. return
  180. }
  181. if len(obj.Data.Presets) == 0 {
  182. return
  183. }
  184. t := util.MlNow()
  185. for _, v := range obj.Data.Presets {
  186. ps := models.IpcPresets{
  187. ID: obj.Id,
  188. Token: v.Token,
  189. Name: v.Name,
  190. X: v.X,
  191. Y: v.Y,
  192. Z: v.Z,
  193. CreatedAt: t,
  194. }
  195. if err := ps.SaveFromGateway(); err != nil {
  196. logrus.Errorf("保存设置预置位信息失败:%s", err.Error())
  197. logrus.Debugf("预置位信息:%v", ps)
  198. }
  199. }
  200. }
  201. func (o *IPCMgr) HandlerSetPreset(m *mqtt.Message) {
  202. var obj protocol.Pack_IPCSetPresetACK
  203. if err := obj.DeCode(m.PayloadString()); err != nil {
  204. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  205. return
  206. }
  207. if obj.Data.State == protocol.FAILED {
  208. logrus.Errorf("对设备[%s]设置预置点失败:%s", obj.Id, obj.Data.Error)
  209. return
  210. }
  211. ps := models.IpcPresets{
  212. ID: obj.Id,
  213. Token: obj.Data.Token,
  214. Name: obj.Data.Name,
  215. X: obj.Data.X,
  216. Y: obj.Data.Y,
  217. Z: obj.Data.Z,
  218. File: obj.Data.File,
  219. }
  220. if err := ps.SaveFromGateway2(); err != nil {
  221. logrus.Errorf("保存设置预置位信息失败:%s", err.Error())
  222. logrus.Debugf("预置位信息:%v", ps)
  223. }
  224. }