123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package lc
- import (
- "runtime/debug"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc-smartX/util"
- "lc-smartX/util/mqtt"
- )
// OptType is the target selector accepted by the MQTTMgr subscribe,
// unsubscribe and publish methods.
type OptType uint8

// ToCloud marks an operation as publishing or subscribing to cloud-side
// messages.
const ToCloud OptType = 1
- var _mqttMgronce sync.Once
- var _mqttMgrsingle *MQTTMgr
- // GetMQTTMgr 单态
- func GetMQTTMgr() *MQTTMgr {
- _mqttMgronce.Do(func() {
- _mqttMgrsingle = _newMQTTMgr()
- })
- return _mqttMgrsingle
- }
- type MQTTMgr struct {
- Cloud *MqttClient
- Queue *util.MlQueue
- }
- // 建两个client
- func _newMQTTMgr() *MQTTMgr {
- mgr := &MQTTMgr{
- Queue: util.NewQueue(2000),
- }
- mgr.Cloud = NewMqttClient(util.Config.Mqtt.Server,
- "demo_client",
- util.Config.Mqtt.User,
- util.Config.Mqtt.Password,
- util.Config.Mqtt.Timeout,
- &EmptyMqttOnline{})
- return mgr
- }
- // Subscribe 定阅
- func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) {
- if o.Cloud != nil {
- o.Cloud.Handle(topic, handler)
- o.Cloud.Subscribe(topic, qos)
- }
- }
- // UnSubscribe 退定
- func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) {
- if o.Cloud != nil {
- o.Cloud.Unsubscribe(topic)
- }
- }
- // Publish 发布进队列
- func (o *MQTTMgr) Publish(topic string, payload []byte, qos mqtt.QOS, tp OptType) {
- msg := MQTTMessage{
- topic: topic,
- payload: payload,
- qos: qos,
- tp: tp,
- }
- o.Queue.Put(&msg)
- }
- // 发布低
- func (o *MQTTMgr) _publish(msg *MQTTMessage) error {
- var err error
- if o.Cloud != nil {
- err = o.Cloud.Publish(msg.topic, msg.payload, msg.qos)
- }
- return err
- }
- // MQTTConnectMgr 连接保持
- func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} {
- for {
- time.Sleep(10 * time.Second)
- if o.Cloud != nil {
- o.Cloud.Connect()
- }
- }
- }
- func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常:%v", err)
- logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常,堆栈信息:%s", string(debug.Stack()))
- go o.MQTTMessageHandle(args)
- }
- }()
- var err error
- for { //队列中所有发布
- if m, ok, _ := o.Queue.Get(); ok {
- if msg, ok := m.(*MQTTMessage); ok {
- err = o._publish(msg)
- if err != nil {
- logrus.Errorf("发布主题为%s的消息失败,原因:%s", msg.topic, err.Error())
- }
- }
- } else {
- time.Sleep(200 * time.Millisecond)
- }
- }
- }
- type MQTTMessage struct {
- topic string
- payload []byte
- qos mqtt.QOS
- tp OptType
- }
|