package main import ( "context" "errors" "lc/common/mqtt" "lc/common/util" "net/url" "strings" "sync" "time" "lc/common/protocol" ) var _once sync.Once var _single *LcDeviceMgr // GetLcDeviceMgr 单例 func GetLcDeviceMgr() *LcDeviceMgr { _once.Do(func() { ctx, cancel := context.WithCancel(context.Background()) _single = &LcDeviceMgr{ ctx: ctx, cancel: cancel, downQueue: util.NewQueue(100), mapTopicHandle: make(map[string]func(m mqtt.Message)), mapLcDevice: make(map[string]*LcDevice), } }) return _single } type LcDeviceMgr struct { ctx context.Context cancel context.CancelFunc downQueue *util.MlQueue mapTopicHandle map[string]func(m mqtt.Message) rwMutex sync.RWMutex mapLcDevice map[string]*LcDevice } func (o *LcDeviceMgr) initAllLcDevice() error { if err := LoadOnvifDevConfig(); err != nil { return err } for _, v := range onvifDevConfig.Rtu { OnvifDev := v o.mapLcDevice[v.IP] = NewLcDevice(&OnvifDev) } o.mapTopicHandle[GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV)] = o.HandleOnvifDev GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV), mqtt.AtMostOnce, o.HandleMessage, ToAll) return nil } func (o *LcDeviceMgr) Update(id, XAddr string) error { //解析出IP var ip string if parse, err := url.Parse(XAddr); err == nil { ss := strings.Split(parse.Host, ":") if len(ss) > 0 { ip = ss[0] } } dev, ok := o.mapLcDevice[ip] if !ok { return errors.New("未找到该设备的配置") } dev.Update(XAddr) return nil } func (o *LcDeviceMgr) HandleOnvifDev(m mqtt.Message) { var obj protocol.Pack_IDObject if err := obj.DeCode(m.PayloadString()); err != nil { return } if obj.Gid != appConfig.GID { return } var ret protocol.Pack_OnvifDev if str, err := ret.EnCode(appConfig.GID, GetNextUint64(), onvifDevConfig.Rtu); err == nil { GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV_ACK), str, 0, ToCloud) } } func (o *LcDeviceMgr) HandleMessage(m mqtt.Message) { o.downQueue.Put(m) } func (o *LcDeviceMgr) Handle(args ...interface{}) interface{} { defer func() { recover() go o.Handle(args) }() for { select { case <-o.ctx.Done(): return 0 default: //从队列钟获取指令执行 if m, ok, _ := o.downQueue.Get(); ok { if mm, ok := m.(mqtt.Message); ok { if fn, ok := o.mapTopicHandle[mm.Topic()]; ok { fn(mm) } } } else { time.Sleep(1 * time.Second) } } } }