123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package mqtt
- import (
- "context"
- "crypto/tls"
- "errors"
- paho "github.com/eclipse/paho.mqtt.golang"
- "github.com/google/uuid"
- )
// ConnHandler receives connection lifecycle callbacks from a Client and
// supplies the last-will message that NewClient registers with paho.
type ConnHandler interface {
	// ConnectionLostHandler is called with the error that paho passed to the
	// client's connection-lost callback.
	ConnectionLostHandler(err error)
	// OnConnectHandler is called from the client's paho on-connect callback.
	OnConnectHandler()
	// GetWill returns the topic and payload of the last-will message.
	// NewClient registers no will when the returned topic is empty.
	GetWill() (topic, payload string)
}
- // Client for talking using mqtt
- type Client struct {
- Options ClientOptions // The options that were used to create this client
- client paho.Client
- router *router
- connhandler ConnHandler
- }
// ClientOptions is the list of options used to create a client.
type ClientOptions struct {
	// Servers is the list of broker hostnames to connect to.
	Servers []string
	// ClientID identifies the client; if left empty a uuid is generated
	// automatically.
	ClientID string
	// Username enables authentication; if not set, authentication is not used.
	Username string
	// Password is only used if Username is set.
	Password string
	// AutoReconnect controls whether the client automatically tries to
	// reconnect when the connection is lost.
	AutoReconnect bool
}
// QOS describes the quality of service of an mqtt publish.
type QOS byte

// The three quality-of-service levels, numbered 0 through 2.
const (
	// AtMostOnce means the broker will deliver at most once to every
	// subscriber, so message delivery is not guaranteed.
	AtMostOnce QOS = 0
	// AtLeastOnce means the broker will deliver a message at least once to
	// every subscriber.
	AtLeastOnce QOS = 1
	// ExactlyOnce means the broker will deliver a message exactly once to
	// every subscriber.
	ExactlyOnce QOS = 2
)
// ErrMinimumOneServer is returned when the client options do not specify at
// least one server.
var ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
- func handle(callback MessageHandler) paho.MessageHandler {
- return func(client paho.Client, message paho.Message) {
- if callback != nil {
- callback(Message{message: message})
- }
- }
- }
- // NewClient creates a new client with the specified options
- func NewClient(options ClientOptions, connhandler ConnHandler) (*Client, error) {
- pahoOptions := paho.NewClientOptions()
- // brokers
- if options.Servers != nil && len(options.Servers) > 0 {
- for _, server := range options.Servers {
- pahoOptions.AddBroker(server)
- }
- } else {
- return nil, ErrMinimumOneServer
- }
- // client id
- if options.ClientID == "" {
- options.ClientID = uuid.New().String()
- }
- pahoOptions.SetClientID(options.ClientID)
- tls := &tls.Config{
- InsecureSkipVerify: true,
- }
- pahoOptions.SetTLSConfig(tls)
- // auth
- if options.Username != "" {
- pahoOptions.SetUsername(options.Username)
- pahoOptions.SetPassword(options.Password)
- }
- // auto reconnect
- pahoOptions.SetAutoReconnect(options.AutoReconnect)
- pahoOptions.SetCleanSession(false)
- var client Client
- pahoOptions.SetConnectionLostHandler(client.ConnectionLostHandler) //断连
- pahoOptions.SetOnConnectHandler(client.OnConnectHandler) //连接
- if t, m := connhandler.GetWill(); t != "" {
- pahoOptions.SetWill(t, m, 0, false) //遗嘱消息
- }
- pahoClient := paho.NewClient(pahoOptions)
- router := newRouter()
- pahoClient.AddRoute("#", handle(func(message Message) {
- routes := router.match(&message)
- for _, route := range routes {
- m := message
- m.vars = route.vars(&message)
- route.handler(m)
- }
- }))
- client.client = pahoClient
- client.Options = options
- client.router = router
- client.connhandler = connhandler
- return &client, nil
- }
- // Connect tries to establish a connection with the mqtt servers
- func (c *Client) Connect(ctx context.Context) error {
- // try to connect to the client
- token := c.client.Connect()
- return tokenWithContext(ctx, token)
- }
- // Connect tries to establish a connection with the mqtt servers
- func (c *Client) IsConnected() bool {
- // try to connect to the client
- return c.client.IsConnected()
- }
- // DisconnectImmediately will immediately close the connection with the mqtt servers
- func (c *Client) DisconnectImmediately() {
- c.client.Disconnect(0)
- }
- func tokenWithContext(ctx context.Context, token paho.Token) error {
- completer := make(chan error)
- // TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes
- go func() {
- token.Wait()
- completer <- token.Error()
- }()
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-completer:
- return err
- }
- }
- }
- func (c *Client) ConnectionLostHandler(client paho.Client, err error) {
- c.connhandler.ConnectionLostHandler(err)
- }
- func (c *Client) OnConnectHandler(client paho.Client) {
- c.connhandler.OnConnectHandler()
- }
|