eventmgr.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package controllers
  2. import (
  3. "fmt"
  4. "runtime/debug"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "github.com/astaxie/beego"
  9. "lc/common/models"
  10. "lc/common/util"
  11. )
  12. // DeviceAlarmId 告警处理
  13. var DeviceAlarmId = "device_alarm_id"
  14. var _EventMgrOnce sync.Once
  15. var _EventMgrSingle *EventMgr
  16. func GetEventMgr() *EventMgr {
  17. _EventMgrOnce.Do(func() {
  18. _EventMgrSingle = &EventMgr{
  19. MapAlarm: make(map[string]int64),
  20. MapLastData: make(map[string]time.Time),
  21. }
  22. })
  23. return _EventMgrSingle
  24. }
  25. type EventObject struct {
  26. ID string
  27. Etype models.EventType
  28. Time time.Time
  29. Value float64
  30. }
  31. type EventMgr struct {
  32. mu sync.Mutex
  33. MapLastData map[string]time.Time
  34. MapAlarm map[string]int64
  35. }
  36. func (o *EventMgr) Update(code string) {
  37. if code != "" {
  38. o.mu.Lock()
  39. o.MapLastData[code] = util.MlNow()
  40. o.mu.Unlock()
  41. }
  42. }
  43. func (o *EventMgr) Handler() {
  44. defer func() {
  45. if err := recover(); err != nil {
  46. go o.Handler()
  47. beego.Error(fmt.Sprintf("EventMgr.Handler发生异常:%v", err))
  48. beego.Error("EventMgr.Handler发生异常:%s", string(debug.Stack()))
  49. }
  50. }()
  51. timer := time.NewTicker(1 * time.Minute)
  52. _mapLastData := make(map[string]time.Time)
  53. _mapRedis := make(map[string]bool)
  54. for {
  55. select {
  56. case <-timer.C: //每隔5分钟执行一次
  57. o.mu.Lock()
  58. for k, v := range o.MapLastData {
  59. _mapLastData[k] = v
  60. }
  61. o.mu.Unlock()
  62. for k, v := range _mapLastData {
  63. if util.MlNow().Sub(v).Minutes() > 5 { //离线
  64. aid, ok := o.MapAlarm[k]
  65. if !ok {
  66. //试图从redis查找,看是不是已有离线告警,有则忽略,无则处理,新建告警
  67. if _, ok := _mapRedis[k]; !ok {
  68. if str, err := redisCltRawdata.HGet(DeviceAlarmId, k).Result(); err == nil {
  69. if id, err := strconv.Atoi(str); err == nil {
  70. aid = int64(id)
  71. }
  72. _mapRedis[k] = true
  73. }
  74. }
  75. if aid > 0 {
  76. continue
  77. }
  78. oo := models.DeviceAlarm{
  79. DID: k,
  80. TStart: v,
  81. Threshold: 0,
  82. SValue: 0,
  83. Content: "离线",
  84. AlarmType: 0,
  85. Level: 2, //设备离线为严重告警
  86. }
  87. if err := models.G_db.Create(&oo).Error; err != nil {
  88. beego.Error(fmt.Sprintf("告警信息[%v]入库失败:%s", oo, err.Error()))
  89. } else {
  90. o.MapAlarm[k] = oo.ID
  91. if err := redisCltRawdata.HSet(DeviceAlarmId, k, oo.ID).Err(); err != nil {
  92. beego.Error(fmt.Sprintf("设备[%s]告警数据[%d]缓存失败:%s", k, oo.ID, err.Error()))
  93. }
  94. }
  95. o.SaveEventObject(&EventObject{ID: k, Etype: models.ET_OFFLINE, Time: v, Value: 0})
  96. }
  97. redisCltRawdata.HMSet(DevStatusPrefix+k, map[string]interface{}{TLast: util.MlNow().Format("2006-01-02 15:04:05"), ONLINE: 0})
  98. } else { //在线
  99. aid, ok := o.MapAlarm[k]
  100. if !ok {
  101. //查找过redis,则不再从redis查找
  102. if _, ok := _mapRedis[k]; !ok {
  103. if str, err := redisCltRawdata.HGet(DeviceAlarmId, k).Result(); err == nil {
  104. if id, err := strconv.Atoi(str); err == nil {
  105. aid = int64(id)
  106. }
  107. _mapRedis[k] = true
  108. }
  109. }
  110. }
  111. if aid > 0 {
  112. oo := models.DeviceAlarm{ID: aid, TEnd: v, EValue: float32(0)}
  113. if err := oo.Update(); err != nil {
  114. beego.Error(fmt.Sprintf("更新告警[%d]信息失败:%s", aid, err.Error()))
  115. }
  116. delete(o.MapAlarm, k)
  117. if err := redisCltRawdata.HDel(DeviceAlarmId, k).Err(); err != nil {
  118. beego.Error(fmt.Sprintf("更新告警[%d]信息失败:%s", aid, err.Error()))
  119. }
  120. o.SaveEventObject(&EventObject{ID: k, Etype: models.ET_ONLINE, Time: v, Value: 0})
  121. }
  122. redisCltRawdata.HMSet(DevStatusPrefix+k, map[string]interface{}{TLast: util.MlNow().Format("2006-01-02 15:04:05"), ONLINE: 1})
  123. }
  124. }
  125. default:
  126. time.Sleep(time.Minute)
  127. }
  128. }
  129. }
  130. func (o *EventMgr) SaveEventObject(eo *EventObject) {
  131. oo := models.DeviceEvents{DID: eo.ID, EType: eo.Etype, Time: eo.Time}
  132. if err := models.G_db.Create(&oo).Error; err != nil {
  133. beego.Error(fmt.Sprintf("设备[%s]事件入库失败:%s", eo.ID, err.Error()))
  134. }
  135. }