| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- package main
- import (
- "context"
- "errors"
- "lc/common/mqtt"
- "lc/common/util"
- "net/url"
- "runtime/debug"
- "strings"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/protocol"
- )
- var _once sync.Once
- var _single *LcDeviceMgr
- // GetLcDeviceMgr 单例
- func GetLcDeviceMgr() *LcDeviceMgr {
- _once.Do(func() {
- ctx, cancel := context.WithCancel(context.Background())
- _single = &LcDeviceMgr{
- ctx: ctx,
- cancel: cancel,
- downQueue: util.NewQueue(100),
- mapTopicHandle: make(map[string]func(m mqtt.Message)),
- mapLcDevice: make(map[string]*LcDevice),
- }
- })
- return _single
- }
- type LcDeviceMgr struct {
- ctx context.Context
- cancel context.CancelFunc
- downQueue *util.MlQueue
- mapTopicHandle map[string]func(m mqtt.Message)
- wrlock sync.RWMutex
- mapLcDevice map[string]*LcDevice
- }
- func (o *LcDeviceMgr) initAllLcDevice() error {
- if err := LoadOnvifDevConfig(); err != nil {
- logrus.Errorf("加载配置文件失败:%s", err.Error())
- return err
- }
- for _, v := range onvifDevConfig.Rtu {
- OnvifDev := v
- o.mapLcDevice[v.IP] = NewLcDevice(&OnvifDev)
- }
- o.mapTopicHandle[GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV)] = o.HandleTpGwOnvifdev
- GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV), mqtt.AtMostOnce, o.HandleMessage, ToAll)
- return nil
- }
- func (o *LcDeviceMgr) Update(id, XAddr string) error {
- //解析出IP
- var ip string
- if url, err := url.Parse(XAddr); err == nil {
- ss := strings.Split(url.Host, ":")
- if len(ss) > 0 {
- ip = ss[0]
- }
- }
- dev, ok := o.mapLcDevice[ip]
- if !ok {
- return errors.New("未找到该设备的配置")
- }
- dev.Update(XAddr)
- return nil
- }
- func (o *LcDeviceMgr) HandleTpGwOnvifdev(m mqtt.Message) {
- var obj protocol.Pack_IDObject
- if err := obj.DeCode(m.PayloadString()); err != nil {
- return
- }
- if obj.Gid != appConfig.GID {
- return
- }
- var ret protocol.Pack_OnvifDev
- if str, err := ret.EnCode(appConfig.GID, GetNextUint64(), onvifDevConfig.Rtu); err == nil {
- GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV_ACK), str, 0, ToCloud)
- }
- }
- func (o *LcDeviceMgr) HandleMessage(m mqtt.Message) {
- o.downQueue.Put(m)
- }
- func (o *LcDeviceMgr) Handle(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorf("LcDeviceMgr.Handle发生异常:%v", err)
- logrus.Errorf("LcDeviceMgr.Handle发生异常,堆栈信息:%s", string(debug.Stack()))
- go o.Handle(args)
- }
- }()
- for {
- select {
- case <-o.ctx.Done():
- logrus.Errorf("LcDeviceMgr.Handle退出,原因:%v", o.ctx.Err())
- return 0
- default:
- //从队列钟获取指令执行
- if m, ok, _ := o.downQueue.Get(); ok {
- if mm, ok := m.(mqtt.Message); ok {
- if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
- fn(mm)
- } else {
- logrus.Errorf("LcDevice.Handle:不支持的主题:%s", mm.Topic())
- }
- }
- } else {
- time.Sleep(1 * time.Second)
- }
- }
- }
- }
|