| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package main
- import (
- "runtime/debug"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "lc/common/mqtt"
- "lc/common/util"
- )
// OptType selects which MQTT connection(s) an operation is applied to.
type OptType uint8

const (
	// ToAll publishes/subscribes on both the edge and the cloud connections.
	ToAll OptType = iota
	// ToCloud publishes/subscribes on the cloud connection only.
	ToCloud
	// ToEdge publishes/subscribes on the edge connection only.
	ToEdge
)
- var _mqttMgronce sync.Once
- var _mqttMgrsingle *MQTTMgr
- func GetMQTTMgr() *MQTTMgr {
- _mqttMgronce.Do(func() {
- _mqttMgrsingle = _newMQTTMgr()
- })
- return _mqttMgrsingle
- }
- type MQTTMgr struct {
- Cloud *MqttClient
- Edge *MqttClient
- Queue *util.MlQueue
- }
- func _newMQTTMgr() *MQTTMgr {
- mgr := &MQTTMgr{
- Queue: util.NewQueue(2000),
- }
- if appConfig.Edge.Mqtt.Server != "" {
- mgr.Edge = NewMqttClient(appConfig.Edge.Mqtt.Server,
- appConfig.GID+"@"+appname+version,
- appConfig.Edge.Mqtt.User,
- appConfig.Edge.Mqtt.Password,
- appConfig.Edge.Mqtt.Timeout,
- &EmptyMqttOnline{})
- }
- if appConfig.Cloud.Mqtt.Server != "" {
- mgr.Cloud = NewMqttClient(appConfig.Cloud.Mqtt.Server,
- appConfig.GID+"@"+appname+version,
- appConfig.Cloud.Mqtt.User,
- appConfig.Cloud.Mqtt.Password,
- appConfig.Cloud.Mqtt.Timeout,
- &MqttOnline{})
- }
- return mgr
- }
- func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) {
- switch tp {
- case ToAll:
- if o.Cloud != nil {
- o.Cloud.Handle(topic, handler)
- o.Cloud.Subscribe(topic, qos)
- }
- if o.Edge != nil {
- o.Edge.Handle(topic, handler)
- o.Edge.Subscribe(topic, qos)
- }
- case ToCloud:
- if o.Cloud != nil {
- o.Cloud.Handle(topic, handler)
- o.Cloud.Subscribe(topic, qos)
- }
- case ToEdge:
- if o.Edge != nil {
- o.Edge.Handle(topic, handler)
- o.Edge.Subscribe(topic, qos)
- }
- }
- }
- func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) {
- switch tp {
- case ToAll:
- if o.Cloud != nil {
- o.Cloud.Unsubscribe(topic)
- }
- if o.Edge != nil {
- o.Edge.Unsubscribe(topic)
- }
- case ToCloud:
- if o.Cloud != nil {
- o.Cloud.Unsubscribe(topic)
- }
- case ToEdge:
- if o.Edge != nil {
- o.Edge.Unsubscribe(topic)
- }
- }
- }
- func (o *MQTTMgr) Publish(topic string, payload string, qos mqtt.QOS, tp OptType) {
- msg := MQTTMessage{
- topic: topic,
- payload: payload,
- qos: qos,
- tp: tp,
- }
- o.Queue.Put(&msg)
- }
- func (o *MQTTMgr) _publish(msg *MQTTMessage) error {
- var err error
- switch msg.tp {
- case ToAll:
- if o.Cloud != nil {
- err = o.Cloud.PublishString(msg.topic, msg.payload, msg.qos)
- }
- if o.Edge != nil {
- o.Edge.PublishString(msg.topic, msg.payload, msg.qos)
- }
- case ToCloud:
- if o.Cloud != nil {
- err = o.Cloud.PublishString(msg.topic, msg.payload, msg.qos)
- }
- case ToEdge:
- if o.Edge != nil {
- o.Edge.PublishString(msg.topic, msg.payload, msg.qos)
- }
- }
- return err
- }
- func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} {
- for {
- time.Sleep(10 * time.Second)
- //GetMonitorStatus().MQTTConnectMgr = time.Now() //更新状态
- if o.Cloud != nil {
- o.Cloud.Connect()
- }
- if o.Edge != nil {
- o.Edge.Connect()
- }
- }
- }
- func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常:%v", err)
- logrus.Errorf("MQTTMgr.MQTTMessageHandle发生异常,堆栈信息:%s", string(debug.Stack()))
- gopool.Add(o.MQTTMessageHandle, args)
- }
- }()
- var err error
- for {
- if m, ok, _ := o.Queue.Get(); ok {
- if msg, ok := m.(*MQTTMessage); ok {
- RETRY:
- err = o._publish(msg)
- if err != nil {
- logrus.Errorf("发布主题为%s的消息失败,原因:%s", msg.topic, err.Error())
- time.Sleep(time.Second)
- goto RETRY
- }
- }
- } else {
- time.Sleep(200 * time.Millisecond)
- //更新状态时间
- //GetMonitorStatus().MQTTMessageHandle = time.Now()
- }
- }
- }
- type MQTTMessage struct {
- topic string
- payload string
- qos mqtt.QOS
- tp OptType
- }
|