package mqtt

import (
	"context"
	"fmt"
	"sync"
	"time"

	"server/edge/util/logger"
)

// BaseMqttOnline supplies the optional "online" and "will" messages used by
// MClient. Each method returns a (topic, payload) pair.
type BaseMqttOnline interface {
	// GetOnlineMsg returns the topic and payload that MClient publishes after
	// every successful connect. An empty topic disables the publication.
	GetOnlineMsg() (string, string)
	// GetWillMsg returns the topic and payload that MClient.GetWill reports.
	GetWillMsg() (string, string)
}

// EmptyMqttOnline is a BaseMqttOnline that provides neither an online message
// nor a will message.
type EmptyMqttOnline struct {
}

// Compile-time check that EmptyMqttOnline satisfies BaseMqttOnline.
var _ BaseMqttOnline = (*EmptyMqttOnline)(nil)

// GetOnlineMsg returns an empty topic and payload.
func (o *EmptyMqttOnline) GetOnlineMsg() (string, string) {
	return "", ""
}

// GetWillMsg returns an empty topic and payload.
func (o *EmptyMqttOnline) GetWillMsg() (string, string) {
	return "", ""
}

// MClient wraps Client, remembers every subscribed topic so the subscriptions
// can be replayed in OnConnectHandler, and applies a per-operation timeout.
type MClient struct {
	mqtt       *Client
	mu         sync.Mutex     // protects mapTopics
	mapTopics  map[string]QOS // subscribed topics and the QoS last requested for each
	timeout    uint           // per-operation timeout, in milliseconds
	MqttOnline BaseMqttOnline // source of the online message and the will message
}

// NewMqttClient creates an MClient for the given server and credentials and
// attempts an initial connection.
//
// timeout is the per-operation timeout in milliseconds. A nil mqttOnline is
// replaced by EmptyMqttOnline so that the connect handlers never dereference a
// nil interface.
//
// It panics if the underlying client cannot be constructed. A failure of the
// initial connection attempt is logged and the client is still returned;
// callers can check IsConnected or call Connect again.
func NewMqttClient(server, clientId, user, password string, timeout uint, mqttOnline BaseMqttOnline) *MClient {
	if mqttOnline == nil {
		mqttOnline = &EmptyMqttOnline{}
	}
	o := &MClient{
		mapTopics:  make(map[string]QOS),
		timeout:    timeout,
		MqttOnline: mqttOnline,
	}
	client, err := NewClient(ClientOptions{
		Servers:       []string{server},
		ClientID:      clientId,
		Username:      user,
		Password:      password,
		AutoReconnect: true,
	}, o)
	if err != nil {
		panic(fmt.Sprintf("MQTT错误: %s", err.Error()))
	}
	o.mqtt = client

	ctx, cancel := o.opContext()
	defer cancel()
	if err := client.Connect(ctx); err != nil {
		// The constructor has no error return, so report the failure instead
		// of dropping it silently.
		logger.Logger.Errorln("MClient.NewMqttClient:MQTT初始连接失败,原因:", err)
	}
	return o
}

// ConnectionLostHandler logs the reason the MQTT connection was lost.
func (o *MClient) ConnectionLostHandler(err error) {
	logger.Logger.Errorln("MClient.ConnectionLostHandler:MQTT连接已经断开,原因:", err)
}

// OnConnectHandler re-subscribes to every remembered topic and then publishes
// the online message, if MqttOnline provides one. It stops at the first
// failure, which is logged.
func (o *MClient) OnConnectHandler() {
	logger.Logger.Infoln("MClient.OnConnectHandler:MQTT连接成功")

	// Snapshot the topics under the lock: Subscribe and Unsubscribe mutate the
	// map concurrently, and Subscribe itself takes the lock.
	o.mu.Lock()
	topics := make(map[string]QOS, len(o.mapTopics))
	for topic, qos := range o.mapTopics {
		topics[topic] = qos
	}
	o.mu.Unlock()

	// Re-subscribe after a successful connect.
	for topic, qos := range topics {
		if err := o.Subscribe(topic, qos); err != nil {
			logger.Logger.Errorln("MClient.OnConnectHandler:订阅主题失败,主题:", topic, "原因:", err)
			return
		}
	}

	topic, payload := o.MqttOnline.GetOnlineMsg()
	if topic == "" {
		return
	}
	if err := o.PublishString(topic, payload, 0); err != nil {
		logger.Logger.Errorln("MClient.OnConnectHandler:发布上线消息失败,主题:", topic, "原因:", err)
	}
}

