package main import ( "runtime" "runtime/debug" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) // 一键报警管理 var _SosMgrOnce sync.Once var _SosMgrSingle *SosMgr func GetSosMgr() *SosMgr { _SosMgrOnce.Do(func() { _SosMgrSingle = &SosMgr{ queue: util.NewQueue(10000), mapSosAlarm: make(map[string]int64), mapSosState: make(map[string]*StateInfo), } }) return _SosMgrSingle } type SosMgr struct { queue *util.MlQueue mapSosAlarm map[string]int64 //主题到数据库表记录id mapSosState map[string]*StateInfo ////0在线,1离线 } func (o *SosMgr) SubscribeTopics() { GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_SOS, protocol.TP_ONVIF_ALARM), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_SOS, protocol.TP_ONVIF_STATE), mqtt.AtMostOnce, o.HandlerData) } func (o *SosMgr) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("SosMgr.Handler发生异常:%s", string(debug.Stack())) } }() timer := time.NewTicker(time.Duration(CheckOfflineInterval) * time.Minute) for { select { case <-timer.C: //每隔5分钟执行一次 o.UpdateState() default: msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(100 * time.Millisecond) continue } else if quantity > 1000 { logrus.Warnf("SosMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { continue } _, _, _, topic, err := ParseTopic(m.Topic()) if err != nil { continue } switch topic { case protocol.TP_ONVIF_STATE: o.HandlerState(m) case protocol.TP_ONVIF_ALARM: o.HandlerAlarm(m) default: logrus.Warnf("SosMgr.Handler:收到暂不支持的主题:%s", topic) } } } } func (o *SosMgr) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("SosMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *SosMgr) HandlerAlarm(m *mqtt.Message) { var obj protocol.Pack_OnvifAlarm if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error()) return } t, err := util.MlParseTime(obj.Time) if err != nil { logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error()) return } if obj.Data.Alarm.AlarmTopic == "tns1:Device/Trigger/tnshik:AlarmIn" { //海康威视一键报警 state, ok2 := obj.Data.Alarm.Data["State"] if !ok2 { return } //更新数据库告警结束时间 if state == "false" { if id, ok := o.mapSosAlarm[obj.Id+obj.Data.Alarm.AlarmTopic]; ok { oo := models.SosAlarm{ID: id, TEnd: t} if err := oo.Update(); err != nil { logrus.Errorf("一键告警数据:%v", obj) logrus.Errorf("一键告警数据更新失败:%s", err.Error()) } delete(o.mapSosAlarm, obj.Id+obj.Data.Alarm.AlarmTopic) } } else { //新建告警记录 oo := models.SosAlarm{ DID: obj.Id, TStart: t, AType: "SOS", Content: "一键求助", } if err := models.G_db.Create(&oo).Error; err != nil { logrus.Errorf("一键告警数据:%v", obj) logrus.Errorf("一键告警数据入库失败:%s", err.Error()) } else { o.mapSosAlarm[obj.Id+obj.Data.Alarm.AlarmTopic] = oo.ID } } } } func (o *SosMgr) HandlerState(m *mqtt.Message) { var obj protocol.Pack_IPCState if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error()) return } t, err := util.MlParseTime(obj.Time) if err != nil { logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error()) return } //0在线,1离线 si, ok := o.mapSosState[obj.Id] if !ok { t, s, err := getState(obj.Id) if err != nil { cacheState(obj.Id, obj.Time, obj.Data.State) o.mapSosState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State} return } else { si = &StateInfo{Time: t, State: s} o.mapSosState[obj.Id] = si } } if si.State != obj.Data.State { if obj.Data.State == 1 { //在线到离线 GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t}) } else { //离线到在线 GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t}) } } cacheState(obj.Id, obj.Time, obj.Data.State) o.mapSosState[obj.Id].State = obj.Data.State o.mapSosState[obj.Id].Time = t } func (o *SosMgr) UpdateState() { t := util.MlNow() for k, v := range o.mapSosState { if v.State == 0 && t.Sub(v.Time).Minutes() > OfflineInterval { //只检查当前还在线的 GetEventMgr().PushEvent(&EventObject{ID: k, EventType: models.ET_OFFLINE, Time: t}) cacheState(k, t.Format("2006-01-02 15:04:05"), 1) o.mapSosState[k].State = 1 o.mapSosState[k].Time = t } } }