bizalarm.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package main
  2. import (
  3. "math"
  4. "runtime/debug"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/sirupsen/logrus"
  10. "lc/common/models"
  11. "lc/common/util"
  12. )
  13. var _BizAlarmMgronce sync.Once
  14. var _BizAlarmMgrsingle *BizAlarmMgr
  15. func GetBizAlarmMgr() *BizAlarmMgr {
  16. _BizAlarmMgronce.Do(func() {
  17. _BizAlarmMgrsingle = &BizAlarmMgr{
  18. queue: util.NewQueue(10000),
  19. mapID: make(map[string]bool),
  20. mapIDLock: &sync.RWMutex{},
  21. mapAssociate: make(map[string][]Associate),
  22. mapStrategy: make(map[int]models.AlarmStrategy),
  23. mapPendingAlarm: make(map[string]*PendingAlarm),
  24. }
  25. })
  26. return _BizAlarmMgrsingle
  27. }
  28. type Associate struct {
  29. ID string
  30. Mid int
  31. Sid int
  32. Cid int
  33. }
  34. type BizValue struct {
  35. ID string
  36. Time time.Time
  37. Tid uint16
  38. Data map[uint16]float64 //sid->值
  39. }
  40. type PendingAlarm struct {
  41. Alarm *models.DeviceAlarm
  42. Duration uint8 //持续时间
  43. }
  44. type BizAlarmMgr struct {
  45. queue *util.MlQueue //待处理数据
  46. mapID map[string]bool //设备id->bool,用于判断数据是否要入队列
  47. mapIDLock *sync.RWMutex
  48. mapAssociate map[string][]Associate //设备id->关联信息
  49. mapStrategy map[int]models.AlarmStrategy //cid->告警策略
  50. mapPendingAlarm map[string]*PendingAlarm //key=设备id_sid_cid
  51. }
  52. func (o *BizAlarmMgr) PushData(value *BizValue) {
  53. o.mapIDLock.RLock()
  54. defer o.mapIDLock.RUnlock()
  55. if _, ok := o.mapID[value.ID]; ok {
  56. o.queue.Put(value)
  57. }
  58. }
  59. // Handler 定时更新告警策略
  60. func (o *BizAlarmMgr) Handler(args ...interface{}) interface{} {
  61. defer func() {
  62. if err := recover(); err != nil {
  63. gopool.Add(o.Handler, args)
  64. logrus.Errorf("BizAlarmMgr.Handler发生异常:%s", string(debug.Stack()))
  65. }
  66. }()
  67. o.loadConfig()
  68. o.LoadAlarm()
  69. timer := time.NewTicker(10 * time.Minute)
  70. timer2 := time.NewTicker(1 * time.Minute)
  71. for {
  72. select {
  73. case <-timer.C: //每隔10分钟更新告警配置
  74. o.loadConfig()
  75. case <-timer2.C: //每隔1分钟检查告警入库情况
  76. o.SaveAlarm()
  77. default:
  78. quantity := o.HandleOneData()
  79. if quantity == 0 {
  80. time.Sleep(10 * time.Millisecond)
  81. }
  82. }
  83. }
  84. return 0
  85. }
  86. func (o *BizAlarmMgr) HandleOneData() uint32 {
  87. bv, ok, quantity := o.queue.Get()
  88. if !ok {
  89. return 0
  90. }
  91. bizvalue, ok := bv.(*BizValue)
  92. if !ok {
  93. return quantity
  94. }
  95. //有配置,则处理
  96. var isAlarm uint8
  97. ass, ok := o.mapAssociate[bizvalue.ID]
  98. if !ok {
  99. return quantity
  100. }
  101. for _, v := range ass {
  102. if v.Mid != int(bizvalue.Tid) {
  103. continue
  104. }
  105. val, ok2 := bizvalue.Data[uint16(v.Sid)]
  106. if !ok2 {
  107. continue
  108. }
  109. c, ok3 := o.mapStrategy[v.Cid]
  110. if !ok3 {
  111. continue
  112. }
  113. val = Precision(val, 2, false)
  114. isAlarm = 0
  115. akey := bizvalue.ID + "_" + strconv.Itoa(v.Sid) + "_" + strconv.Itoa(v.Cid)
  116. if !IsEqual(float64(c.LowLimit), -9999.0) {
  117. if val < float64(c.LowLimit) { //告警状态,低于下限
  118. isAlarm = 1
  119. }
  120. }
  121. if !IsEqual(float64(c.UpLimit), -9999.0) && isAlarm == 0 {
  122. if val > float64(c.UpLimit) { //告警状态,超过上限
  123. isAlarm = 2
  124. }
  125. }
  126. aa, ok := o.mapPendingAlarm[akey]
  127. if isAlarm == 0 && ok { //之前有告警,当前数据表明告警结束
  128. if aa.Alarm.TStart.IsZero() || bizvalue.Time.Sub(aa.Alarm.TStart).Seconds() >= float64(aa.Duration) {
  129. aa.Alarm.EValue = float32(val)
  130. aa.Alarm.TEnd = bizvalue.Time
  131. o.mapPendingAlarm[akey] = aa
  132. } else {
  133. delete(o.mapPendingAlarm, akey)
  134. }
  135. } else if isAlarm > 0 && !ok { //之前无告警,当前数据表明告警开始
  136. var content string
  137. var limitvalue float64
  138. if isAlarm == 1 {
  139. content = "低于下限(" + strconv.FormatFloat(float64(c.LowLimit), 'f', 2, 64) + ")"
  140. limitvalue = float64(c.LowLimit)
  141. } else if isAlarm == 2 {
  142. content = "高于上限(" + strconv.FormatFloat(float64(c.UpLimit), 'f', 2, 64) + ")"
  143. limitvalue = float64(c.UpLimit)
  144. }
  145. aA := models.DeviceAlarm{
  146. DID: bizvalue.ID,
  147. TStart: bizvalue.Time,
  148. Threshold: float32(limitvalue),
  149. SValue: float32(val),
  150. Content: content,
  151. AlarmType: uint16(v.Cid),
  152. Level: 1,
  153. Sid: uint16(v.Sid),
  154. Cid: uint16(v.Cid),
  155. Cname: c.Name,
  156. }
  157. pa := PendingAlarm{
  158. Alarm: &aA,
  159. Duration: c.Duration,
  160. }
  161. o.mapPendingAlarm[akey] = &pa
  162. }
  163. }
  164. return quantity
  165. }
  166. func (o *BizAlarmMgr) SaveAlarm() {
  167. for k, v := range o.mapPendingAlarm {
  168. if v.Alarm.ID == 0 { //还未入过库
  169. if (v.Alarm.TEnd.IsZero() && util.MlNow().Sub(v.Alarm.TStart).Seconds() >= float64(v.Duration)) ||
  170. (!v.Alarm.TEnd.IsZero() && v.Alarm.TEnd.Sub(v.Alarm.TStart).Seconds() >= float64(v.Duration)) {
  171. err := models.G_db.Create(v.Alarm).Error
  172. if err != nil {
  173. logrus.Errorf("告警信息[%v]入库失败:%s", v.Alarm, err.Error())
  174. } else {
  175. if v.Alarm.TEnd.IsZero() {
  176. if err := redisCltRawData.HSet(DeviceAlarmId, k, v.Alarm.ID).Err(); err != nil {
  177. logrus.Errorf("设备[%s]告警数据[%d]缓存失败:%s", v.Alarm.DID, v.Alarm.ID, err.Error())
  178. }
  179. }
  180. }
  181. }
  182. } else { //已经入过库
  183. //判断是否已经结束,结束则更新告警结束时间、以及结束时的值
  184. if !v.Alarm.TEnd.IsZero() {
  185. err := v.Alarm.Update()
  186. if err != nil {
  187. logrus.Errorf("告警信息[%v]告警结束更新失败:%s", v.Alarm, err.Error())
  188. } else {
  189. delete(o.mapPendingAlarm, k)
  190. if err := redisCltRawData.HDel(DeviceAlarmId, k).Err(); err != nil {
  191. logrus.Errorf("设备[%s]告警数据[%d]缓存失败:%s", v.Alarm.DID, v.Alarm.ID, err.Error())
  192. }
  193. }
  194. }
  195. }
  196. }
  197. }
  198. func (o *BizAlarmMgr) LoadAlarm() {
  199. m, err := redisCltRawData.HGetAll(DeviceAlarmId).Result()
  200. if err != nil {
  201. logrus.Errorf("从redis缓存中加载业务告警信息失败:%s", err.Error())
  202. return
  203. }
  204. for k, v := range m {
  205. strlist := strings.Split(k, "_") //key规则:设备编码_sid_cid
  206. if len(strlist) != 3 {
  207. continue
  208. }
  209. cid, err := strconv.Atoi(strlist[2])
  210. if err != nil {
  211. continue
  212. }
  213. id, err2 := strconv.Atoi(v)
  214. if err2 != nil {
  215. continue
  216. }
  217. if as, ok := o.mapStrategy[cid]; ok {
  218. pa := PendingAlarm{
  219. Alarm: &models.DeviceAlarm{ID: int64(id)},
  220. Duration: as.Duration,
  221. }
  222. o.mapPendingAlarm[k] = &pa
  223. }
  224. }
  225. }
  226. func (o *BizAlarmMgr) loadConfig() {
  227. //重新读取策略
  228. arr, err := models.GetAllAlarmStrategy()
  229. if err == nil {
  230. o.mapStrategy = make(map[int]models.AlarmStrategy)
  231. for _, v := range arr {
  232. o.mapStrategy[int(v.ID)] = v
  233. }
  234. }
  235. //重新读取告警关联信息
  236. arr2, err2 := models.GetAllAlarmAssociate()
  237. if err2 == nil {
  238. o.mapAssociate = make(map[string][]Associate)
  239. mapID_ := make(map[string]bool)
  240. for _, v := range arr2 {
  241. x := Associate{ID: v.ID, Mid: v.Mid, Sid: v.Sid, Cid: v.Cid}
  242. o.mapAssociate[v.ID] = append(o.mapAssociate[v.ID], x)
  243. mapID_[v.ID] = true
  244. }
  245. o.mapIDLock.Lock()
  246. o.mapID = mapID_
  247. o.mapIDLock.Unlock()
  248. }
  249. }
  250. const MIN = 0.000001
  251. func IsEqual(f1, f2 float64) bool {
  252. return math.Dim(f1, f2) < MIN
  253. }
  254. func Precision(f float64, prec int, round bool) float64 {
  255. pow10_n := math.Pow10(prec)
  256. if round {
  257. return math.Trunc((f+0.5/pow10_n)*pow10_n) / pow10_n
  258. }
  259. return math.Trunc((f)*pow10_n) / pow10_n
  260. }