mqttmgr.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package lc
  2. import (
  3. "runtime/debug"
  4. "sync"
  5. "time"
  6. "github.com/sirupsen/logrus"
  7. "lc-smartX/util"
  8. "lc-smartX/util/mqtt"
  9. )
  // OptType identifies which MQTT endpoint an operation is aimed at.
  type OptType uint8

  // ToCloud targets the cloud side, for both publishing and subscribing.
  const ToCloud OptType = 1
  14. var _mqttMgronce sync.Once
  15. var _mqttMgrsingle *MQTTMgr
  16. // GetMQTTMgr 单态
  17. func GetMQTTMgr() *MQTTMgr {
  18. _mqttMgronce.Do(func() {
  19. _mqttMgrsingle = _newMQTTMgr()
  20. })
  21. return _mqttMgrsingle
  22. }
  23. type MQTTMgr struct {
  24. Cloud *MqttClient
  25. Queue *util.MlQueue
  26. }
  27. // 建两个client
  28. func _newMQTTMgr() *MQTTMgr {
  29. mgr := &MQTTMgr{
  30. Queue: util.NewQueue(2000),
  31. }
  32. mgr.Cloud = NewMqttClient(util.Config.Mqtt.Server,
  33. "demo_client",
  34. util.Config.Mqtt.User,
  35. util.Config.Mqtt.Password,
  36. util.Config.Mqtt.Timeout,
  37. &EmptyMqttOnline{})
  38. return mgr
  39. }
  40. // Subscribe 定阅
  41. func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) {
  42. if o.Cloud != nil {
  43. o.Cloud.Handle(topic, handler)
  44. o.Cloud.Subscribe(topic, qos)
  45. }
  46. }
  47. // UnSubscribe 退定
  48. func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) {
  49. if o.Cloud != nil {
  50. o.Cloud.Unsubscribe(topic)
  51. }
  52. }
  53. // Publish 发布进队列
  54. func (o *MQTTMgr) Publish(topic string, payload []byte, qos mqtt.QOS, tp OptType) {
  55. msg := MQTTMessage{
  56. topic: topic,
  57. payload: payload,
  58. qos: qos,
  59. tp: tp,
  60. }
  61. o.Queue.Put(&msg)
  62. }
  63. // 发布低
  64. func (o *MQTTMgr) _publish(msg *MQTTMessage) error {
  65. var err error
  66. if o.Cloud != nil {
  67. err = o.Cloud.Publish(msg.topic, msg.payload, msg.qos)
  68. }
  69. return err
  70. }
  71. // MQTTConnectMgr 连接保持
  72. func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} {
  73. for {
  74. time.Sleep(10 * time.Second)
  75. if o.Cloud != nil {
  76. o.Cloud.Connect()
  77. }
  78. }
  79. }
  80. func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} {
  81. defer func() {
  82. if err := recover(); err != nil {
  83. logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常:%v", err)
  84. logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常,堆栈信息:%s", string(debug.Stack()))
  85. go o.MQTTMessageHandle(args)
  86. }
  87. }()
  88. var err error
  89. for { //队列中所有发布
  90. if m, ok, _ := o.Queue.Get(); ok {
  91. if msg, ok := m.(*MQTTMessage); ok {
  92. err = o._publish(msg)
  93. if err != nil {
  94. logrus.Errorf("发布主题为%s的消息失败,原因:%s", msg.topic, err.Error())
  95. }
  96. }
  97. } else {
  98. time.Sleep(200 * time.Millisecond)
  99. }
  100. }
  101. }
  102. type MQTTMessage struct {
  103. topic string
  104. payload []byte
  105. qos mqtt.QOS
  106. tp OptType
  107. }