123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package mqtt
- import (
- "context"
- "crypto/tls"
- "errors"
- paho "github.com/eclipse/paho.mqtt.golang"
- "github.com/google/uuid"
- )
- type ConnHandler interface {
- ConnectionLostHandler(err error)
- OnConnectHandler()
- GetWill() (topic string, payload string)
- }
- type Client struct {
- Options ClientOptions
- client paho.Client
- router *router
- connHandler ConnHandler
- }
- type ClientOptions struct {
- Servers []string
- ClientID string
- Username string
- Password string
- AutoReconnect bool
- }
- type QOS byte
- const (
-
- AtMostOnce QOS = iota
-
- AtLeastOnce
-
- ExactlyOnce
- )
- var (
-
- ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
- )
- func handle(callback MessageHandler) paho.MessageHandler {
- return func(client paho.Client, message paho.Message) {
- if callback != nil {
- callback(Message{message: message})
- }
- }
- }
- func NewClient(options ClientOptions, connhandler ConnHandler) (*Client, error) {
- pahoOptions := paho.NewClientOptions()
-
- if options.Servers != nil && len(options.Servers) > 0 {
- for _, server := range options.Servers {
- pahoOptions.AddBroker(server)
- }
- } else {
- return nil, ErrMinimumOneServer
- }
-
- if options.ClientID == "" {
- options.ClientID = uuid.New().String()
- }
- pahoOptions.SetClientID(options.ClientID)
- t := &tls.Config{
- InsecureSkipVerify: true,
- }
- pahoOptions.SetTLSConfig(t)
-
- if options.Username != "" {
- pahoOptions.SetUsername(options.Username)
- pahoOptions.SetPassword(options.Password)
- }
-
- pahoOptions.SetAutoReconnect(options.AutoReconnect)
- pahoOptions.SetCleanSession(false)
- var client Client
- pahoOptions.SetConnectionLostHandler(client.ConnectionLostHandler)
- pahoOptions.SetOnConnectHandler(client.OnConnectHandler)
- if t, m := connhandler.GetWill(); t != "" {
- pahoOptions.SetWill(t, m, 0, false)
- }
- pahoClient := paho.NewClient(pahoOptions)
- router := newRouter()
- pahoClient.AddRoute("#", handle(func(message Message) {
- routes := router.match(&message)
- for _, route := range routes {
- m := message
- m.vars = route.vars(&message)
- route.handler(m)
- }
- }))
- client.client = pahoClient
- client.Options = options
- client.router = router
- client.connHandler = connhandler
- return &client, nil
- }
- func (c *Client) Connect(ctx context.Context) error {
-
- token := c.client.Connect()
- return tokenWithContext(ctx, token)
- }
- func (c *Client) IsConnected() bool {
-
- return c.client.IsConnected()
- }
- func (c *Client) DisconnectImmediately() {
- c.client.Disconnect(0)
- }
- func tokenWithContext(ctx context.Context, token paho.Token) error {
- completer := make(chan error)
-
- go func() {
- token.Wait()
- completer <- token.Error()
- }()
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-completer:
- return err
- }
- }
- }
- func (c *Client) ConnectionLostHandler(client paho.Client, err error) {
- c.connHandler.ConnectionLostHandler(err)
- }
- func (c *Client) OnConnectHandler(client paho.Client) {
- c.connHandler.OnConnectHandler()
- }
|