devicemgr.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "lc/common/mqtt"
  6. "lc/common/util"
  7. "net/url"
  8. "runtime/debug"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/sirupsen/logrus"
  13. "lc/common/protocol"
  14. )
  15. var _once sync.Once
  16. var _single *LcDeviceMgr
  17. // GetLcDeviceMgr 单例
  18. func GetLcDeviceMgr() *LcDeviceMgr {
  19. _once.Do(func() {
  20. ctx, cancel := context.WithCancel(context.Background())
  21. _single = &LcDeviceMgr{
  22. ctx: ctx,
  23. cancel: cancel,
  24. downQueue: util.NewQueue(100),
  25. mapTopicHandle: make(map[string]func(m mqtt.Message)),
  26. mapLcDevice: make(map[string]*LcDevice),
  27. }
  28. })
  29. return _single
  30. }
  31. type LcDeviceMgr struct {
  32. ctx context.Context
  33. cancel context.CancelFunc
  34. downQueue *util.MlQueue
  35. mapTopicHandle map[string]func(m mqtt.Message)
  36. wrlock sync.RWMutex
  37. mapLcDevice map[string]*LcDevice
  38. }
  39. func (o *LcDeviceMgr) initAllLcDevice() error {
  40. if err := LoadOnvifDevConfig(); err != nil {
  41. logrus.Errorf("加载配置文件失败:%s", err.Error())
  42. return err
  43. }
  44. for _, v := range onvifDevConfig.Rtu {
  45. OnvifDev := v
  46. o.mapLcDevice[v.IP] = NewLcDevice(&OnvifDev)
  47. }
  48. o.mapTopicHandle[GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV)] = o.HandleTpGwOnvifdev
  49. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  50. return nil
  51. }
  52. func (o *LcDeviceMgr) Update(id, XAddr string) error {
  53. //解析出IP
  54. var ip string
  55. if url, err := url.Parse(XAddr); err == nil {
  56. ss := strings.Split(url.Host, ":")
  57. if len(ss) > 0 {
  58. ip = ss[0]
  59. }
  60. }
  61. dev, ok := o.mapLcDevice[ip]
  62. if !ok {
  63. return errors.New("未找到该设备的配置")
  64. }
  65. dev.Update(XAddr)
  66. return nil
  67. }
  68. func (o *LcDeviceMgr) HandleTpGwOnvifdev(m mqtt.Message) {
  69. var obj protocol.Pack_IDObject
  70. if err := obj.DeCode(m.PayloadString()); err != nil {
  71. return
  72. }
  73. if obj.Gid != appConfig.GID {
  74. return
  75. }
  76. var ret protocol.Pack_OnvifDev
  77. if str, err := ret.EnCode(appConfig.GID, GetNextUint64(), onvifDevConfig.Rtu); err == nil {
  78. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV_ACK), str, 0, ToCloud)
  79. }
  80. }
  81. func (o *LcDeviceMgr) HandleMessage(m mqtt.Message) {
  82. o.downQueue.Put(m)
  83. }
  84. func (o *LcDeviceMgr) Handle(args ...interface{}) interface{} {
  85. defer func() {
  86. if err := recover(); err != nil {
  87. logrus.Errorf("LcDeviceMgr.Handle发生异常:%v", err)
  88. logrus.Errorf("LcDeviceMgr.Handle发生异常,堆栈信息:%s", string(debug.Stack()))
  89. go o.Handle(args)
  90. }
  91. }()
  92. for {
  93. select {
  94. case <-o.ctx.Done():
  95. logrus.Errorf("LcDeviceMgr.Handle退出,原因:%v", o.ctx.Err())
  96. return 0
  97. default:
  98. //从队列钟获取指令执行
  99. if m, ok, _ := o.downQueue.Get(); ok {
  100. if mm, ok := m.(mqtt.Message); ok {
  101. if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
  102. fn(mm)
  103. } else {
  104. logrus.Errorf("LcDevice.Handle:不支持的主题:%s", mm.Topic())
  105. }
  106. }
  107. } else {
  108. time.Sleep(1 * time.Second)
  109. }
  110. }
  111. }
  112. }