package mqtt import ( "context" "crypto/tls" "errors" paho "github.com/eclipse/paho.mqtt.golang" "github.com/google/uuid" ) type ConnHandler interface { ConnectionLostHandler(err error) OnConnectHandler() GetWill() (topic string, payload string) } // Client for talking using mqtt type Client struct { Options ClientOptions // The options that were used to create this client client paho.Client router *router connhandler ConnHandler } // ClientOptions is the list of options used to create a client type ClientOptions struct { Servers []string // The list of broker hostnames to connect to ClientID string // If left empty a uuid will automatically be generated Username string // If not set then authentication will not be used Password string // Will only be used if the username is set AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost } // QOS describes the quality of service of an mqtt publish type QOS byte const ( // AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed AtMostOnce QOS = iota // AtLeastOnce means the broker will deliver a message at least once to every subscriber AtLeastOnce // ExactlyOnce means the broker will deliver a message exactly once to every subscriber ExactlyOnce ) var ( // ErrMinimumOneServer means that at least one server should be specified in the client options ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified") ) func handle(callback MessageHandler) paho.MessageHandler { return func(client paho.Client, message paho.Message) { if callback != nil { callback(Message{message: message}) } } } // NewClient creates a new client with the specified options func NewClient(options ClientOptions, connhandler ConnHandler) (*Client, error) { pahoOptions := paho.NewClientOptions() // brokers if options.Servers != nil && len(options.Servers) > 0 { for _, server := range options.Servers { pahoOptions.AddBroker(server) } } else { return nil, ErrMinimumOneServer } // client id if options.ClientID == "" { options.ClientID = uuid.New().String() } pahoOptions.SetClientID(options.ClientID) tls := &tls.Config{ InsecureSkipVerify: true, } pahoOptions.SetTLSConfig(tls) // auth if options.Username != "" { pahoOptions.SetUsername(options.Username) pahoOptions.SetPassword(options.Password) } // auto reconnect pahoOptions.SetAutoReconnect(options.AutoReconnect) pahoOptions.SetCleanSession(false) var client Client pahoOptions.SetConnectionLostHandler(client.ConnectionLostHandler) //断连 pahoOptions.SetOnConnectHandler(client.OnConnectHandler) //连接 if t, m := connhandler.GetWill(); t != "" { pahoOptions.SetWill(t, m, 0, false) //遗嘱消息 } pahoClient := paho.NewClient(pahoOptions) router := newRouter() pahoClient.AddRoute("#", handle(func(message Message) { routes := router.match(&message) for _, route := range routes { m := message m.vars = route.vars(&message) route.handler(m) } })) client.client = pahoClient client.Options = options client.router = router client.connhandler = connhandler return &client, nil } // Connect tries to establish a connection with the mqtt servers func (c *Client) Connect(ctx context.Context) error { // try to connect to the client token := c.client.Connect() return tokenWithContext(ctx, token) } // Connect tries to establish a connection with the mqtt servers func (c *Client) IsConnected() bool { // try to connect to the client return c.client.IsConnected() } // DisconnectImmediately will immediately close the connection with the mqtt servers func (c *Client) DisconnectImmediately() { c.client.Disconnect(0) } func tokenWithContext(ctx context.Context, token paho.Token) error { completer := make(chan error) // TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes go func() { token.Wait() completer <- token.Error() }() for { select { case <-ctx.Done(): return ctx.Err() case err := <-completer: return err } } } func (c *Client) ConnectionLostHandler(client paho.Client, err error) { c.connhandler.ConnectionLostHandler(err) } func (c *Client) OnConnectHandler(client paho.Client) { c.connhandler.OnConnectHandler() }