mqttmgr.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package main
  2. import (
  3. "runtime/debug"
  4. "sync"
  5. "time"
  6. "github.com/sirupsen/logrus"
  7. "lc/common/mqtt"
  8. "lc/common/util"
  9. )
  10. type OptType uint8
  11. const (
  12. ToAll OptType = 0 //发布和订阅边缘端与云端的消息
  13. ToCloud OptType = 1 //发布和订阅云端的消息
  14. ToEdge OptType = 2 //发布和订阅边缘端的消息
  15. )
  16. var _mqttMgronce sync.Once
  17. var _mqttMgrsingle *MQTTMgr
  18. func GetMQTTMgr() *MQTTMgr {
  19. _mqttMgronce.Do(func() {
  20. _mqttMgrsingle = _newMQTTMgr()
  21. })
  22. return _mqttMgrsingle
  23. }
  24. type MQTTMgr struct {
  25. Cloud *MqttClient
  26. Edge *MqttClient
  27. Queue *util.MlQueue
  28. }
  29. func _newMQTTMgr() *MQTTMgr {
  30. mgr := &MQTTMgr{
  31. Queue: util.NewQueue(2000),
  32. }
  33. if appConfig.Edge.Mqtt.Server != "" {
  34. mgr.Edge = NewMqttClient(appConfig.Edge.Mqtt.Server,
  35. appConfig.GID+"@"+appname+version,
  36. appConfig.Edge.Mqtt.User,
  37. appConfig.Edge.Mqtt.Password,
  38. appConfig.Edge.Mqtt.Timeout,
  39. &EmptyMqttOnline{})
  40. }
  41. if appConfig.Cloud.Mqtt.Server != "" {
  42. mgr.Cloud = NewMqttClient(appConfig.Cloud.Mqtt.Server,
  43. appConfig.GID+"@"+appname+version,
  44. appConfig.Cloud.Mqtt.User,
  45. appConfig.Cloud.Mqtt.Password,
  46. appConfig.Cloud.Mqtt.Timeout,
  47. &EmptyMqttOnline{})
  48. }
  49. return mgr
  50. }
  51. func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) {
  52. switch tp {
  53. case ToAll:
  54. if o.Cloud != nil {
  55. o.Cloud.Handle(topic, handler)
  56. o.Cloud.Subscribe(topic, qos)
  57. }
  58. if o.Edge != nil {
  59. o.Edge.Handle(topic, handler)
  60. o.Edge.Subscribe(topic, qos)
  61. }
  62. case ToCloud:
  63. if o.Cloud != nil {
  64. o.Cloud.Handle(topic, handler)
  65. o.Cloud.Subscribe(topic, qos)
  66. }
  67. case ToEdge:
  68. if o.Edge != nil {
  69. o.Edge.Handle(topic, handler)
  70. o.Edge.Subscribe(topic, qos)
  71. }
  72. }
  73. }
  74. func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) {
  75. switch tp {
  76. case ToAll:
  77. if o.Cloud != nil {
  78. o.Cloud.Unsubscribe(topic)
  79. }
  80. if o.Edge != nil {
  81. o.Edge.Unsubscribe(topic)
  82. }
  83. case ToCloud:
  84. if o.Cloud != nil {
  85. o.Cloud.Unsubscribe(topic)
  86. }
  87. case ToEdge:
  88. if o.Edge != nil {
  89. o.Edge.Unsubscribe(topic)
  90. }
  91. }
  92. }
  93. func (o *MQTTMgr) Publish(topic string, payload string, qos mqtt.QOS, tp OptType) {
  94. msg := MQTTMessage{
  95. topic: topic,
  96. payload: payload,
  97. qos: qos,
  98. tp: tp,
  99. }
  100. o.Queue.Put(&msg)
  101. }
  102. func (o *MQTTMgr) _publish(msg *MQTTMessage) error {
  103. var err error
  104. switch msg.tp {
  105. case ToAll:
  106. if o.Cloud != nil {
  107. err = o.Cloud.PublishString(msg.topic, msg.payload, msg.qos)
  108. }
  109. if o.Edge != nil {
  110. o.Edge.PublishString(msg.topic, msg.payload, msg.qos)
  111. }
  112. case ToCloud:
  113. if o.Cloud != nil {
  114. err = o.Cloud.PublishString(msg.topic, msg.payload, msg.qos)
  115. }
  116. case ToEdge:
  117. if o.Edge != nil {
  118. o.Edge.PublishString(msg.topic, msg.payload, msg.qos)
  119. }
  120. }
  121. return err
  122. }
  123. func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} {
  124. for {
  125. time.Sleep(10 * time.Second)
  126. if o.Cloud != nil {
  127. o.Cloud.Connect()
  128. }
  129. if o.Edge != nil {
  130. o.Edge.Connect()
  131. }
  132. }
  133. }
  134. func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} {
  135. defer func() {
  136. if err := recover(); err != nil {
  137. logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常:%v", err)
  138. logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常,堆栈信息:%s", string(debug.Stack()))
  139. gopool.Add(o.MQTTMessageHandle, args)
  140. }
  141. }()
  142. var err error
  143. for {
  144. if m, ok, _ := o.Queue.Get(); ok {
  145. if msg, ok := m.(*MQTTMessage); ok {
  146. RETRY:
  147. err = o._publish(msg)
  148. if err != nil {
  149. logrus.Errorf("发布主题为%s的消息失败,原因:%s", msg.topic, err.Error())
  150. time.Sleep(time.Second)
  151. goto RETRY
  152. }
  153. }
  154. } else {
  155. time.Sleep(200 * time.Millisecond)
  156. }
  157. }
  158. }
  159. type MQTTMessage struct {
  160. topic string
  161. payload string
  162. qos mqtt.QOS
  163. tp OptType
  164. }