123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- package mqtt
- import (
- "strings"
- "sync"
- "github.com/google/uuid"
- )
- type router struct {
- routes []Route
- lock sync.RWMutex
- }
- func newRouter() *router {
- return &router{routes: []Route{}, lock: sync.RWMutex{}}
- }
- // Route is a receipt for listening or handling certain topic
- type Route struct {
- router *router
- id string
- topic string
- handler MessageHandler
- }
- func newRoute(router *router, topic string, handler MessageHandler) Route {
- return Route{router: router, id: uuid.New().String(), topic: topic, handler: handler}
- }
- func match(route []string, topic []string) bool {
- if len(route) == 0 {
- return len(topic) == 0
- }
- if len(topic) == 0 {
- return route[0] == "#"
- }
- if route[0] == "#" {
- return true
- }
- if (route[0] == "+") || (route[0] == topic[0]) {
- return match(route[1:], topic[1:])
- }
- return false
- }
- func routeIncludesTopic(route, topic string) bool {
- return match(routeSplit(route), strings.Split(topic, "/"))
- }
- func routeSplit(route string) []string {
- var result []string
- if strings.HasPrefix(route, "$share") {
- result = strings.Split(route, "/")[2:]
- } else {
- result = strings.Split(route, "/")
- }
- return result
- }
- func (r *Route) match(message *Message) bool {
- return r.topic == message.Topic() || routeIncludesTopic(r.topic, message.Topic())
- }
- func (r *Route) vars(message *Message) []string {
- var vars []string
- route := routeSplit(r.topic)
- topic := strings.Split(message.Topic(), "/")
- for i, section := range route {
- if section == "+" {
- if len(topic) > i {
- vars = append(vars, topic[i])
- }
- } else if section == "#" {
- if len(topic) > i {
- vars = append(vars, topic[i:]...)
- }
- }
- }
- return vars
- }
- func (r *router) addRoute(topic string, handler MessageHandler) Route {
- if handler != nil {
- route := newRoute(r, topic, handler)
- r.lock.Lock()
- r.routes = append(r.routes, route)
- r.lock.Unlock()
- return route
- }
- return Route{router: r}
- }
- func (r *router) removeRoute(removeRoute *Route) {
- r.lock.Lock()
- for i, route := range r.routes {
- if route.id == removeRoute.id {
- r.routes[i] = r.routes[len(r.routes)-1]
- r.routes = r.routes[:len(r.routes)-1]
- }
- }
- r.lock.Unlock()
- }
- func (r *router) match(message *Message) []Route {
- routes := []Route{}
- r.lock.RLock()
- for _, route := range r.routes {
- if route.match(message) {
- routes = append(routes, route)
- }
- }
- r.lock.RUnlock()
- return routes
- }
- // Stop removes this route from the router and stops matching it
- func (r *Route) Stop() {
- r.router.removeRoute(r)
- }
|