sosmgr.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package main
  2. import (
  3. "runtime"
  4. "runtime/debug"
  5. "sync"
  6. "time"
  7. "github.com/sirupsen/logrus"
  8. "lc/common/models"
  9. "lc/common/mqtt"
  10. "lc/common/protocol"
  11. "lc/common/util"
  12. )
  13. // 一键报警管理
  14. var _SosMgrOnce sync.Once
  15. var _SosMgrSingle *SosMgr
  16. func GetSosMgr() *SosMgr {
  17. _SosMgrOnce.Do(func() {
  18. _SosMgrSingle = &SosMgr{
  19. queue: util.NewQueue(10000),
  20. mapSosAlarm: make(map[string]int64),
  21. mapSosState: make(map[string]*StateInfo),
  22. }
  23. })
  24. return _SosMgrSingle
  25. }
  26. type SosMgr struct {
  27. queue *util.MlQueue
  28. mapSosAlarm map[string]int64 //主题到数据库表记录id
  29. mapSosState map[string]*StateInfo ////0在线,1离线
  30. }
  31. func (o *SosMgr) SubscribeTopics() {
  32. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_SOS, protocol.TP_ONVIF_ALARM), mqtt.AtMostOnce, o.HandlerData)
  33. GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_SOS, protocol.TP_ONVIF_STATE), mqtt.AtMostOnce, o.HandlerData)
  34. }
  35. func (o *SosMgr) Handler(args ...interface{}) interface{} {
  36. defer func() {
  37. if err := recover(); err != nil {
  38. gopool.Add(o.Handler, args)
  39. logrus.Errorf("SosMgr.Handler发生异常:%s", string(debug.Stack()))
  40. }
  41. }()
  42. timer := time.NewTicker(time.Duration(CheckOfflineInterval) * time.Minute)
  43. for {
  44. select {
  45. case <-timer.C: //每隔5分钟执行一次
  46. o.UpdateState()
  47. default:
  48. msg, ok, quantity := o.queue.Get()
  49. if !ok {
  50. time.Sleep(100 * time.Millisecond)
  51. continue
  52. } else if quantity > 1000 {
  53. logrus.Warnf("SosMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
  54. }
  55. m, ok := msg.(*mqtt.Message)
  56. if !ok {
  57. continue
  58. }
  59. _, _, _, topic, err := ParseTopic(m.Topic())
  60. if err != nil {
  61. continue
  62. }
  63. switch topic {
  64. case protocol.TP_ONVIF_STATE:
  65. o.HandlerState(m)
  66. case protocol.TP_ONVIF_ALARM:
  67. o.HandlerAlarm(m)
  68. default:
  69. logrus.Warnf("SosMgr.Handler:收到暂不支持的主题:%s", topic)
  70. }
  71. }
  72. }
  73. }
  74. func (o *SosMgr) HandlerData(m mqtt.Message) {
  75. for {
  76. ok, cnt := o.queue.Put(&m)
  77. if ok {
  78. break
  79. } else {
  80. logrus.Errorf("SosMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
  81. runtime.Gosched()
  82. }
  83. }
  84. }
  85. func (o *SosMgr) HandlerAlarm(m *mqtt.Message) {
  86. var obj protocol.Pack_OnvifAlarm
  87. if err := obj.DeCode(m.PayloadString()); err != nil {
  88. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  89. return
  90. }
  91. t, err := util.MlParseTime(obj.Time)
  92. if err != nil {
  93. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  94. return
  95. }
  96. if obj.Data.Alarm.AlarmTopic == "tns1:Device/Trigger/tnshik:AlarmIn" { //海康威视一键报警
  97. state, ok2 := obj.Data.Alarm.Data["State"]
  98. if !ok2 {
  99. return
  100. }
  101. //更新数据库告警结束时间
  102. if state == "false" {
  103. if id, ok := o.mapSosAlarm[obj.Id+obj.Data.Alarm.AlarmTopic]; ok {
  104. oo := models.SosAlarm{ID: id, TEnd: t}
  105. if err := oo.Update(); err != nil {
  106. logrus.Errorf("一键告警数据:%v", obj)
  107. logrus.Errorf("一键告警数据更新失败:%s", err.Error())
  108. }
  109. delete(o.mapSosAlarm, obj.Id+obj.Data.Alarm.AlarmTopic)
  110. }
  111. } else { //新建告警记录
  112. oo := models.SosAlarm{
  113. DID: obj.Id,
  114. TStart: t,
  115. AType: "SOS",
  116. Content: "一键求助",
  117. }
  118. if err := models.G_db.Create(&oo).Error; err != nil {
  119. logrus.Errorf("一键告警数据:%v", obj)
  120. logrus.Errorf("一键告警数据入库失败:%s", err.Error())
  121. } else {
  122. o.mapSosAlarm[obj.Id+obj.Data.Alarm.AlarmTopic] = oo.ID
  123. }
  124. }
  125. }
  126. }
  127. func (o *SosMgr) HandlerState(m *mqtt.Message) {
  128. var obj protocol.Pack_IPCState
  129. if err := obj.DeCode(m.PayloadString()); err != nil {
  130. logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
  131. return
  132. }
  133. t, err := util.MlParseTime(obj.Time)
  134. if err != nil {
  135. logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
  136. return
  137. }
  138. //0在线,1离线
  139. si, ok := o.mapSosState[obj.Id]
  140. if !ok {
  141. t, s, err := getState(obj.Id)
  142. if err != nil {
  143. cacheState(obj.Id, obj.Time, obj.Data.State)
  144. o.mapSosState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State}
  145. return
  146. } else {
  147. si = &StateInfo{Time: t, State: s}
  148. o.mapSosState[obj.Id] = si
  149. }
  150. }
  151. if si.State != obj.Data.State {
  152. if obj.Data.State == 1 { //在线到离线
  153. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t})
  154. } else { //离线到在线
  155. GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t})
  156. }
  157. }
  158. cacheState(obj.Id, obj.Time, obj.Data.State)
  159. o.mapSosState[obj.Id].State = obj.Data.State
  160. o.mapSosState[obj.Id].Time = t
  161. }
  162. func (o *SosMgr) UpdateState() {
  163. t := util.MlNow()
  164. for k, v := range o.mapSosState {
  165. if v.State == 0 && t.Sub(v.Time).Minutes() > OfflineInterval { //只检查当前还在线的
  166. GetEventMgr().PushEvent(&EventObject{ID: k, EventType: models.ET_OFFLINE, Time: t})
  167. cacheState(k, t.Format("2006-01-02 15:04:05"), 1)
  168. o.mapSosState[k].State = 1
  169. o.mapSosState[k].Time = t
  170. }
  171. }
  172. }