devicemgr.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "lc/common/mqtt"
  6. "lc/common/util"
  7. "net/url"
  8. "strings"
  9. "sync"
  10. "time"
  11. "lc/common/protocol"
  12. )
  13. var _once sync.Once
  14. var _single *LcDeviceMgr
  15. // GetLcDeviceMgr 单例
  16. func GetLcDeviceMgr() *LcDeviceMgr {
  17. _once.Do(func() {
  18. ctx, cancel := context.WithCancel(context.Background())
  19. _single = &LcDeviceMgr{
  20. ctx: ctx,
  21. cancel: cancel,
  22. downQueue: util.NewQueue(100),
  23. mapTopicHandle: make(map[string]func(m mqtt.Message)),
  24. mapLcDevice: make(map[string]*LcDevice),
  25. }
  26. })
  27. return _single
  28. }
  29. type LcDeviceMgr struct {
  30. ctx context.Context
  31. cancel context.CancelFunc
  32. downQueue *util.MlQueue
  33. mapTopicHandle map[string]func(m mqtt.Message)
  34. rwMutex sync.RWMutex
  35. mapLcDevice map[string]*LcDevice
  36. }
  37. func (o *LcDeviceMgr) initAllLcDevice() error {
  38. if err := LoadOnvifDevConfig(); err != nil {
  39. return err
  40. }
  41. for _, v := range onvifDevConfig.Rtu {
  42. OnvifDev := v
  43. o.mapLcDevice[v.IP] = NewLcDevice(&OnvifDev)
  44. }
  45. o.mapTopicHandle[GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV)] = o.HandleOnvifDev
  46. GetMQTTMgr().Subscribe(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV), mqtt.AtMostOnce, o.HandleMessage, ToAll)
  47. return nil
  48. }
  49. func (o *LcDeviceMgr) Update(id, XAddr string) error {
  50. //解析出IP
  51. var ip string
  52. if parse, err := url.Parse(XAddr); err == nil {
  53. ss := strings.Split(parse.Host, ":")
  54. if len(ss) > 0 {
  55. ip = ss[0]
  56. }
  57. }
  58. dev, ok := o.mapLcDevice[ip]
  59. if !ok {
  60. return errors.New("未找到该设备的配置")
  61. }
  62. dev.Update(XAddr)
  63. return nil
  64. }
  65. func (o *LcDeviceMgr) HandleOnvifDev(m mqtt.Message) {
  66. var obj protocol.Pack_IDObject
  67. if err := obj.DeCode(m.PayloadString()); err != nil {
  68. return
  69. }
  70. if obj.Gid != appConfig.GID {
  71. return
  72. }
  73. var ret protocol.Pack_OnvifDev
  74. if str, err := ret.EnCode(appConfig.GID, GetNextUint64(), onvifDevConfig.Rtu); err == nil {
  75. GetMQTTMgr().Publish(GetTopic(protocol.DT_GATEWAY, appConfig.GID, protocol.TP_GW_ONVIFDEV_ACK), str, 0, ToCloud)
  76. }
  77. }
  78. func (o *LcDeviceMgr) HandleMessage(m mqtt.Message) {
  79. o.downQueue.Put(m)
  80. }
  81. func (o *LcDeviceMgr) Handle(args ...interface{}) interface{} {
  82. defer func() {
  83. recover()
  84. go o.Handle(args)
  85. }()
  86. for {
  87. select {
  88. case <-o.ctx.Done():
  89. return 0
  90. default:
  91. //从队列钟获取指令执行
  92. if m, ok, _ := o.downQueue.Get(); ok {
  93. if mm, ok := m.(mqtt.Message); ok {
  94. if fn, ok := o.mapTopicHandle[mm.Topic()]; ok {
  95. fn(mm)
  96. }
  97. }
  98. } else {
  99. time.Sleep(1 * time.Second)
  100. }
  101. }
  102. }
  103. }