package main import ( "math" "runtime/debug" "strconv" "strings" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/util" ) var _BizAlarmMgronce sync.Once var _BizAlarmMgrsingle *BizAlarmMgr func GetBizAlarmMgr() *BizAlarmMgr { _BizAlarmMgronce.Do(func() { _BizAlarmMgrsingle = &BizAlarmMgr{ queue: util.NewQueue(10000), mapID: make(map[string]bool), mapIDLock: &sync.RWMutex{}, mapAssociate: make(map[string][]Associate), mapStrategy: make(map[int]models.AlarmStrategy), mapPendingAlarm: make(map[string]*PendingAlarm), } }) return _BizAlarmMgrsingle } type Associate struct { ID string Mid int Sid int Cid int } type BizValue struct { ID string Time time.Time Tid uint16 Data map[uint16]float64 //sid->值 } type PendingAlarm struct { Alarm *models.DeviceAlarm Duration uint8 //持续时间 } type BizAlarmMgr struct { queue *util.MlQueue //待处理数据 mapID map[string]bool //设备id->bool,用于判断数据是否要入队列 mapIDLock *sync.RWMutex mapAssociate map[string][]Associate //设备id->关联信息 mapStrategy map[int]models.AlarmStrategy //cid->告警策略 mapPendingAlarm map[string]*PendingAlarm //key=设备id_sid_cid } func (o *BizAlarmMgr) PushData(value *BizValue) { o.mapIDLock.RLock() defer o.mapIDLock.RUnlock() if _, ok := o.mapID[value.ID]; ok { o.queue.Put(value) } } // Handler 定时更新告警策略 func (o *BizAlarmMgr) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("BizAlarmMgr.Handler发生异常:%s", string(debug.Stack())) } }() o.loadConfig() o.LoadAlarm() timer := time.NewTicker(10 * time.Minute) timer2 := time.NewTicker(1 * time.Minute) for { select { case <-timer.C: //每隔10分钟更新告警配置 o.loadConfig() case <-timer2.C: //每隔1分钟检查告警入库情况 o.SaveAlarm() default: quantity := o.HandleOneData() if quantity == 0 { time.Sleep(10 * time.Millisecond) } } } return 0 } func (o *BizAlarmMgr) HandleOneData() uint32 { bv, ok, quantity := o.queue.Get() if !ok { return 0 } bizvalue, ok := bv.(*BizValue) if !ok { return quantity } //有配置,则处理 var isAlarm uint8 ass, ok := o.mapAssociate[bizvalue.ID] if !ok { return quantity } for _, v := range ass { if v.Mid != int(bizvalue.Tid) { continue } val, ok2 := bizvalue.Data[uint16(v.Sid)] if !ok2 { continue } c, ok3 := o.mapStrategy[v.Cid] if !ok3 { continue } val = Precision(val, 2, false) isAlarm = 0 akey := bizvalue.ID + "_" + strconv.Itoa(v.Sid) + "_" + strconv.Itoa(v.Cid) if !IsEqual(float64(c.LowLimit), -9999.0) { if val < float64(c.LowLimit) { //告警状态,低于下限 isAlarm = 1 } } if !IsEqual(float64(c.UpLimit), -9999.0) && isAlarm == 0 { if val > float64(c.UpLimit) { //告警状态,超过上限 isAlarm = 2 } } aa, ok := o.mapPendingAlarm[akey] if isAlarm == 0 && ok { //之前有告警,当前数据表明告警结束 if aa.Alarm.TStart.IsZero() || bizvalue.Time.Sub(aa.Alarm.TStart).Seconds() >= float64(aa.Duration) { aa.Alarm.EValue = float32(val) aa.Alarm.TEnd = bizvalue.Time o.mapPendingAlarm[akey] = aa } else { delete(o.mapPendingAlarm, akey) } } else if isAlarm > 0 && !ok { //之前无告警,当前数据表明告警开始 var content string var limitvalue float64 if isAlarm == 1 { content = "低于下限(" + strconv.FormatFloat(float64(c.LowLimit), 'f', 2, 64) + ")" limitvalue = float64(c.LowLimit) } else if isAlarm == 2 { content = "高于上限(" + strconv.FormatFloat(float64(c.UpLimit), 'f', 2, 64) + ")" limitvalue = float64(c.UpLimit) } aA := models.DeviceAlarm{ DID: bizvalue.ID, TStart: bizvalue.Time, Threshold: float32(limitvalue), SValue: float32(val), Content: content, AlarmType: uint16(v.Cid), Level: 1, Sid: uint16(v.Sid), Cid: uint16(v.Cid), Cname: c.Name, } pa := PendingAlarm{ Alarm: &aA, Duration: c.Duration, } o.mapPendingAlarm[akey] = &pa } } return quantity } func (o *BizAlarmMgr) SaveAlarm() { for k, v := range o.mapPendingAlarm { if v.Alarm.ID == 0 { //还未入过库 if (v.Alarm.TEnd.IsZero() && util.MlNow().Sub(v.Alarm.TStart).Seconds() >= float64(v.Duration)) || (!v.Alarm.TEnd.IsZero() && v.Alarm.TEnd.Sub(v.Alarm.TStart).Seconds() >= float64(v.Duration)) { err := models.G_db.Create(v.Alarm).Error if err != nil { logrus.Errorf("告警信息[%v]入库失败:%s", v.Alarm, err.Error()) } else { if v.Alarm.TEnd.IsZero() { if err := redisCltRawData.HSet(DeviceAlarmId, k, v.Alarm.ID).Err(); err != nil { logrus.Errorf("设备[%s]告警数据[%d]缓存失败:%s", v.Alarm.DID, v.Alarm.ID, err.Error()) } } } } } else { //已经入过库 //判断是否已经结束,结束则更新告警结束时间、以及结束时的值 if !v.Alarm.TEnd.IsZero() { err := v.Alarm.Update() if err != nil { logrus.Errorf("告警信息[%v]告警结束更新失败:%s", v.Alarm, err.Error()) } else { delete(o.mapPendingAlarm, k) if err := redisCltRawData.HDel(DeviceAlarmId, k).Err(); err != nil { logrus.Errorf("设备[%s]告警数据[%d]缓存失败:%s", v.Alarm.DID, v.Alarm.ID, err.Error()) } } } } } } func (o *BizAlarmMgr) LoadAlarm() { m, err := redisCltRawData.HGetAll(DeviceAlarmId).Result() if err != nil { logrus.Errorf("从redis缓存中加载业务告警信息失败:%s", err.Error()) return } for k, v := range m { strlist := strings.Split(k, "_") //key规则:设备编码_sid_cid if len(strlist) != 3 { continue } cid, err := strconv.Atoi(strlist[2]) if err != nil { continue } id, err2 := strconv.Atoi(v) if err2 != nil { continue } if as, ok := o.mapStrategy[cid]; ok { pa := PendingAlarm{ Alarm: &models.DeviceAlarm{ID: int64(id)}, Duration: as.Duration, } o.mapPendingAlarm[k] = &pa } } } func (o *BizAlarmMgr) loadConfig() { //重新读取策略 arr, err := models.GetAllAlarmStrategy() if err == nil { o.mapStrategy = make(map[int]models.AlarmStrategy) for _, v := range arr { o.mapStrategy[int(v.ID)] = v } } //重新读取告警关联信息 arr2, err2 := models.GetAllAlarmAssociate() if err2 == nil { o.mapAssociate = make(map[string][]Associate) mapID_ := make(map[string]bool) for _, v := range arr2 { x := Associate{ID: v.ID, Mid: v.Mid, Sid: v.Sid, Cid: v.Cid} o.mapAssociate[v.ID] = append(o.mapAssociate[v.ID], x) mapID_[v.ID] = true } o.mapIDLock.Lock() o.mapID = mapID_ o.mapIDLock.Unlock() } } const MIN = 0.000001 func IsEqual(f1, f2 float64) bool { return math.Dim(f1, f2) < MIN } func Precision(f float64, prec int, round bool) float64 { pow10_n := math.Pow10(prec) if round { return math.Trunc((f+0.5/pow10_n)*pow10_n) / pow10_n } return math.Trunc((f)*pow10_n) / pow10_n }