123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package mqtt
- import (
- "context"
- "encoding/json"
- paho "github.com/eclipse/paho.mqtt.golang"
- )
- // A Message from or to the broker
- type Message struct {
- message paho.Message
- vars []string
- }
- // A MessageHandler to handle incoming messages
- type MessageHandler func(Message)
- // TopicVars is a list of all the message specific matches for a wildcard in a route topic.
- // If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`
- func (m *Message) TopicVars() []string {
- return m.vars
- }
- // Topic is the topic the message was recieved on
- func (m *Message) Topic() string {
- return m.message.Topic()
- }
- // QOS is the quality of service the message was recieved with
- func (m *Message) QOS() QOS {
- return QOS(m.message.Qos())
- }
- // IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)
- func (m *Message) IsDuplicate() bool {
- return m.message.Duplicate()
- }
- // Acknowledge explicitly acknowledges to a broker that the message has been recieved
- func (m *Message) Acknowledge() {
- m.message.Ack()
- }
- // Payload returns the payload as a byte array
- func (m *Message) Payload() []byte {
- return m.message.Payload()
- }
- // PayloadString returns the payload as a string
- func (m *Message) PayloadString() string {
- return string(m.message.Payload())
- }
- // PayloadJSON unmarshals the payload into the provided interface using encoding/json and returns an error if anything fails
- func (m *Message) PayloadJSON(v interface{}) error {
- return json.Unmarshal(m.message.Payload(), v)
- }
- // Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic.
- // Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
- func (c *Client) Handle(topic string, handler MessageHandler) Route {
- return c.router.addRoute(topic, handler)
- }
- // Listen returns a stream of messages that match the topic.
- // Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
- func (c *Client) Listen(topic string) (chan Message, Route) {
- queue := make(chan Message)
- route := c.router.addRoute(topic, func(message Message) {
- queue <- message
- })
- return queue, route
- }
- // Subscribe subscribes to a certain topic and errors if this fails.
- func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
- token := c.client.Subscribe(topic, byte(qos), nil)
- err := tokenWithContext(ctx, token)
- return err
- }
- // SubscribeMultiple subscribes to multiple topics and errors if this fails.
- func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
- subs := make(map[string]byte, len(subscriptions))
- for topic, qos := range subscriptions {
- subs[topic] = byte(qos)
- }
- token := c.client.SubscribeMultiple(subs, nil)
- err := tokenWithContext(ctx, token)
- return err
- }
- // Unsubscribe unsubscribes from a certain topic and errors if this fails.
- func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
- token := c.client.Unsubscribe(topic)
- err := tokenWithContext(ctx, token)
- return err
- }
|