| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- package controllers
- import (
- "context"
- "sync"
- "time"
- "github.com/astaxie/beego"
- "lc/common/mqtt"
- )
- //海蓝灯控的mqtt
- var _mqtthandleroncehl sync.Once
- var _mqtthandlersinglehl *MqttHandler
- func GetMqttHandlerHL() *MqttHandler {
- _mqtthandleroncehl.Do(func() {
- _mqtthandlersinglehl = NewMqttHandler(beego.AppConfig.String("hlmqtt.server"), beego.AppConfig.String("hlmqtt.user"),
- beego.AppConfig.String("hlmqtt.password"), 3000)
- })
- return _mqtthandlersinglehl
- }
- type MqttHandlerhl struct {
- mqtt *mqtt.Client
- mu sync.Mutex
- mapTopics map[string]mqtt.QOS
- Timeout uint //超时时间,毫秒为单位
- }
- func NewMqttHandlerhl(server, user, password string, timeout uint) *MqttHandlerhl {
- o := MqttHandlerhl{
- mapTopics: make(map[string]mqtt.QOS),
- Timeout: timeout,
- }
- client, err := mqtt.NewClient(mqtt.ClientOptions{
- Servers: []string{server},
- ClientID: beego.AppConfig.String("mqtt.id"),
- Username: user,
- Password: password,
- AutoReconnect: true,
- }, &o)
- if client == nil || err != nil {
- beego.Error("未配置 MQTT Server")
- panic("未配置 MQTT Server")
- return nil
- }
- o.mqtt = client
- err = client.Connect(o.Ctx())
- return &o
- }
- func (o *MqttHandlerhl) ConnectionLostHandler(err error) {
- beego.Error("MqttHandlerhl.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
- }
- func (o *MqttHandlerhl) OnConnectHandler() {
- beego.Debug("MqttHandlerhl.OnConnectHandler:MQTT连接成功")
- //连接成功则订阅主题
- for k, v := range o.mapTopics {
- o.Subscribe(k, v)
- }
- }
- func (o *MqttHandlerhl) GetWill() (string, string) {
- return "", ""
- }
- func (o *MqttHandlerhl) Connect() error {
- if !o.mqtt.IsConnected() {
- return o.mqtt.Connect(o.Ctx())
- }
- return nil
- }
- func (o *MqttHandlerhl) IsConnected() bool {
- return o.mqtt.IsConnected()
- }
- func (o *MqttHandlerhl) Publish(topic string, payload []byte, qos mqtt.QOS) error {
- return o.mqtt.Publish(o.Ctx(), topic, payload, qos)
- }
- func (o *MqttHandlerhl) PublishString(topic string, payload string, qos mqtt.QOS) error {
- return o.mqtt.PublishString(o.Ctx(), topic, payload, qos)
- }
- func (o *MqttHandlerhl) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error {
- return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos)
- }
- func (o *MqttHandlerhl) Subscribe(topic string, qos mqtt.QOS) error {
- o.mu.Lock()
- defer o.mu.Unlock()
- if _, ok := o.mapTopics[topic]; !ok {
- o.mapTopics[topic] = qos
- }
- return o.mqtt.Subscribe(o.Ctx(), topic, qos)
- }
- func (o *MqttHandlerhl) Unsubscribe(ctx context.Context, topic string) error {
- o.mu.Lock()
- defer o.mu.Unlock()
- if _, ok := o.mapTopics[topic]; ok {
- delete(o.mapTopics, topic)
- }
- return o.mqtt.Unsubscribe(o.Ctx(), topic)
- }
- func (o *MqttHandlerhl) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route {
- return o.mqtt.Handle(topic, handler)
- }
- func (o *MqttHandlerhl) Ctx() context.Context {
- ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.Timeout))
- return ctx
- }
|