mqttmgr.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package main
  import (
  	"log"
  	"sync"

  	"lc/common/mqtt"
  )
  6. var _mqttMgronce sync.Once
  7. var _mqttMgrsingle *MQTTMgr
  8. func GetMQTTMgr() *MQTTMgr {
  9. _mqttMgronce.Do(func() {
  10. _mqttMgrsingle = _newMQTTMgr()
  11. })
  12. return _mqttMgrsingle
  13. }
  14. type MQTTMgr struct {
  15. FromCloud *MqttClient
  16. ToCloud *MqttClient
  17. }
  18. func _newMQTTMgr() *MQTTMgr {
  19. return &MQTTMgr{
  20. FromCloud: NewMqttClient(FromMQTTConfig.Mqtt_Server,
  21. FromMQTTConfig.Mqtt_Id,
  22. FromMQTTConfig.Mqtt_User,
  23. FromMQTTConfig.Mqtt_Password,
  24. 3000, &EmptyMqttOnline{}),
  25. ToCloud: NewMqttClient(ToMQTTConfig.Mqtt_Server,
  26. ToMQTTConfig.Mqtt_Id,
  27. ToMQTTConfig.Mqtt_User,
  28. ToMQTTConfig.Mqtt_Password,
  29. 3000, &EmptyMqttOnline{}),
  30. }
  31. }
  32. func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler) {
  33. o.FromCloud.Handle(topic, handler)
  34. err := o.FromCloud.Subscribe(topic, qos)
  35. if err != nil {
  36. return
  37. }
  38. }
  39. func (o *MQTTMgr) UnSubscribe(topic string) {
  40. err := o.FromCloud.Unsubscribe(topic)
  41. if err != nil {
  42. return
  43. }
  44. }
  45. func (o *MQTTMgr) Publish(topic string, payload string, qos mqtt.QOS) error {
  46. return o.ToCloud.PublishString(topic, payload, qos)
  47. }
  48. func (o *MQTTMgr) HandlerData(m mqtt.Message) {
  49. topic := m.Topic()
  50. //newTopic := strings.ReplaceAll(topic, "000000", "100000")
  51. newTopic := topic
  52. err := o.Publish(newTopic, m.PayloadString(), m.QOS())
  53. if err != nil {
  54. return
  55. }
  56. }