package main import ( "sync" "time" "lc/common/mqtt" "lc/common/util" ) type OptType uint8 const ( ToAll OptType = 0 //发布和订阅边缘端与云端的消息 ToCloud OptType = 1 //发布和订阅云端的消息 ToEdge OptType = 2 //发布和订阅边缘端的消息 ) var mqttMgrOnce sync.Once var mqttMgrSingle *MQTTMgr // GetMQTTMgr 单态 func GetMQTTMgr() *MQTTMgr { mqttMgrOnce.Do(func() { mqttMgrSingle = _newMQTTMgr() }) return mqttMgrSingle } type MQTTMgr struct { Cloud *MqttClient Edge *MqttClient Queue *util.MlQueue } // 建两个client func _newMQTTMgr() *MQTTMgr { mgr := &MQTTMgr{ Queue: util.NewQueue(2000), } if appConfig.Edge.Mqtt.Server != "" { mgr.Edge = NewMqttClient(appConfig.Edge.Mqtt.Server, appConfig.GID+"@"+appName+appVersion, appConfig.Edge.Mqtt.User, appConfig.Edge.Mqtt.Password, appConfig.Edge.Mqtt.Timeout, &EmptyMqttOnline{}) } if appConfig.Cloud.Mqtt.Server != "" { mgr.Cloud = NewMqttClient(appConfig.Cloud.Mqtt.Server, appConfig.GID+"@"+appName+appVersion, appConfig.Cloud.Mqtt.User, appConfig.Cloud.Mqtt.Password, appConfig.Cloud.Mqtt.Timeout, &EmptyMqttOnline{}) } return mgr } // Subscribe 定阅 func (o *MQTTMgr) Subscribe(topic string, qos mqtt.QOS, handler mqtt.MessageHandler, tp OptType) { switch tp { case ToAll: if o.Cloud != nil { o.Cloud.Handle(topic, handler) o.Cloud.Subscribe(topic, qos) } if o.Edge != nil { o.Edge.Handle(topic, handler) o.Edge.Subscribe(topic, qos) } case ToCloud: if o.Cloud != nil { o.Cloud.Handle(topic, handler) o.Cloud.Subscribe(topic, qos) } case ToEdge: if o.Edge != nil { o.Edge.Handle(topic, handler) o.Edge.Subscribe(topic, qos) } } } // UnSubscribe 退定 func (o *MQTTMgr) UnSubscribe(topic string, tp OptType) { switch tp { case ToAll: if o.Cloud != nil { o.Cloud.Unsubscribe(topic) } if o.Edge != nil { o.Edge.Unsubscribe(topic) } case ToCloud: if o.Cloud != nil { o.Cloud.Unsubscribe(topic) } case ToEdge: if o.Edge != nil { o.Edge.Unsubscribe(topic) } } } // Publish 发布进队列 func (o *MQTTMgr) Publish(topic string, payload string, qos mqtt.QOS, tp OptType) { msg := MQTTMessage{ topic: topic, payload: payload, qos: qos, tp: tp, } o.Queue.Put(&msg) } // 发布低 func (o *MQTTMgr) _publish(msg *MQTTMessage) error { var err error switch msg.tp { case ToAll: if o.Cloud != nil { err = o.Cloud.PublishString(msg.topic, msg.payload, msg.qos) } if o.Edge != nil { o.Edge.PublishString(msg.topic, msg.payload, msg.qos) } case ToCloud: if o.Cloud != nil { err = o.Cloud.PublishString(msg.topic, msg.payload, msg.qos) } case ToEdge: if o.Edge != nil { o.Edge.PublishString(msg.topic, msg.payload, msg.qos) } } return err } // MQTTConnectMgr 连接保持 func (o *MQTTMgr) MQTTConnectMgr(args ...interface{}) interface{} { for { time.Sleep(10 * time.Second) if o.Cloud != nil { o.Cloud.Connect() } if o.Edge != nil { o.Edge.Connect() } } } func (o *MQTTMgr) MQTTMessageHandle(args ...interface{}) interface{} { defer func() { recover() pool.Add(o.MQTTMessageHandle, args) }() var err error for { //队列中所有发布 if m, ok, _ := o.Queue.Get(); ok { if msg, ok := m.(*MQTTMessage); ok { RETRY: err = o._publish(msg) if err != nil { time.Sleep(time.Second) goto RETRY } } } else { time.Sleep(200 * time.Millisecond) } } } type MQTTMessage struct { topic string payload string qos mqtt.QOS tp OptType }