| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- package main
- import (
- "runtime"
- "runtime/debug"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/models"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- var _IpcMgrOnce sync.Once
- var _IpcMgrSingle *IPCMgr
- func GetIPCMgr() *IPCMgr {
- _IpcMgrOnce.Do(func() {
- _IpcMgrSingle = &IPCMgr{
- queue: util.NewQueue(10000),
- //mapSosAlarm: make(map[string]int64),
- mapIpcState: make(map[string]*StateInfo),
- }
- })
- return _IpcMgrSingle
- }
- type StateInfo struct {
- Time time.Time
- State uint8
- }
- type IPCMgr struct {
- queue *util.MlQueue
- //mapSosAlarm map[string]int64 //主题到数据库表记录id
- mapIpcState map[string]*StateInfo ////0在线,1离线
- }
- func (o *IPCMgr) SubscribeTopics() {
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_ALARM), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_STATE), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_PRESETS_ACK), mqtt.AtMostOnce, o.HandlerData)
- GetMQTTMgr().Subscribe(GetVagueTopic(protocol.DT_IPC, protocol.TP_ONVIF_PRESET_ACK), mqtt.AtMostOnce, o.HandlerData)
- }
- func (o *IPCMgr) Handler(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- gopool.Add(o.Handler, args)
- logrus.Errorf("IPCMgr.Handler发生异常:%v", err)
- logrus.Errorf("IPCMgr.Handler发生异常:%s", string(debug.Stack()))
- }
- }()
- timer := time.NewTicker(time.Duration(CheckOfflineInterval) * time.Minute)
- for {
- select {
- case <-timer.C: //每隔5分钟执行一次
- o.UpdateState()
- default:
- msg, ok, quantity := o.queue.Get()
- if !ok {
- time.Sleep(100 * time.Millisecond)
- continue
- } else if quantity > 1000 {
- logrus.Warnf("IPCMgr.Handler:数据队列累积过多,请注意优化,当前队列条数:%d", quantity)
- }
- m, ok := msg.(*mqtt.Message)
- if !ok {
- continue
- }
- _, _, _, topic, err := ParseTopic(m.Topic())
- if err != nil {
- continue
- }
- switch topic {
- case protocol.TP_ONVIF_STATE:
- o.HandlerState(m)
- case protocol.TP_ONVIF_ALARM:
- o.HandlerAlarm(m)
- case protocol.TP_ONVIF_PRESETS_ACK:
- o.HandlerGetPreset(m)
- case protocol.TP_ONVIF_PRESET_ACK:
- o.HandlerSetPreset(m)
- default:
- logrus.Warnf("IPCMgr.Handler:收到暂不支持的主题:%s", topic)
- }
- }
- }
- }
- func (o *IPCMgr) HandlerData(m mqtt.Message) {
- for {
- ok, cnt := o.queue.Put(&m)
- if ok {
- break
- } else {
- logrus.Errorf("IPCMgr.HandlerData:查询队列失败,队列消息数量:%d", cnt)
- runtime.Gosched()
- }
- }
- }
- func (o *IPCMgr) HandlerAlarm(m *mqtt.Message) {
- var obj protocol.Pack_OnvifAlarm
- if err := obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
- return
- }
- t, err := util.MlParseTime(obj.Time)
- if err != nil {
- logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
- return
- }
- if obj.Data.Alarm.AlarmTopic == "tns1:RuleEngine/LineDetector/Crossed" {
- oo := models.IPCAlarm{
- DID: obj.Id,
- TStart: t,
- TEnd: t,
- AType: "Crossed",
- Content: "遮挡告警",
- }
- if err := models.G_db.Create(&oo).Error; err != nil {
- logrus.Errorf("遮挡告警数据:%v", obj)
- logrus.Errorf("遮挡告警数据入库失败:%s", err.Error())
- }
- }
- }
- func (o *IPCMgr) HandlerState(m *mqtt.Message) {
- var obj protocol.Pack_IPCState
- if err := obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
- return
- }
- t, err := util.MlParseTime(obj.Time)
- if err != nil {
- logrus.Errorf("时间[%s]解析错误:%s", obj.Time, err.Error())
- return
- }
- //0在线,1离线
- si, ok := o.mapIpcState[obj.Id]
- if !ok {
- t, s, err := getState(obj.Id)
- if err != nil {
- cacheState(obj.Id, obj.Time, obj.Data.State)
- o.mapIpcState[obj.Id] = &StateInfo{Time: t, State: obj.Data.State}
- return
- } else {
- si = &StateInfo{Time: t, State: s}
- o.mapIpcState[obj.Id] = si
- }
- }
- if si.State != obj.Data.State {
- if obj.Data.State == 1 { //在线到离线
- GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_OFFLINE, Time: t})
- } else { //离线到在线
- GetEventMgr().PushEvent(&EventObject{ID: obj.Id, EventType: models.ET_ONLINE, Time: t})
- }
- }
- cacheState(obj.Id, obj.Time, obj.Data.State)
- o.mapIpcState[obj.Id].State = obj.Data.State
- o.mapIpcState[obj.Id].Time = t
- }
- func (o *IPCMgr) UpdateState() {
- t := util.MlNow()
- for k, v := range o.mapIpcState {
- if v.State == 0 && t.Sub(v.Time).Minutes() > OfflineInterval { //只检查当前还在线的
- GetEventMgr().PushEvent(&EventObject{ID: k, EventType: models.ET_OFFLINE, Time: t})
- cacheState(k, t.Format("2006-01-02 15:04:05"), 1)
- o.mapIpcState[k].State = 1
- o.mapIpcState[k].Time = t
- }
- }
- }
- func (o *IPCMgr) HandlerGetPreset(m *mqtt.Message) {
- var obj protocol.Pack_PresetInfo
- if err := obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
- return
- }
- if obj.Data.State == protocol.FAILED {
- logrus.Errorf("读取设备[%s]预置点失败:%s", obj.Id, obj.Data.Error)
- return
- }
- //所有预置点不存库,用于调试
- if obj.Data.Flag == 6 {
- logrus.Debugf("设备[%s]预置点:%v", obj.Id, obj.Data.Presets)
- return
- }
- if len(obj.Data.Presets) == 0 {
- return
- }
- t := util.MlNow()
- for _, v := range obj.Data.Presets {
- ps := models.IpcPresets{
- ID: obj.Id,
- Token: v.Token,
- Name: v.Name,
- X: v.X,
- Y: v.Y,
- Z: v.Z,
- CreatedAt: t,
- }
- if err := ps.SaveFromGateway(); err != nil {
- logrus.Errorf("保存设置预置位信息失败:%s", err.Error())
- logrus.Debugf("预置位信息:%v", ps)
- }
- }
- }
- func (o *IPCMgr) HandlerSetPreset(m *mqtt.Message) {
- var obj protocol.Pack_IPCSetPresetACK
- if err := obj.DeCode(m.PayloadString()); err != nil {
- logrus.Errorf("数据解析失败,主题:%s,内容:%s,失败原因:%s", m.Topic(), m.PayloadString(), err.Error())
- return
- }
- if obj.Data.State == protocol.FAILED {
- logrus.Errorf("对设备[%s]设置预置点失败:%s", obj.Id, obj.Data.Error)
- return
- }
- ps := models.IpcPresets{
- ID: obj.Id,
- Token: obj.Data.Token,
- Name: obj.Data.Name,
- X: obj.Data.X,
- Y: obj.Data.Y,
- Z: obj.Data.Z,
- File: obj.Data.File,
- }
- if err := ps.SaveFromGateway2(); err != nil {
- logrus.Errorf("保存设置预置位信息失败:%s", err.Error())
- logrus.Debugf("预置位信息:%v", ps)
- }
- }
|