package main import ( "runtime" "runtime/debug" "sync" "time" "github.com/sirupsen/logrus" "lc/common/models" "lc/common/mqtt" "lc/common/protocol" "lc/common/util" ) var _IpcMgrOnce sync.Once var _IpcMgrSingle *IPCMgr func GetIPCMgr() *IPCMgr { _IpcMgrOnce.Do(func() { _IpcMgrSingle = &IPCMgr{ queue: util.NewQueue(10000), //mapSosAlarm: make(map[string]int64), mapIpcState: make(map[string]*StateInfo), } }) return _IpcMgrSingle } type StateInfo struct { Time time.Time State uint8 } type IPCMgr struct { queue *util.MlQueue //mapSosAlarm map[string]int64 //主题到数据库表记录id mapIpcState map[string]*StateInfo ////0在线,1离线 } func (o *IPCMgr) SubscribeTopics() { GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_ALARM), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_STATE), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_PRESETS_ACK), mqtt.AtMostOnce, o.HandlerData) GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_PRESET_ACK), mqtt.AtMostOnce, o.HandlerData) } func (o *IPCMgr) Handler(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { gopool.Add(o.Handler, args) logrus.Errorf("IPCMgr.Handler发生异常:%v", err) logrus.Errorf("IPCMgr.Handler发生异常:%s", string(debug.Stack())) } }() timer := time.NewTicker(time.Duration(CheckOfflineInterval) * time.Minute) for { select { case <-timer.C: //每隔5分钟执行一次 o.UpdateState() default: msg, ok, quantity := o.queue.Get() if !ok { time.Sleep(100 * time.Millisecond) continue } else if quantity > 1000 { logrus.Warnf("IPCMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity) } m, ok := msg.(*mqtt.Message) if !ok { continue } _, _, _, topic, err := ParseTopic(m.Topic()) if err != nil { continue } switch topic { case protocol.TP_ONVIF_STATE: o.HandlerState(m) case protocol.TP_ONVIF_ALARM: o.HandlerAlarm(m) case protocol.TP_ONVIF_PRESETS_ACK: o.HandlerGetPreset(m) case protocol.TP_ONVIF_PRESET_ACK: o.HandlerSetPreset(m) default: logrus.Warnf("IPCMgr.Handler:收到暂不支持的主题:%s", topic) } } } } func (o *IPCMgr) HandlerData(m mqtt.Message) { for { ok, cnt := o.queue.Put(&m) if ok { break } else { logrus.Errorf("IPCMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt) runtime.Gosched() } } } func (o *IPCMgr) HandlerAlarm(m *mqtt.Message) { var obj protocol.Pack_OnvifAlarm if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error()) return } t, err := util.MlParseTime(obj.Time) if err != nil { logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error()) return } if obj.Data.Alarm.AlarmTopic == "tns1:RuleEngine/LineDetector/Crossed" { oo := models.IPCAlarm{ DID: obj.Id, TStart: t, TEnd: t, AType: "Crossed", Content: "遮挡告警", } if err := models.G_db.Create(&oo).Error; err != nil { logrus.Errorf("遮挡告警数据:%v", obj) logrus.Errorf("遮挡告警数据入库失败:%s", err.Error()) } } } func (o *IPCMgr) HandlerState(m *mqtt.Message) { var obj protocol.Pack_IPCState if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error()) return } t, err := util.MlParseTime(obj.Time) if err != nil { logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error()) return } //0在线,1离线 si, ok := o.mapIpcState[obj.Id] if !ok { t, s, err := getState(obj.Id) if err != nil { cacheState(obj.Id, obj.Time, obj.Data.State) o.mapIpcState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State} return } else { si = &StateInfo{Time: t, State: s} o.mapIpcState[obj.Id] = si } } if si.State != obj.Data.State { if obj.Data.State == 1 { //在线到离线 GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t}) } else { //离线到在线 GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t}) } } cacheState(obj.Id, obj.Time, obj.Data.State) o.mapIpcState[obj.Id].State = obj.Data.State o.mapIpcState[obj.Id].Time = t } func (o *IPCMgr) UpdateState() { t := util.MlNow() for k, v := range o.mapIpcState { if v.State == 0 && t.Sub(v.Time).Minutes() > OfflineInterval { //只检查当前还在线的 GetEventMgr().PushEvent(&EventObject{ID: k, EventType: models.ET_OFFLINE, Time: t}) cacheState(k, t.Format("2006-01-02 15:04:05"), 1) o.mapIpcState[k].State = 1 o.mapIpcState[k].Time = t } } } func (o *IPCMgr) HandlerGetPreset(m *mqtt.Message) { var obj protocol.Pack_PresetInfo if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error()) return } if obj.Data.State == protocol.FAILED { logrus.Errorf("读取设备[%s]预置点失败:%s", obj.Id, obj.Data.Error) return } //所有预置点不存库,用于调试 if obj.Data.Flag == 6 { logrus.Debugf("设备[%s]预置点:%v", obj.Id, obj.Data.Presets) return } if len(obj.Data.Presets) == 0 { return } t := util.MlNow() for _, v := range obj.Data.Presets { ps := models.IpcPresets{ ID: obj.Id, Token: v.Token, Name: v.Name, X: v.X, Y: v.Y, Z: v.Z, CreatedAt: t, } if err := ps.SaveFromGateway(); err != nil { logrus.Errorf("保存设置预置位信息失败:%s", err.Error()) logrus.Debugf("预置位信息:%v", ps) } } } func (o *IPCMgr) HandlerSetPreset(m *mqtt.Message) { var obj protocol.Pack_IPCSetPresetACK if err := obj.DeCode(m.PayloadString()); err != nil { logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error()) return } if obj.Data.State == protocol.FAILED { logrus.Errorf("对设备[%s]设置预置点失败:%s", obj.Id, obj.Data.Error) return } ps := models.IpcPresets{ ID: obj.Id, Token: obj.Data.Token, Name: obj.Data.Name, X: obj.Data.X, Y: obj.Data.Y, Z: obj.Data.Z, File: obj.Data.File, } if err := ps.SaveFromGateway2(); err != nil { logrus.Errorf("保存设置预置位信息失败:%s", err.Error()) logrus.Debugf("预置位信息:%v", ps) } }