mqttmgr.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package main
  2. import (
  3. "sync"
  4. "time"
  5. "lc/common/mqtt"
  6. "lc/common/util"
  7. )
  // OptType selects which broker connection(s) an MQTT operation is applied to.
  type OptType uint8

  // Routing targets accepted by the MQTTMgr methods.
  const (
  	// ToAll publishes/subscribes on both the edge and the cloud connection.
  	ToAll OptType = iota
  	// ToCloud publishes/subscribes on the cloud connection only.
  	ToCloud
  	// ToEdge publishes/subscribes on the edge connection only.
  	ToEdge
  )
  14. var mqttMgrOnce sync.Once
  15. var mqttMgrSingle *MQTTMgr
  16. // GetMQTTMgr 单态
  17. func GetMQTTMgr() *MQTTMgr {
  18. mqttMgrOnce.Do(func() {
  19. mqttMgrSingle = _newMQTTMgr()
  20. })
  21. return mqttMgrSingle
  22. }
  23. type MQTTMgr struct {
  24. Cloud *MqttClient
  25. Edge *MqttClient
  26. Queue *util.MlQueue
  27. }
  28. // 建两个client
  29. func _newMQTTMgr() *MQTTMgr {
  30. mgr := &MQTTMgr{
  31. Queue: util.NewQueue(2000),
  32. }
  33. if appConfig.Edge.Mqtt.Server != "" {
  34. mgr.Edge = NewMqttClient(appConfig.Edge.Mqtt.Server,
  35. appConfig.GID+"@"+appName+appVersion,
  36. appConfig.Edge.Mqtt.User,
  37. appConfig.Edge.Mqtt.Password,
  38. appConfig.Edge.Mqtt.Timeout,
  39. &EmptyMqttOnline{})
  40. }
  41. if appConfig.Cloud.Mqtt.Server != "" {
  42. mgr.Cloud = NewMqttClient(appConfig.Cloud.Mqtt.Server,
  43. appConfig.GID+"@"+appName+appVersion,
  44. appConfig.Cloud.Mqtt.User,
  45. appConfig.Cloud.Mqtt.Password,
  46. appConfig.Cloud.Mqtt.Timeout,
  47. &EmptyMqttOnline{})
  48. }
  49. return mgr
  50. }
  51. // Subscribe 定阅
  52. func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) {
  53. switch tp {
  54. case ToAll:
  55. if o.Cloud != nil {
  56. o.Cloud.Handle(topic, handler)
  57. o.Cloud.Subscribe(topic, qos)
  58. }
  59. if o.Edge != nil {
  60. o.Edge.Handle(topic, handler)
  61. o.Edge.Subscribe(topic, qos)
  62. }
  63. case ToCloud:
  64. if o.Cloud != nil {
  65. o.Cloud.Handle(topic, handler)
  66. o.Cloud.Subscribe(topic, qos)
  67. }
  68. case ToEdge:
  69. if o.Edge != nil {
  70. o.Edge.Handle(topic, handler)
  71. o.Edge.Subscribe(topic, qos)
  72. }
  73. }
  74. }
  75. // UnSubscribe 退定
  76. func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) {
  77. switch tp {
  78. case ToAll:
  79. if o.Cloud != nil {
  80. o.Cloud.Unsubscribe(topic)
  81. }
  82. if o.Edge != nil {
  83. o.Edge.Unsubscribe(topic)
  84. }
  85. case ToCloud:
  86. if o.Cloud != nil {
  87. o.Cloud.Unsubscribe(topic)
  88. }
  89. case ToEdge:
  90. if o.Edge != nil {
  91. o.Edge.Unsubscribe(topic)
  92. }
  93. }
  94. }
  95. // Publish 发布进队列
  96. func (o *MQTTMgr) Publish(topic string, payload []byte, qos mqtt.QOS, tp OptType) {
  97. msg := MQTTMessage{
  98. topic: topic,
  99. payload: payload,
  100. qos: qos,
  101. tp: tp,
  102. }
  103. o.Queue.Put(&msg)
  104. }
  105. // 发布低
  106. func (o *MQTTMgr) _publish(msg *MQTTMessage) error {
  107. var err error
  108. switch msg.tp {
  109. case ToAll:
  110. if o.Cloud != nil {
  111. err = o.Cloud.Publish(msg.topic, msg.payload, msg.qos)
  112. }
  113. if o.Edge != nil {
  114. o.Edge.Publish(msg.topic, msg.payload, msg.qos)
  115. }
  116. case ToCloud:
  117. if o.Cloud != nil {
  118. err = o.Cloud.Publish(msg.topic, msg.payload, msg.qos)
  119. }
  120. case ToEdge:
  121. if o.Edge != nil {
  122. o.Edge.Publish(msg.topic, msg.payload, msg.qos)
  123. }
  124. }
  125. return err
  126. }
  127. // MQTTConnectMgr 连接保持
  128. func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} {
  129. for {
  130. time.Sleep(10 * time.Second)
  131. if o.Cloud != nil {
  132. o.Cloud.Connect()
  133. }
  134. if o.Edge != nil {
  135. o.Edge.Connect()
  136. }
  137. }
  138. }
  139. func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} {
  140. defer func() {
  141. recover()
  142. pool.Add(o.MQTTMessageHandle, args)
  143. }()
  144. var err error
  145. for { //队列中所有发布
  146. if m, ok, _ := o.Queue.Get(); ok {
  147. if msg, ok := m.(*MQTTMessage); ok {
  148. RETRY:
  149. err = o._publish(msg)
  150. if err != nil {
  151. time.Sleep(time.Second)
  152. goto RETRY
  153. }
  154. }
  155. } else {
  156. time.Sleep(200 * time.Millisecond)
  157. }
  158. }
  159. }
  160. type MQTTMessage struct {
  161. topic string
  162. payload []byte
  163. qos mqtt.QOS
  164. tp OptType
  165. }