| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- package main
- import (
- "context"
- "runtime/debug"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/mqtt"
- "lc/common/protocol"
- "lc/common/util"
- )
- var _onceITSDeviceMgr sync.Once
- var _singleITSDeviceMgr *ITSDeviceMgr
- func GetITSDeviceMgr() *ITSDeviceMgr {
- _onceITSDeviceMgr.Do(func() {
- ctx, cancel := context.WithCancel(context.Background())
- _singleITSDeviceMgr = &ITSDeviceMgr{
- Queue: util.NewQueue(2000),
- MapITSDevice: make(map[string]*ITSDevice),
- ctx: ctx,
- cancel: cancel,
- downQueue: util.NewQueue(100),
- mapTopicHandle: make(map[string]func(m mqtt.Message)),
- }
- })
- return _singleITSDeviceMgr
- }
- type ITSDeviceMgr struct {
- Queue *util.MlQueue //数据队列,来自SDK
- MapITSDevice map[string]*ITSDevice //TollgateID->ITSDevice
- logouts []string //退出登陆的userid
- logoutsmu sync.Mutex
- ctx context.Context
- cancel context.CancelFunc
- downQueue *util.MlQueue
- mapTopicHandle map[string]func(m mqtt.Message)
- }
- func (o *ITSDeviceMgr) LoadITSDevice() {
- for _, v := range itsConfig.Its {
- o.MapITSDevice[v.TollgateID] = NewITSDevice(v.TollgateID, v.ID, v.User, v.Password, v.IP, v.Port)
- }
- o.mapTopicHandle[GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ITS)] = o.HandleTpGwIts
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ITS), mqtt.AtMostOnce, o.HandleMessage, ToAll)
- }
- func (o *ITSDeviceMgr) HandleMessage(m mqtt.Message) {
- o.downQueue.Put(m)
- }
- // HandleTpGwIts 上报配置信息
- func (o *ITSDeviceMgr) HandleTpGwIts(m mqtt.Message) {
- var obj protocol.Pack_IDObject
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- if obj.Gid != appConfig.GID {
- return
- }
- var ret protocol.Pack_ITSDev
- if str, err := ret.EnCode(appConfig.GID, obj.Seq, &itsConfig); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ITS_ACK), str, 0, ToCloud)
- }
- }
- func (o *ITSDeviceMgr) CheckOnline(dev *ITSDevice, online uint8) {
- var obj protocol.Pack_IPCState
- if str, err := obj.EnCode(dev.CamID, appConfig.GID, GetNextUint64(), util.MlNow(), online); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_ITS, dev.CamID, protocol.TP_ITSDEV_STATE), str, mqtt.AtMostOnce, ToCloud)
- }
- }
- func (o *ITSDeviceMgr) CheckLogin() {
- for _, v := range o.MapITSDevice {
- if v.UserID != "" {
- o.CheckOnline(v, protocol.SUCCESS)
- continue
- }
- if err := v.Login(); err == nil {
- logrus.Infof("抓拍单元[%s]SDK登陆成功", v.CamID)
- if bdi, err := v.GetDeviceStatus(); err == nil {
- logrus.Infof("获取抓拍单元[%s]信息成功,信息内容:%v", v.CamID, bdi)
- } else {
- logrus.Errorf("获取抓拍单元[%s]信息失败:%s", v.CamID, err.Error())
- }
- if err := v.SetPicStreamDataCallback(); err != nil {
- logrus.Errorf("抓拍单元[%s]执行SetPicStreamDataCallback失败:%s", v.CamID, err.Error())
- }
- //if err := v.Reboot(); err != nil {
- // logrus.Errorf("设备[%s]重启失败:%s", err.Error())
- //}
- v.Reconn = 0
- o.CheckOnline(v, protocol.SUCCESS)
- } else {
- v.Reconn++
- o.CheckOnline(v, protocol.FAILED)
- logrus.Errorf("抓拍单元[%s]SDK登陆失败:%s,已重连%d次", v.CamID, err.Error(), v.Reconn)
- }
- }
- }
- func (o *ITSDeviceMgr) HandleLogout() {
- o.logoutsmu.Lock()
- defer o.logoutsmu.Unlock()
- for _, v := range o.logouts {
- for _, dev := range o.MapITSDevice {
- if v == dev.UserID {
- dev.StopMediaStream()
- dev.UserID = ""
- logrus.Warnf("%s logout.", dev.CamID)
- break
- }
- }
- }
- o.logouts = nil
- }
- func (o *ITSDeviceMgr) NotifyLogout(userid string) {
- o.logoutsmu.Lock()
- defer o.logoutsmu.Unlock()
- o.logouts = append(o.logouts, userid)
- }
- func (o *ITSDeviceMgr) PushData(data *tagVehicleInfo) {
- o.Queue.Put(data)
- }
- func (o *ITSDeviceMgr) HandleData(data *tagVehicleInfo) {
- s, ok := o.MapITSDevice[data.TollgateID]
- if ok {
- data.CamID = s.CamID
- s.ToStatic(data)
- }
- GetWebSvr().Update(data)
- if itsConfig.RemotePub > 0 {
- o.RemotePub(data)
- }
- }
- func (o *ITSDeviceMgr) SendData(t time.Time) {
- for _, v := range o.MapITSDevice {
- v.SendStatic(t)
- }
- }
- func (o *ITSDeviceMgr) RemotePub(data *tagVehicleInfo) {
- var obj protocol.Pack_VehicleSpeed
- vs := protocol.VehicleSpeed{
- Plate: data.VehiclePlate,
- Time: protocol.BJTime(data.PassTime),
- Type: data.VehicleType,
- Speed: data.VehicleSpeed,
- }
- if str, err := obj.EnCode(data.CamID, appConfig.GID, GetNextUint64(), &vs); err == nil {
- topic := GetTopic(protocol.DT_ITS, data.CamID, protocol.TP_ITS_VEHICLESPEED)
- GetMQTTMgr().Publish(topic, str, 0, ToCloud)
- }
- }
- func (o *ITSDeviceMgr) Handle(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorf("ITSDeviceMgr.HandleData发生异常:%v", err)
- logrus.Errorf("ITSDeviceMgr.HandleData发生异常,堆栈信息:%s", string(debug.Stack()))
- gopool.Add(o.Handle, args)
- }
- }()
- o.CheckLogin()
- timer := time.NewTicker(1 * time.Minute)
- currHour := util.New(util.MlNow()).BeginningOfHour() //data time
- LastHour := currHour.Add(1 * time.Hour) //send time
- exit := false
- for {
- select {
- case <-o.ctx.Done():
- exit = true
- case <-timer.C:
- o.HandleLogout()
- o.CheckLogin()
- default:
- if util.MlNow().After(LastHour) {
- o.SendData(currHour)
- currHour = currHour.Add(1 * time.Hour)
- LastHour = currHour.Add(1 * time.Hour)
- }
- //从队列钟获取指令执行
- if m, ok, _ := o.downQueue.Get(); ok {
- if mm, ok := m.(mqtt.Message); ok {
- if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
- fn(mm)
- } else {
- logrus.Errorf("ITSDeviceMgr.Handle:不支持的主题:%s", mm.Topic())
- }
- }
- }
- if v, ok, _ := o.Queue.Get(); ok {
- if vi, ok := v.(*tagVehicleInfo); ok {
- o.HandleData(vi)
- }
- } else {
- if exit {
- logrus.Warnf("ITSDeviceMgr.HandleData退出,原因:%v", o.ctx.Err())
- return 0
- }
- time.Sleep(50 * time.Millisecond)
- }
- }
- }
- }
|