router.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package mqtt
  2. import (
  3. "strings"
  4. "sync"
  5. "github.com/google/uuid"
  6. )
  7. type router struct {
  8. routes []Route
  9. lock sync.RWMutex
  10. }
  11. func newRouter() *router {
  12. return &router{routes: []Route{}, lock: sync.RWMutex{}}
  13. }
  14. // Route is a receipt for listening or handling certain topic
  15. type Route struct {
  16. router *router
  17. id string
  18. topic string
  19. handler MessageHandler
  20. }
  21. func newRoute(router *router, topic string, handler MessageHandler) Route {
  22. return Route{router: router, id: uuid.New().String(), topic: topic, handler: handler}
  23. }
  24. func match(route []string, topic []string) bool {
  25. if len(route) == 0 {
  26. return len(topic) == 0
  27. }
  28. if len(topic) == 0 {
  29. return route[0] == "#"
  30. }
  31. if route[0] == "#" {
  32. return true
  33. }
  34. if (route[0] == "+") || (route[0] == topic[0]) {
  35. return match(route[1:], topic[1:])
  36. }
  37. return false
  38. }
  39. func routeIncludesTopic(route, topic string) bool {
  40. return match(routeSplit(route), strings.Split(topic, "/"))
  41. }
  42. func routeSplit(route string) []string {
  43. var result []string
  44. if strings.HasPrefix(route, "$share") {
  45. result = strings.Split(route, "/")[2:]
  46. } else {
  47. result = strings.Split(route, "/")
  48. }
  49. return result
  50. }
  51. func (r *Route) match(message *Message) bool {
  52. return r.topic == message.Topic() || routeIncludesTopic(r.topic, message.Topic())
  53. }
  54. func (r *Route) vars(message *Message) []string {
  55. var vars []string
  56. route := routeSplit(r.topic)
  57. topic := strings.Split(message.Topic(), "/")
  58. for i, section := range route {
  59. if section == "+" {
  60. if len(topic) > i {
  61. vars = append(vars, topic[i])
  62. }
  63. } else if section == "#" {
  64. if len(topic) > i {
  65. vars = append(vars, topic[i:]...)
  66. }
  67. }
  68. }
  69. return vars
  70. }
  71. func (r *router) addRoute(topic string, handler MessageHandler) Route {
  72. if handler != nil {
  73. route := newRoute(r, topic, handler)
  74. r.lock.Lock()
  75. r.routes = append(r.routes, route)
  76. r.lock.Unlock()
  77. return route
  78. }
  79. return Route{router: r}
  80. }
  81. func (r *router) removeRoute(removeRoute *Route) {
  82. r.lock.Lock()
  83. for i, route := range r.routes {
  84. if route.id == removeRoute.id {
  85. r.routes[i] = r.routes[len(r.routes)-1]
  86. r.routes = r.routes[:len(r.routes)-1]
  87. }
  88. }
  89. r.lock.Unlock()
  90. }
  91. func (r *router) match(message *Message) []Route {
  92. routes := []Route{}
  93. r.lock.RLock()
  94. for _, route := range r.routes {
  95. if route.match(message) {
  96. routes = append(routes, route)
  97. }
  98. }
  99. r.lock.RUnlock()
  100. return routes
  101. }
  102. // Stop removes this route from the router and stops matching it
  103. func (r *Route) Stop() {
  104. r.router.removeRoute(r)
  105. }