// GetWill returns the will topic and payload provided by MqttOnline.
func (o *MClient) GetWill() (topic string, payload string) {
	return o.MqttOnline.GetWillMsg()
}

// Connect connects the underlying client, bounded by the configured timeout.
func (o *MClient) Connect() error {
	ctx, cancel := o.opContext()
	defer cancel()
	return o.mqtt.Connect(ctx)
}

// IsConnected reports whether the underlying client is connected.
func (o *MClient) IsConnected() bool {
	return o.mqtt.IsConnected()
}

// Publish publishes payload to topic with the given QoS, bounded by the
// configured timeout.
func (o *MClient) Publish(topic string, payload interface{}, qos QOS) error {
	ctx, cancel := o.opContext()
	defer cancel()
	return o.mqtt.Publish(ctx, topic, payload, qos)
}

// PublishString publishes a string payload to topic with the given QoS,
// bounded by the configured timeout.
func (o *MClient) PublishString(topic string, payload string, qos QOS) error {
	ctx, cancel := o.opContext()
	defer cancel()
	return o.mqtt.PublishString(ctx, topic, payload, qos)
}

// PublishJSON publishes payload to topic via the underlying client's
// PublishJSON, bounded by the configured timeout.
func (o *MClient) PublishJSON(topic string, payload interface{}, qos QOS) error {
	ctx, cancel := o.opContext()
	defer cancel()
	return o.mqtt.PublishJSON(ctx, topic, payload, qos)
}

// Subscribe remembers topic for replay in OnConnectHandler and subscribes to
// it on the underlying client. The topic is remembered even when the
// subscribe call fails, so a later reconnect retries it.
func (o *MClient) Subscribe(topic string, qos QOS) error {
	// Record the most recent QoS so the replay matches the last request sent
	// to the broker. The lock covers only the map, not the network call.
	o.mu.Lock()
	o.mapTopics[topic] = qos
	o.mu.Unlock()

	ctx, cancel := o.opContext()
	defer cancel()
	return o.mqtt.Subscribe(ctx, topic, qos)
}

// Unsubscribe forgets topic and unsubscribes from it on the underlying client.
func (o *MClient) Unsubscribe(topic string) error {
	o.mu.Lock()
	delete(o.mapTopics, topic) // no-op when the topic is not present
	o.mu.Unlock()

	ctx, cancel := o.opContext()
	defer cancel()
	return o.mqtt.Unsubscribe(ctx, topic)
}

// Handle registers handler for topic on the underlying client and returns the
// resulting Route.
func (o *MClient) Handle(topic string, handler MessageHandler) Route {
	return o.mqtt.Handle(topic, handler)
}

// Ctx returns a context that expires after the configured timeout.
//
// The cancel function is discarded, so the context's timer is only released
// when the timeout elapses. The methods of MClient use opContext instead so
// they can release it as soon as the operation finishes.
func (o *MClient) Ctx() context.Context {
	ctx, _ := context.WithTimeout(context.Background(), o.timeoutDuration())
	return ctx
}

// opContext returns a context bounded by the configured timeout together with
// its cancel function, which the caller must invoke to release the timer.
func (o *MClient) opContext() (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.Background(), o.timeoutDuration())
}

// timeoutDuration converts the millisecond timeout into a time.Duration.
func (o *MClient) timeoutDuration() time.Duration {
	return time.Millisecond * time.Duration(o.timeout)
}