mqtt.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package mqtt
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "errors"
  6. paho "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/google/uuid"
  8. )
  9. type ConnHandler interface {
  10. ConnectionLostHandler(err error)
  11. OnConnectHandler()
  12. GetWill() (topic string, payload string)
  13. }
  14. // Client for talking using mqtt
  15. type Client struct {
  16. Options ClientOptions // The options that were used to create this client
  17. client paho.Client
  18. router *router
  19. connhandler ConnHandler
  20. }
  21. // ClientOptions is the list of options used to create a client
  22. type ClientOptions struct {
  23. Servers []string // The list of broker hostnames to connect to
  24. ClientID string // If left empty a uuid will automatically be generated
  25. Username string // If not set then authentication will not be used
  26. Password string // Will only be used if the username is set
  27. AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost
  28. }
  29. // QOS describes the quality of service of an mqtt publish
  30. type QOS byte
  31. const (
  32. // AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed
  33. AtMostOnce QOS = iota
  34. // AtLeastOnce means the broker will deliver a message at least once to every subscriber
  35. AtLeastOnce
  36. // ExactlyOnce means the broker will deliver a message exactly once to every subscriber
  37. ExactlyOnce
  38. )
  39. var (
  40. // ErrMinimumOneServer means that at least one server should be specified in the client options
  41. ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
  42. )
  43. func handle(callback MessageHandler) paho.MessageHandler {
  44. return func(client paho.Client, message paho.Message) {
  45. if callback != nil {
  46. callback(Message{message: message})
  47. }
  48. }
  49. }
  50. // NewClient creates a new client with the specified options
  51. func NewClient(options ClientOptions, connhandler ConnHandler) (*Client, error) {
  52. pahoOptions := paho.NewClientOptions()
  53. // brokers
  54. if options.Servers != nil && len(options.Servers) > 0 {
  55. for _, server := range options.Servers {
  56. pahoOptions.AddBroker(server)
  57. }
  58. } else {
  59. return nil, ErrMinimumOneServer
  60. }
  61. // client id
  62. if options.ClientID == "" {
  63. options.ClientID = uuid.New().String()
  64. }
  65. pahoOptions.SetClientID(options.ClientID)
  66. tls := &tls.Config{
  67. InsecureSkipVerify: true,
  68. }
  69. pahoOptions.SetTLSConfig(tls)
  70. // auth
  71. if options.Username != "" {
  72. pahoOptions.SetUsername(options.Username)
  73. pahoOptions.SetPassword(options.Password)
  74. }
  75. // auto reconnect
  76. pahoOptions.SetAutoReconnect(options.AutoReconnect)
  77. pahoOptions.SetCleanSession(false)
  78. var client Client
  79. pahoOptions.SetConnectionLostHandler(client.ConnectionLostHandler) //断连
  80. pahoOptions.SetOnConnectHandler(client.OnConnectHandler) //连接
  81. if t, m := connhandler.GetWill(); t != "" {
  82. pahoOptions.SetWill(t, m, 0, false) //遗嘱消息
  83. }
  84. pahoClient := paho.NewClient(pahoOptions)
  85. router := newRouter()
  86. pahoClient.AddRoute("#", handle(func(message Message) {
  87. routes := router.match(&message)
  88. for _, route := range routes {
  89. m := message
  90. m.vars = route.vars(&message)
  91. route.handler(m)
  92. }
  93. }))
  94. client.client = pahoClient
  95. client.Options = options
  96. client.router = router
  97. client.connhandler = connhandler
  98. return &client, nil
  99. }
  100. // Connect tries to establish a connection with the mqtt servers
  101. func (c *Client) Connect(ctx context.Context) error {
  102. // try to connect to the client
  103. token := c.client.Connect()
  104. return tokenWithContext(ctx, token)
  105. }
  106. // Connect tries to establish a connection with the mqtt servers
  107. func (c *Client) IsConnected() bool {
  108. // try to connect to the client
  109. return c.client.IsConnected()
  110. }
  111. // DisconnectImmediately will immediately close the connection with the mqtt servers
  112. func (c *Client) DisconnectImmediately() {
  113. c.client.Disconnect(0)
  114. }
  115. func tokenWithContext(ctx context.Context, token paho.Token) error {
  116. completer := make(chan error)
  117. // TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes
  118. go func() {
  119. token.Wait()
  120. completer <- token.Error()
  121. }()
  122. for {
  123. select {
  124. case <-ctx.Done():
  125. return ctx.Err()
  126. case err := <-completer:
  127. return err
  128. }
  129. }
  130. }
  131. func (c *Client) ConnectionLostHandler(client paho.Client, err error) {
  132. c.connhandler.ConnectionLostHandler(err)
  133. }
  134. func (c *Client) OnConnectHandler(client paho.Client) {
  135. c.connhandler.OnConnectHandler()
  136. }