conmap.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package util
  2. import (
  3. "encoding/json"
  4. "sync"
  5. )
  6. var SHARD_COUNT = 32
  7. type ConMap []*ConMapShared
  8. type ConMapShared struct {
  9. items map[string]interface{}
  10. sync.RWMutex
  11. }
  12. func NewConMap() ConMap {
  13. m := make(ConMap, SHARD_COUNT)
  14. for i := 0; i < SHARD_COUNT; i++ {
  15. m[i] = &ConMapShared{items: make(map[string]interface{})}
  16. }
  17. return m
  18. }
  19. func (m ConMap) GetShard(key string) *ConMapShared {
  20. return m[uint(fnv32(key))%uint(SHARD_COUNT)]
  21. }
  22. func (m ConMap) MSet(data map[string]interface{}) {
  23. for key, value := range data {
  24. shard := m.GetShard(key)
  25. shard.Lock()
  26. shard.items[key] = value
  27. shard.Unlock()
  28. }
  29. }
  30. func (m ConMap) Set(key string, value interface{}) {
  31. shard := m.GetShard(key)
  32. shard.Lock()
  33. shard.items[key] = value
  34. shard.Unlock()
  35. }
  36. type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
  37. func (m ConMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
  38. shard := m.GetShard(key)
  39. shard.Lock()
  40. v, ok := shard.items[key]
  41. res = cb(ok, v, value)
  42. shard.items[key] = res
  43. shard.Unlock()
  44. return res
  45. }
  46. func (m ConMap) SetIfAbsent(key string, value interface{}) bool {
  47. shard := m.GetShard(key)
  48. shard.Lock()
  49. _, ok := shard.items[key]
  50. if !ok {
  51. shard.items[key] = value
  52. }
  53. shard.Unlock()
  54. return !ok
  55. }
  56. func (m ConMap) Get(key string) (interface{}, bool) {
  57. shard := m.GetShard(key)
  58. shard.RLock()
  59. val, ok := shard.items[key]
  60. shard.RUnlock()
  61. return val, ok
  62. }
  63. func (m ConMap) Count() int {
  64. count := 0
  65. for i := 0; i < SHARD_COUNT; i++ {
  66. shard := m[i]
  67. shard.RLock()
  68. count += len(shard.items)
  69. shard.RUnlock()
  70. }
  71. return count
  72. }
  73. func (m ConMap) Has(key string) bool {
  74. shard := m.GetShard(key)
  75. shard.RLock()
  76. _, ok := shard.items[key]
  77. shard.RUnlock()
  78. return ok
  79. }
  80. func (m ConMap) Remove(key string) {
  81. shard := m.GetShard(key)
  82. shard.Lock()
  83. delete(shard.items, key)
  84. shard.Unlock()
  85. }
  86. type RemoveCb func(key string, v interface{}, exists bool) bool
  87. func (m ConMap) RemoveCb(key string, cb RemoveCb) bool {
  88. shard := m.GetShard(key)
  89. shard.Lock()
  90. v, ok := shard.items[key]
  91. remove := cb(key, v, ok)
  92. if remove && ok {
  93. delete(shard.items, key)
  94. }
  95. shard.Unlock()
  96. return remove
  97. }
  98. func (m ConMap) Pop(key string) (v interface{}, exists bool) {
  99. shard := m.GetShard(key)
  100. shard.Lock()
  101. v, exists = shard.items[key]
  102. delete(shard.items, key)
  103. shard.Unlock()
  104. return v, exists
  105. }
  106. func (m ConMap) IsEmpty() bool {
  107. return m.Count() == 0
  108. }
  109. type Tuple struct {
  110. Key string
  111. Val interface{}
  112. }
  113. func (m ConMap) Iter() <-chan Tuple {
  114. chans := snapshot(m)
  115. ch := make(chan Tuple)
  116. go fanIn(chans, ch)
  117. return ch
  118. }
  119. func (m ConMap) IterBuffered() <-chan Tuple {
  120. chans := snapshot(m)
  121. total := 0
  122. for _, c := range chans {
  123. total += cap(c)
  124. }
  125. ch := make(chan Tuple, total)
  126. go fanIn(chans, ch)
  127. return ch
  128. }
  129. func snapshot(m ConMap) (chans []chan Tuple) {
  130. chans = make([]chan Tuple, SHARD_COUNT)
  131. wg := sync.WaitGroup{}
  132. wg.Add(SHARD_COUNT)
  133. for index, shard := range m {
  134. go func(index int, shard *ConMapShared) {
  135. shard.RLock()
  136. chans[index] = make(chan Tuple, len(shard.items))
  137. wg.Done()
  138. for key, val := range shard.items {
  139. chans[index] <- Tuple{key, val}
  140. }
  141. shard.RUnlock()
  142. close(chans[index])
  143. }(index, shard)
  144. }
  145. wg.Wait()
  146. return chans
  147. }
  148. func fanIn(chans []chan Tuple, out chan Tuple) {
  149. wg := sync.WaitGroup{}
  150. wg.Add(len(chans))
  151. for _, ch := range chans {
  152. go func(ch chan Tuple) {
  153. for t := range ch {
  154. out <- t
  155. }
  156. wg.Done()
  157. }(ch)
  158. }
  159. wg.Wait()
  160. close(out)
  161. }
  162. func (m ConMap) Items() map[string]interface{} {
  163. tmp := make(map[string]interface{})
  164. for item := range m.IterBuffered() {
  165. tmp[item.Key] = item.Val
  166. }
  167. return tmp
  168. }
  169. type IterCb func(key string, v interface{})
  170. func (m ConMap) IterCb(fn IterCb) {
  171. for idx := range m {
  172. shard := (m)[idx]
  173. shard.RLock()
  174. for key, value := range shard.items {
  175. fn(key, value)
  176. }
  177. shard.RUnlock()
  178. }
  179. }
  180. func (m ConMap) Keys() []string {
  181. count := m.Count()
  182. ch := make(chan string, count)
  183. go func() {
  184. wg := sync.WaitGroup{}
  185. wg.Add(SHARD_COUNT)
  186. for _, shard := range m {
  187. go func(shard *ConMapShared) {
  188. shard.RLock()
  189. for key := range shard.items {
  190. ch <- key
  191. }
  192. shard.RUnlock()
  193. wg.Done()
  194. }(shard)
  195. }
  196. wg.Wait()
  197. close(ch)
  198. }()
  199. keys := make([]string, 0, count)
  200. for k := range ch {
  201. keys = append(keys, k)
  202. }
  203. return keys
  204. }
  205. func (m ConMap) MarshalJSON() ([]byte, error) {
  206. tmp := make(map[string]interface{})
  207. for item := range m.IterBuffered() {
  208. tmp[item.Key] = item.Val
  209. }
  210. return json.Marshal(tmp)
  211. }
  212. func fnv32(key string) uint32 {
  213. hash := uint32(2166136261)
  214. const prime32 = uint32(16777619)
  215. for i := 0; i < len(key); i++ {
  216. hash *= prime32
  217. hash ^= uint32(key[i])
  218. }
  219. return hash
  220. }