package main

import (
	"context"
	"runtime/debug"
	"sync"
	"time"

	"github.com/sirupsen/logrus"

	"lc/common/mqtt"
	"lc/common/protocol"
	"lc/common/util"
)

var _onceITSDeviceMgr sync.Once
var _singleITSDeviceMgr *ITSDeviceMgr

// GetITSDeviceMgr returns the process-wide ITSDeviceMgr, creating it on the
// first call. Creation is guarded by sync.Once, so concurrent callers all
// receive the same instance.
func GetITSDeviceMgr() *ITSDeviceMgr {
	_onceITSDeviceMgr.Do(func() {
		ctx, cancel := context.WithCancel(context.Background())
		_singleITSDeviceMgr = &ITSDeviceMgr{
			Queue:          util.NewQueue(2000), // presumably the queue capacity; confirm against util.NewQueue
			MapITSDevice:   make(map[string]*ITSDevice),
			ctx:            ctx,
			cancel:         cancel,
			downQueue:      util.NewQueue(100),
			mapTopicHandle: make(map[string]func(m mqtt.Message)),
		}
	})
	return _singleITSDeviceMgr
}

// ITSDeviceMgr owns the configured ITS capture devices and the queues that
// feed the Handle loop.
type ITSDeviceMgr struct {
	Queue          *util.MlQueue         // data queue, fed by the SDK
	MapITSDevice   map[string]*ITSDevice // TollgateID -> ITSDevice
	logouts        []string              // user IDs that have logged out
	logoutsmu      sync.Mutex            // guards logouts
	ctx            context.Context
	cancel         context.CancelFunc
	downQueue      *util.MlQueue                   // MQTT messages waiting to be dispatched by Handle
	mapTopicHandle map[string]func(m mqtt.Message) // topic -> handler
}

// LoadITSDevice builds one ITSDevice per entry in itsConfig.Its, registers the
// handler for the gateway ITS topic and subscribes to that topic.
func (o *ITSDeviceMgr) LoadITSDevice() {
	for _, v := range itsConfig.Its {
		o.MapITSDevice[v.TollgateID] = NewITSDevice(v.TollgateID, v.ID, v.User, v.Password, v.IP, v.Port)
	}

	// The same topic string keys the handler table and the subscription.
	topic := GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ITS)
	o.mapTopicHandle[topic] = o.HandleTpGwIts
	GetMQTTMgr().Subscribe(topic, mqtt.AtMostOnce, o.HandleMessage, ToAll)
}

// HandleMessage is the MQTT subscription callback. It only enqueues the
// message; the handler lookup and call happen later in Handle.
func (o *ITSDeviceMgr) HandleMessage(m mqtt.Message) {
	o.downQueue.Put(m)
}

// HandleTpGwIts reports the ITS configuration. The request is ignored when it
// cannot be decoded or when its Gid differs from appConfig.GID; otherwise
// itsConfig is encoded and published on the TP_GW_ITS_ACK topic.
func (o *ITSDeviceMgr) HandleTpGwIts(m mqtt.Message) {
	var obj protocol.Pack_IDObject
	if err := obj.DeCode(m.PayloadString()); err != nil {
		return
	}
	if obj.Gid != appConfig.GID {
		return
	}

	var ret protocol.Pack_ITSDev
	if str, err := ret.EnCode(appConfig.GID, obj.Seq, &itsConfig); err == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ITS_ACK), str, 0, ToCloud)
	}
}

// CheckOnline publishes the given online state for dev on the
// TP_ITSDEV_STATE topic. Nothing is published if encoding fails.
func (o *ITSDeviceMgr) CheckOnline(dev *ITSDevice, online uint8) {
	var obj protocol.Pack_IPCState
	if str, err := obj.EnCode(dev.CamID, appConfig.GID, GetNextUint64(), util.MlNow(), online); err == nil {
		GetMQTTMgr().Publish(GetTopic(protocol.DT_ITS, dev.CamID, protocol.TP_ITSDEV_STATE), str, mqtt.AtMostOnce, ToCloud)
	}
}

// CheckLogin walks every device. A device that already has a UserID is
// reported as SUCCESS; any other device gets a login attempt and its state is
// reported according to the outcome.
func (o *ITSDeviceMgr) CheckLogin() {
	for _, v := range o.MapITSDevice {
		if v.UserID != "" {
			o.CheckOnline(v, protocol.SUCCESS)
			continue
		}

		if err := v.Login(); err != nil {
			v.Reconn++
			o.CheckOnline(v, protocol.FAILED)
			logrus.Errorf("抓拍单元[%s]SDK登陆失败:%s,已重连%d次", v.CamID, err.Error(), v.Reconn)
			continue
		}

		logrus.Infof("抓拍单元[%s]SDK登陆成功", v.CamID)
		if bdi, err := v.GetDeviceStatus(); err == nil {
			logrus.Infof("获取抓拍单元[%s]信息成功,信息内容:%v", v.CamID, bdi)
		} else {
			logrus.Errorf("获取抓拍单元[%s]信息失败:%s", v.CamID, err.Error())
		}
		if err := v.SetPicStreamDataCallback(); err != nil {
			logrus.Errorf("抓拍单元[%s]执行SetPicStreamDataCallback失败:%s", v.CamID, err.Error())
		}
		// Disabled: rebooting the device after login.
		//if err := v.Reboot(); err != nil {
		//	logrus.Errorf("设备[%s]重启失败:%s", err.Error())
		//}
		v.Reconn = 0
		o.CheckOnline(v, protocol.SUCCESS)
	}
}

