subscribe.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package mqtt
  2. import (
  3. "context"
  4. "encoding/json"
  5. paho "github.com/eclipse/paho.mqtt.golang"
  6. )
  7. // A Message from or to the broker
  8. type Message struct {
  9. message paho.Message
  10. vars []string
  11. }
  12. // A MessageHandler to handle incoming messages
  13. type MessageHandler func(Message)
  14. // TopicVars is a list of all the message specific matches for a wildcard in a route topic.
  15. // If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`
  16. func (m *Message) TopicVars() []string {
  17. return m.vars
  18. }
  19. // Topic is the topic the message was recieved on
  20. func (m *Message) Topic() string {
  21. return m.message.Topic()
  22. }
  23. // QOS is the quality of service the message was recieved with
  24. func (m *Message) QOS() QOS {
  25. return QOS(m.message.Qos())
  26. }
  27. // IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)
  28. func (m *Message) IsDuplicate() bool {
  29. return m.message.Duplicate()
  30. }
  31. // Acknowledge explicitly acknowledges to a broker that the message has been recieved
  32. func (m *Message) Acknowledge() {
  33. m.message.Ack()
  34. }
  35. // Payload returns the payload as a byte array
  36. func (m *Message) Payload() []byte {
  37. return m.message.Payload()
  38. }
  39. // PayloadString returns the payload as a string
  40. func (m *Message) PayloadString() string {
  41. return string(m.message.Payload())
  42. }
  43. // PayloadJSON unmarshals the payload into the provided interface using encoding/json and returns an error if anything fails
  44. func (m *Message) PayloadJSON(v interface{}) error {
  45. return json.Unmarshal(m.message.Payload(), v)
  46. }
  47. // Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic.
  48. // Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
  49. func (c *Client) Handle(topic string, handler MessageHandler) Route {
  50. return c.router.addRoute(topic, handler)
  51. }
  52. // Listen returns a stream of messages that match the topic.
  53. // Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
  54. func (c *Client) Listen(topic string) (chan Message, Route) {
  55. queue := make(chan Message)
  56. route := c.router.addRoute(topic, func(message Message) {
  57. queue <- message
  58. })
  59. return queue, route
  60. }
  61. // Subscribe subscribes to a certain topic and errors if this fails.
  62. func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
  63. token := c.client.Subscribe(topic, byte(qos), nil)
  64. err := tokenWithContext(ctx, token)
  65. return err
  66. }
  67. // SubscribeMultiple subscribes to multiple topics and errors if this fails.
  68. func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
  69. subs := make(map[string]byte, len(subscriptions))
  70. for topic, qos := range subscriptions {
  71. subs[topic] = byte(qos)
  72. }
  73. token := c.client.SubscribeMultiple(subs, nil)
  74. err := tokenWithContext(ctx, token)
  75. return err
  76. }
  77. // Unsubscribe unsubscribes from a certain topic and errors if this fails.
  78. func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
  79. token := c.client.Unsubscribe(topic)
  80. err := tokenWithContext(ctx, token)
  81. return err
  82. }