mqttmgr.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package main
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/sirupsen/logrus"
  6. "lc/common/mqtt"
  7. "lc/common/util"
  8. )
  // OptType selects which broker connection(s) an MQTT operation applies to.
  type OptType uint8

  // Targets accepted by MQTTMgr's Subscribe, UnSubscribe and Publish.
  const (
  	// ToAll publishes/subscribes on both the edge and the cloud connection.
  	ToAll OptType = iota
  	// ToCloud publishes/subscribes on the cloud connection only.
  	ToCloud
  	// ToEdge publishes/subscribes on the edge connection only.
  	ToEdge
  )
  15. var _mqttMgrOnce sync.Once
  16. var mqttMgrSingle *MQTTMgr
  17. func GetMQTTMgr() *MQTTMgr {
  18. _mqttMgrOnce.Do(func() {
  19. mqttMgrSingle = _newMQTTMgr()
  20. })
  21. return mqttMgrSingle
  22. }
  23. type MQTTMgr struct {
  24. Cloud *MqttClient
  25. Edge *MqttClient
  26. Queue *util.MlQueue
  27. }
  28. func _newMQTTMgr() *MQTTMgr {
  29. mgr := &MQTTMgr{
  30. Queue: util.NewQueue(2000),
  31. }
  32. if appConfig.Edge.Mqtt.Server != "" {
  33. mgr.Edge = NewMqttClient(appConfig.Edge.Mqtt.Server,
  34. appConfig.GID+"@"+appName+version,
  35. appConfig.Edge.Mqtt.User,
  36. appConfig.Edge.Mqtt.Password,
  37. appConfig.Edge.Mqtt.Timeout,
  38. &EmptyMqttOnline{})
  39. }
  40. if appConfig.Cloud.Mqtt.Server != "" {
  41. mgr.Cloud = NewMqttClient(appConfig.Cloud.Mqtt.Server,
  42. appConfig.GID+"@"+appName+version,
  43. appConfig.Cloud.Mqtt.User,
  44. appConfig.Cloud.Mqtt.Password,
  45. appConfig.Cloud.Mqtt.Timeout,
  46. &MqttOnline{})
  47. }
  48. return mgr
  49. }
  50. func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) {
  51. switch tp {
  52. case ToAll:
  53. if o.Cloud != nil {
  54. o.Cloud.Handle(topic, handler)
  55. o.Cloud.Subscribe(topic, qos)
  56. }
  57. if o.Edge != nil {
  58. o.Edge.Handle(topic, handler)
  59. o.Edge.Subscribe(topic, qos)
  60. }
  61. case ToCloud:
  62. if o.Cloud != nil {
  63. o.Cloud.Handle(topic, handler)
  64. o.Cloud.Subscribe(topic, qos)
  65. }
  66. case ToEdge:
  67. if o.Edge != nil {
  68. o.Edge.Handle(topic, handler)
  69. o.Edge.Subscribe(topic, qos)
  70. }
  71. }
  72. }
  73. func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) {
  74. switch tp {
  75. case ToAll:
  76. if o.Cloud != nil {
  77. o.Cloud.Unsubscribe(topic)
  78. }
  79. if o.Edge != nil {
  80. o.Edge.Unsubscribe(topic)
  81. }
  82. case ToCloud:
  83. if o.Cloud != nil {
  84. o.Cloud.Unsubscribe(topic)
  85. }
  86. case ToEdge:
  87. if o.Edge != nil {
  88. o.Edge.Unsubscribe(topic)
  89. }
  90. }
  91. }
  92. func (o *MQTTMgr) Publish(topic string, payload string, qos mqtt.QOS, tp OptType) {
  93. msg := MQTTMessage{
  94. topic: topic,
  95. payload: payload,
  96. qos: qos,
  97. tp: tp,
  98. }
  99. o.Queue.Put(&msg)
  100. }
  101. func (o *MQTTMgr) _publish(msg *MQTTMessage) error {
  102. var err error
  103. switch msg.tp {
  104. case ToAll:
  105. if o.Cloud != nil {
  106. err = o.Cloud.PublishString(msg.topic, msg.payload, msg.qos)
  107. }
  108. if o.Edge != nil {
  109. o.Edge.PublishString(msg.topic, msg.payload, msg.qos)
  110. }
  111. case ToCloud:
  112. if o.Cloud != nil {
  113. err = o.Cloud.PublishString(msg.topic, msg.payload, msg.qos)
  114. }
  115. case ToEdge:
  116. if o.Edge != nil {
  117. o.Edge.PublishString(msg.topic, msg.payload, msg.qos)
  118. }
  119. }
  120. return err
  121. }
  122. func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} {
  123. for {
  124. time.Sleep(10 * time.Second)
  125. if o.Cloud != nil {
  126. o.Cloud.Connect()
  127. }
  128. if o.Edge != nil {
  129. o.Edge.Connect()
  130. }
  131. }
  132. }
  133. func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} {
  134. defer func() {
  135. recover()
  136. pool.Add(o.MQTTMessageHandle, args)
  137. }()
  138. var err error
  139. for {
  140. if m, ok, _ := o.Queue.Get(); ok {
  141. if msg, ok := m.(*MQTTMessage); ok {
  142. RETRY:
  143. err = o._publish(msg)
  144. if err != nil {
  145. logrus.Errorf("发布主题为%s的消息失败,原因:%s", msg.topic, err.Error())
  146. time.Sleep(time.Second)
  147. goto RETRY
  148. }
  149. }
  150. } else {
  151. time.Sleep(200 * time.Millisecond)
  152. }
  153. }
  154. }
  155. type MQTTMessage struct {
  156. topic string
  157. payload string
  158. qos mqtt.QOS
  159. tp OptType
  160. }