package lc import ( "runtime/debug" "sync" "time" "github.com/sirupsen/logrus" "lc-smartX/util" "lc-smartX/util/mqtt" ) type OptType uint8 const ( ToCloud OptType = 1 //发布和订阅云端的消息 ) var _mqttMgronce sync.Once var _mqttMgrsingle *MQTTMgr // GetMQTTMgr 单态 func GetMQTTMgr() *MQTTMgr { _mqttMgronce.Do(func() { _mqttMgrsingle = _newMQTTMgr() }) return _mqttMgrsingle } type MQTTMgr struct { Cloud *MqttClient Queue *util.MlQueue } // 建两个client func _newMQTTMgr() *MQTTMgr { mgr := &MQTTMgr{ Queue: util.NewQueue(2000), } mgr.Cloud = NewMqttClient(util.Config.Mqtt.Server, "demo_client", util.Config.Mqtt.User, util.Config.Mqtt.Password, util.Config.Mqtt.Timeout, &EmptyMqttOnline{}) return mgr } // Subscribe 定阅 func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) { if o.Cloud != nil { o.Cloud.Handle(topic, handler) o.Cloud.Subscribe(topic, qos) } } // UnSubscribe 退定 func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) { if o.Cloud != nil { o.Cloud.Unsubscribe(topic) } } // Publish 发布进队列 func (o *MQTTMgr) Publish(topic string, payload []byte, qos mqtt.QOS, tp OptType) { msg := MQTTMessage{ topic: topic, payload: payload, qos: qos, tp: tp, } o.Queue.Put(&msg) } // 发布低 func (o *MQTTMgr) _publish(msg *MQTTMessage) error { var err error if o.Cloud != nil { err = o.Cloud.Publish(msg.topic, msg.payload, msg.qos) } return err } // MQTTConnectMgr 连接保持 func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} { for { time.Sleep(10 * time.Second) if o.Cloud != nil { o.Cloud.Connect() } } } func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} { defer func() { if err := recover(); err != nil { logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常:%v", err) logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常,堆栈信息:%s", string(debug.Stack())) go o.MQTTMessageHandle(args) } }() var err error for { //队列中所有发布 if m, ok, _ := o.Queue.Get(); ok { if msg, ok := m.(*MQTTMessage); ok { err = o._publish(msg) if err != nil { logrus.Errorf("发布主题为%s的消息失败,原因:%s", msg.topic, err.Error()) } } } else { time.Sleep(200 * time.Millisecond) } } } type MQTTMessage struct { topic string payload []byte qos mqtt.QOS tp OptType }