| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- package main
- import (
- "math"
- "runtime/debug"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/util"
- )
- var _BizAlarmMgronce sync.Once
- var _BizAlarmMgrsingle *BizAlarmMgr
- func GetBizAlarmMgr() *BizAlarmMgr {
- _BizAlarmMgronce.Do(func() {
- _BizAlarmMgrsingle = &BizAlarmMgr{
- queue: util.NewQueue(10000),
- mapID: make(map[string]bool),
- mapIDLock: &sync.RWMutex{},
- mapAssociate: make(map[string][]Associate),
- mapStrategy: make(map[int]models.AlarmStrategy),
- mapPendingAlarm: make(map[string]*PendingAlarm),
- }
- })
- return _BizAlarmMgrsingle
- }
// Associate is one alarm-association record kept per device in
// BizAlarmMgr.mapAssociate.
//
// As used by HandleOneData:
//   - ID  is the device ID the record belongs to (the mapAssociate key).
//   - Mid is compared against BizValue.Tid to decide whether the record
//     applies to an incoming value.
//   - Sid selects the entry of BizValue.Data to evaluate.
//   - Cid selects the alarm strategy in BizAlarmMgr.mapStrategy.
type Associate struct {
	ID            string
	Mid, Sid, Cid int
}
// BizValue is one batch of readings submitted through PushData.
//
// HandleOneData looks the value up by ID in the association table, matches
// Tid against Associate.Mid, and reads individual readings from Data.
type BizValue struct {
	// ID identifies the device the readings belong to.
	ID string
	// Time is the timestamp of the readings; it becomes the start/end time of
	// any alarm opened or closed by this value.
	Time time.Time
	// Tid is matched against Associate.Mid.
	Tid uint16
	// Data maps sid -> reading.
	Data map[uint16]float64
}
- type PendingAlarm struct {
- Alarm *models.DeviceAlarm
- Duration uint8 //持续时间
- }
- type BizAlarmMgr struct {
- queue *util.MlQueue //待处理数据
- mapID map[string]bool //设备id->bool,用于判断数据是否要入队列
- mapIDLock *sync.RWMutex
- mapAssociate map[string][]Associate //设备id->关联信息
- mapStrategy map[int]models.AlarmStrategy //cid->告警策略
- mapPendingAlarm map[string]*PendingAlarm //key=设备id_sid_cid
- }
- func (o *BizAlarmMgr) PushData(value *BizValue) {
- o.mapIDLock.RLock()
- defer o.mapIDLock.RUnlock()
- if _, ok := o.mapID[value.ID]; ok {
- o.queue.Put(value)
- }
- }
- // Handler 定时更新告警策略
- func (o *BizAlarmMgr) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("BizAlarmMgr.Handler发生异常:%s", string(debug.Stack()))
- }
- }()
- o.loadConfig()
- o.LoadAlarm()
- timer := time.NewTicker(10 * time.Minute)
- timer2 := time.NewTicker(1 * time.Minute)
- for {
- select {
- case <-timer.C: //每隔10分钟更新告警配置
- o.loadConfig()
- case <-timer2.C: //每隔1分钟检查告警入库情况
- o.SaveAlarm()
- default:
- quantity := o.HandleOneData()
- if quantity == 0 {
- time.Sleep(10 * time.Millisecond)
- }
- }
- }
- return 0
- }
- func (o *BizAlarmMgr) HandleOneData() uint32 {
- bv, ok, quantity := o.queue.Get()
- if !ok {
- return 0
- }
- bizvalue, ok := bv.(*BizValue)
- if !ok {
- return quantity
- }
- //有配置,则处理
- var isAlarm uint8
- ass, ok := o.mapAssociate[bizvalue.ID]
- if !ok {
- return quantity
- }
- for _, v := range ass {
- if v.Mid != int(bizvalue.Tid) {
- continue
- }
- val, ok2 := bizvalue.Data[uint16(v.Sid)]
- if !ok2 {
- continue
- }
- c, ok3 := o.mapStrategy[v.Cid]
- if !ok3 {
- continue
- }
- val = Precision(val, 2, false)
- isAlarm = 0
- akey := bizvalue.ID + "_" + strconv.Itoa(v.Sid) + "_" + strconv.Itoa(v.Cid)
- if !IsEqual(float64(c.LowLimit), -9999.0) {
- if val < float64(c.LowLimit) { //告警状态,低于下限
- isAlarm = 1
- }
- }
- if !IsEqual(float64(c.UpLimit), -9999.0) && isAlarm == 0 {
- if val > float64(c.UpLimit) { //告警状态,超过上限
- isAlarm = 2
- }
- }
- aa, ok := o.mapPendingAlarm[akey]
- if isAlarm == 0 && ok { //之前有告警,当前数据表明告警结束
- if aa.Alarm.TStart.IsZero() || bizvalue.Time.Sub(aa.Alarm.TStart).Seconds() >= float64(aa.Duration) {
- aa.Alarm.EValue = float32(val)
- aa.Alarm.TEnd = bizvalue.Time
- o.mapPendingAlarm[akey] = aa
- } else {
- delete(o.mapPendingAlarm, akey)
- }
- } else if isAlarm > 0 && !ok { //之前无告警,当前数据表明告警开始
- var content string
- var limitvalue float64
- if isAlarm == 1 {
- content = "低于下限(" + strconv.FormatFloat(float64(c.LowLimit), 'f', 2, 64) + ")"
- limitvalue = float64(c.LowLimit)
- } else if isAlarm == 2 {
- content = "高于上限(" + strconv.FormatFloat(float64(c.UpLimit), 'f', 2, 64) + ")"
- limitvalue = float64(c.UpLimit)
- }
- aA := models.DeviceAlarm{
- DID: bizvalue.ID,
- TStart: bizvalue.Time,
- Threshold: float32(limitvalue),
- SValue: float32(val),
- Content: content,
- AlarmType: uint16(v.Cid),
- Level: 1,
- Sid: uint16(v.Sid),
- Cid: uint16(v.Cid),
- Cname: c.Name,
- }
- pa := PendingAlarm{
- Alarm: &aA,
- Duration: c.Duration,
- }
- o.mapPendingAlarm[akey] = &pa
- }
- }
- return quantity
- }
- func (o *BizAlarmMgr) SaveAlarm() {
- for k, v := range o.mapPendingAlarm {
- if v.Alarm.ID == 0 { //还未入过库
- if (v.Alarm.TEnd.IsZero() && util.MlNow().Sub(v.Alarm.TStart).Seconds() >= float64(v.Duration)) ||
- (!v.Alarm.TEnd.IsZero() && v.Alarm.TEnd.Sub(v.Alarm.TStart).Seconds() >= float64(v.Duration)) {
- err := models.G_db.Create(v.Alarm).Error
- if err != nil {
- logrus.Errorf("告警信息[%v]入库失败:%s", v.Alarm, err.Error())
- } else {
- if v.Alarm.TEnd.IsZero() {
- if err := redisCltRawData.HSet(DeviceAlarmId, k, v.Alarm.ID).Err(); err != nil {
- logrus.Errorf("设备[%s]告警数据[%d]缓存失败:%s", v.Alarm.DID, v.Alarm.ID, err.Error())
- }
- }
- }
- }
- } else { //已经入过库
- //判断是否已经结束,结束则更新告警结束时间、以及结束时的值
- if !v.Alarm.TEnd.IsZero() {
- err := v.Alarm.Update()
- if err != nil {
- logrus.Errorf("告警信息[%v]告警结束更新失败:%s", v.Alarm, err.Error())
- } else {
- delete(o.mapPendingAlarm, k)
- if err := redisCltRawData.HDel(DeviceAlarmId, k).Err(); err != nil {
- logrus.Errorf("设备[%s]告警数据[%d]缓存失败:%s", v.Alarm.DID, v.Alarm.ID, err.Error())
- }
- }
- }
- }
- }
- }
- func (o *BizAlarmMgr) LoadAlarm() {
- m, err := redisCltRawData.HGetAll(DeviceAlarmId).Result()
- if err != nil {
- logrus.Errorf("从redis缓存中加载业务告警信息失败:%s", err.Error())
- return
- }
- for k, v := range m {
- strlist := strings.Split(k, "_") //key规则:设备编码_sid_cid
- if len(strlist) != 3 {
- continue
- }
- cid, err := strconv.Atoi(strlist[2])
- if err != nil {
- continue
- }
- id, err2 := strconv.Atoi(v)
- if err2 != nil {
- continue
- }
- if as, ok := o.mapStrategy[cid]; ok {
- pa := PendingAlarm{
- Alarm: &models.DeviceAlarm{ID: int64(id)},
- Duration: as.Duration,
- }
- o.mapPendingAlarm[k] = &pa
- }
- }
- }
- func (o *BizAlarmMgr) loadConfig() {
- //重新读取策略
- arr, err := models.GetAllAlarmStrategy()
- if err == nil {
- o.mapStrategy = make(map[int]models.AlarmStrategy)
- for _, v := range arr {
- o.mapStrategy[int(v.ID)] = v
- }
- }
- //重新读取告警关联信息
- arr2, err2 := models.GetAllAlarmAssociate()
- if err2 == nil {
- o.mapAssociate = make(map[string][]Associate)
- mapID_ := make(map[string]bool)
- for _, v := range arr2 {
- x := Associate{ID: v.ID, Mid: v.Mid, Sid: v.Sid, Cid: v.Cid}
- o.mapAssociate[v.ID] = append(o.mapAssociate[v.ID], x)
- mapID_[v.ID] = true
- }
- o.mapIDLock.Lock()
- o.mapID = mapID_
- o.mapIDLock.Unlock()
- }
- }
// MIN is the absolute tolerance used by IsEqual.
const MIN = 0.000001

// IsEqual reports whether f1 and f2 differ by less than MIN in absolute
// value.
//
// The comparison is symmetric. The previous implementation used math.Dim,
// which yields max(f1-f2, 0) and therefore reported every pair with f1 < f2
// as equal.
func IsEqual(f1, f2 float64) bool {
	return math.Abs(f1-f2) < MIN
}
// Precision reduces f to prec decimal places.
//
// When round is false the extra digits are truncated toward zero. When round
// is true the value is rounded to the nearest step, halves away from zero.
// Previously the half step was always added with a positive sign, which
// rounded negative inputs incorrectly (e.g. -1.234 became -1.22); results
// for non-negative inputs are unchanged.
//
// The result is subject to binary floating-point representation: a value
// whose scaled form falls just below an integer is truncated down.
func Precision(f float64, prec int, round bool) float64 {
	scale := math.Pow10(prec)
	if !round {
		return math.Trunc(f*scale) / scale
	}

	// Push the value half a step away from zero so that Trunc lands on the
	// nearest step.
	half := 0.5 / scale
	if f < 0 {
		half = -half
	}
	return math.Trunc((f+half)*scale) / scale
}
|