// HandleLogout processes the user IDs collected by NotifyLogout: for each one,
// the device carrying that UserID has its media stream stopped and its UserID
// cleared, so the next CheckLogin attempts a fresh login.
func (o *ITSDeviceMgr) HandleLogout() {
	// Take ownership of the pending list and release the lock right away, so
	// NotifyLogout is not blocked while StopMediaStream runs.
	o.logoutsmu.Lock()
	pending := o.logouts
	o.logouts = nil
	o.logoutsmu.Unlock()

	for _, userID := range pending {
		for _, dev := range o.MapITSDevice {
			if userID == dev.UserID {
				dev.StopMediaStream()
				dev.UserID = ""
				logrus.Warnf("%s logout.", dev.CamID)
				break
			}
		}
	}
}

// NotifyLogout records userid as logged out; HandleLogout consumes the list.
func (o *ITSDeviceMgr) NotifyLogout(userid string) {
	o.logoutsmu.Lock()
	defer o.logoutsmu.Unlock()
	o.logouts = append(o.logouts, userid)
}

// PushData enqueues a vehicle record for the Handle loop.
func (o *ITSDeviceMgr) PushData(data *tagVehicleInfo) {
	o.Queue.Put(data)
}

// HandleData processes one vehicle record. If the record's TollgateID maps to
// a known device, the record is stamped with that device's CamID and passed to
// the device's ToStatic. The record is then pushed to the web server and, when
// itsConfig.RemotePub > 0, published remotely.
func (o *ITSDeviceMgr) HandleData(data *tagVehicleInfo) {
	if dev, ok := o.MapITSDevice[data.TollgateID]; ok {
		data.CamID = dev.CamID
		dev.ToStatic(data)
	}
	GetWebSvr().Update(data)
	if itsConfig.RemotePub > 0 {
		o.RemotePub(data)
	}
}

// SendData calls SendStatic(t) on every device.
func (o *ITSDeviceMgr) SendData(t time.Time) {
	for _, v := range o.MapITSDevice {
		v.SendStatic(t)
	}
}

// RemotePub publishes plate, pass time, type and speed of a vehicle record on
// the TP_ITS_VEHICLESPEED topic. Nothing is published if encoding fails.
func (o *ITSDeviceMgr) RemotePub(data *tagVehicleInfo) {
	var obj protocol.Pack_VehicleSpeed
	vs := protocol.VehicleSpeed{
		Plate: data.VehiclePlate,
		Time:  protocol.BJTime(data.PassTime),
		Type:  data.VehicleType,
		Speed: data.VehicleSpeed,
	}
	if str, err := obj.EnCode(data.CamID, appConfig.GID, GetNextUint64(), &vs); err == nil {
		topic := GetTopic(protocol.DT_ITS, data.CamID, protocol.TP_ITS_VEHICLESPEED)
		GetMQTTMgr().Publish(topic, str, 0, ToCloud)
	}
}

// Handle is the manager's main loop. It logs devices in, then repeatedly:
//   - once a minute, processes logouts and re-checks logins;
//   - once the current hour has elapsed, calls SendData for that hour;
//   - dispatches at most one queued MQTT message to its topic handler;
//   - processes at most one queued vehicle record.
//
// After the manager's context is cancelled the loop keeps processing vehicle
// records until the data queue is empty and then returns 0. If the loop
// panics, the panic is logged and Handle is resubmitted through gopool.Add.
//
// args is not used by the loop itself; it is only handed back to gopool.Add on
// a panic restart.
func (o *ITSDeviceMgr) Handle(args ...interface{}) interface{} {
	defer func() {
		if err := recover(); err != nil {
			logrus.Errorf("ITSDeviceMgr.HandleData发生异常:%v", err)
			logrus.Errorf("ITSDeviceMgr.HandleData发生异常,堆栈信息:%s", string(debug.Stack()))
			// NOTE(review): args is passed as one value, not spread with
			// args... — confirm against the gopool.Add signature.
			gopool.Add(o.Handle, args)
		}
	}()

	o.CheckLogin()

	ticker := time.NewTicker(1 * time.Minute)
	// Release the ticker on every exit path, including a panic restart.
	defer ticker.Stop()

	currHour := util.New(util.MlNow()).BeginningOfHour() // data time
	nextSend := currHour.Add(1 * time.Hour)              // send time
	exit := false

	for {
		// Non-blocking poll. The queue work must live outside the select: once
		// the context is cancelled, Done() is always ready, so a default
		// branch holding the work (and the exit path) would never run again.
		select {
		case <-o.ctx.Done():
			exit = true
		case <-ticker.C:
			o.HandleLogout()
			o.CheckLogin()
		default:
		}

		if util.MlNow().After(nextSend) {
			o.SendData(currHour)
			currHour = currHour.Add(1 * time.Hour)
			nextSend = currHour.Add(1 * time.Hour)
		}

		// Fetch one command from the downlink queue and execute it.
		// NOTE(review): the third return value of Get is discarded here and
		// below — confirm it carries nothing actionable.
		if m, ok, _ := o.downQueue.Get(); ok {
			if mm, ok := m.(mqtt.Message); ok {
				if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
					fn(mm)
				} else {
					logrus.Errorf("ITSDeviceMgr.Handle:不支持的主题:%s", mm.Topic())
				}
			}
		}

		if v, ok, _ := o.Queue.Get(); ok {
			if vi, ok := v.(*tagVehicleInfo); ok {
				o.HandleData(vi)
			}
			continue
		}

		// Data queue is empty: leave if shutdown was requested, otherwise
		// back off briefly before polling again.
		if exit {
			logrus.Warnf("ITSDeviceMgr.HandleData退出,原因:%v", o.ctx.Err())
			return 0
		}
		time.Sleep(50 * time.Millisecond)
	}
}