package controllers import ( "context" "sync" "time" "github.com/astaxie/beego" "lc/common/mqtt" ) //海蓝灯控的mqtt var _mqtthandleroncehl sync.Once var _mqtthandlersinglehl *MqttHandler func GetMqttHandlerHL() *MqttHandler { _mqtthandleroncehl.Do(func() { _mqtthandlersinglehl = NewMqttHandler(beego.AppConfig.String("hlmqtt.server"), beego.AppConfig.String("hlmqtt.user"), beego.AppConfig.String("hlmqtt.password"), 3000) }) return _mqtthandlersinglehl } type MqttHandlerhl struct { mqtt *mqtt.Client mu sync.Mutex mapTopics map[string]mqtt.QOS Timeout uint //超时时间,毫秒为单位 } func NewMqttHandlerhl(server, user, password string, timeout uint) *MqttHandlerhl { o := MqttHandlerhl{ mapTopics: make(map[string]mqtt.QOS), Timeout: timeout, } client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{server}, ClientID: beego.AppConfig.String("mqtt.id"), Username: user, Password: password, AutoReconnect: true, }, &o) if client == nil || err != nil { beego.Error("未配置 MQTT Server") panic("未配置 MQTT Server") return nil } o.mqtt = client err = client.Connect(o.Ctx()) return &o } func (o *MqttHandlerhl) ConnectionLostHandler(err error) { beego.Error("MqttHandlerhl.ConnectionLostHandler:MQTT连接已经断开,原因:", err) } func (o *MqttHandlerhl) OnConnectHandler() { beego.Debug("MqttHandlerhl.OnConnectHandler:MQTT连接成功") //连接成功则订阅主题 for k, v := range o.mapTopics { o.Subscribe(k, v) } } func (o *MqttHandlerhl) GetWill() (string, string) { return "", "" } func (o *MqttHandlerhl) Connect() error { if !o.mqtt.IsConnected() { return o.mqtt.Connect(o.Ctx()) } return nil } func (o *MqttHandlerhl) IsConnected() bool { return o.mqtt.IsConnected() } func (o *MqttHandlerhl) Publish(topic string, payload []byte, qos mqtt.QOS) error { return o.mqtt.Publish(o.Ctx(), topic, payload, qos) } func (o *MqttHandlerhl) PublishString(topic string, payload string, qos mqtt.QOS) error { return o.mqtt.PublishString(o.Ctx(), topic, payload, qos) } func (o *MqttHandlerhl) PublishJSON(topic string, payload interface{}, qos mqtt.QOS) error { return o.mqtt.PublishJSON(o.Ctx(), topic, payload, qos) } func (o *MqttHandlerhl) Subscribe(topic string, qos mqtt.QOS) error { o.mu.Lock() defer o.mu.Unlock() if _, ok := o.mapTopics[topic]; !ok { o.mapTopics[topic] = qos } return o.mqtt.Subscribe(o.Ctx(), topic, qos) } func (o *MqttHandlerhl) Unsubscribe(ctx context.Context, topic string) error { o.mu.Lock() defer o.mu.Unlock() if _, ok := o.mapTopics[topic]; ok { delete(o.mapTopics, topic) } return o.mqtt.Unsubscribe(o.Ctx(), topic) } func (o *MqttHandlerhl) Handle(topic string, handler mqtt.MessageHandler) mqtt.Route { return o.mqtt.Handle(topic, handler) } func (o *MqttHandlerhl) Ctx() context.Context { ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(o.Timeout)) return ctx }