| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- package util
- import (
- "encoding/json"
- "sync"
- )
// SHARD_COUNT is the number of shards NewConMap allocates for each map.
// Set it before the first map is created and leave it unchanged afterwards.
var (
	SHARD_COUNT = 32
)
type (
	// ConMap is a string-keyed map split across several independently
	// locked shards. Create one with NewConMap.
	ConMap []*ConMapShared

	// ConMapShared is a single shard of a ConMap: a plain map plus the
	// read/write mutex that guards it.
	ConMapShared struct {
		items        map[string]interface{}
		sync.RWMutex // guards items
	}
)
- func NewConMap() ConMap {
- m := make(ConMap, SHARD_COUNT)
- for i := 0; i < SHARD_COUNT; i++ {
- m[i] = &ConMapShared{items: make(map[string]interface{})}
- }
- return m
- }
- func (m ConMap) GetShard(key string) *ConMapShared {
- return m[uint(fnv32(key))%uint(SHARD_COUNT)]
- }
- func (m ConMap) MSet(data map[string]interface{}) {
- for key, value := range data {
- shard := m.GetShard(key)
- shard.Lock()
- shard.items[key] = value
- shard.Unlock()
- }
- }
- func (m ConMap) Set(key string, value interface{}) {
- shard := m.GetShard(key)
- shard.Lock()
- shard.items[key] = value
- shard.Unlock()
- }
- type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
- func (m ConMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
- shard := m.GetShard(key)
- shard.Lock()
- v, ok := shard.items[key]
- res = cb(ok, v, value)
- shard.items[key] = res
- shard.Unlock()
- return res
- }
- func (m ConMap) SetIfAbsent(key string, value interface{}) bool {
- shard := m.GetShard(key)
- shard.Lock()
- _, ok := shard.items[key]
- if !ok {
- shard.items[key] = value
- }
- shard.Unlock()
- return !ok
- }
- func (m ConMap) Get(key string) (interface{}, bool) {
- shard := m.GetShard(key)
- shard.RLock()
- val, ok := shard.items[key]
- shard.RUnlock()
- return val, ok
- }
- func (m ConMap) Count() int {
- count := 0
- for i := 0; i < SHARD_COUNT; i++ {
- shard := m[i]
- shard.RLock()
- count += len(shard.items)
- shard.RUnlock()
- }
- return count
- }
- func (m ConMap) Has(key string) bool {
- shard := m.GetShard(key)
- shard.RLock()
- _, ok := shard.items[key]
- shard.RUnlock()
- return ok
- }
- func (m ConMap) Remove(key string) {
- shard := m.GetShard(key)
- shard.Lock()
- delete(shard.items, key)
- shard.Unlock()
- }
- type RemoveCb func(key string, v interface{}, exists bool) bool
- func (m ConMap) RemoveCb(key string, cb RemoveCb) bool {
- shard := m.GetShard(key)
- shard.Lock()
- v, ok := shard.items[key]
- remove := cb(key, v, ok)
- if remove && ok {
- delete(shard.items, key)
- }
- shard.Unlock()
- return remove
- }
- func (m ConMap) Pop(key string) (v interface{}, exists bool) {
- shard := m.GetShard(key)
- shard.Lock()
- v, exists = shard.items[key]
- delete(shard.items, key)
- shard.Unlock()
- return v, exists
- }
- func (m ConMap) IsEmpty() bool {
- return m.Count() == 0
- }
// Tuple is one key/value pair, as delivered on the channels returned by
// Iter and IterBuffered.
type Tuple struct {
	Key string      // key the value is stored under
	Val interface{} // value stored for Key
}
- func (m ConMap) Iter() <-chan Tuple {
- chans := snapshot(m)
- ch := make(chan Tuple)
- go fanIn(chans, ch)
- return ch
- }
- func (m ConMap) IterBuffered() <-chan Tuple {
- chans := snapshot(m)
- total := 0
- for _, c := range chans {
- total += cap(c)
- }
- ch := make(chan Tuple, total)
- go fanIn(chans, ch)
- return ch
- }
- func snapshot(m ConMap) (chans []chan Tuple) {
- chans = make([]chan Tuple, SHARD_COUNT)
- wg := sync.WaitGroup{}
- wg.Add(SHARD_COUNT)
- for index, shard := range m {
- go func(index int, shard *ConMapShared) {
- shard.RLock()
- chans[index] = make(chan Tuple, len(shard.items))
- wg.Done()
- for key, val := range shard.items {
- chans[index] <- Tuple{key, val}
- }
- shard.RUnlock()
- close(chans[index])
- }(index, shard)
- }
- wg.Wait()
- return chans
- }
- func fanIn(chans []chan Tuple, out chan Tuple) {
- wg := sync.WaitGroup{}
- wg.Add(len(chans))
- for _, ch := range chans {
- go func(ch chan Tuple) {
- for t := range ch {
- out <- t
- }
- wg.Done()
- }(ch)
- }
- wg.Wait()
- close(out)
- }
- func (m ConMap) Items() map[string]interface{} {
- tmp := make(map[string]interface{})
- for item := range m.IterBuffered() {
- tmp[item.Key] = item.Val
- }
- return tmp
- }
- type IterCb func(key string, v interface{})
- func (m ConMap) IterCb(fn IterCb) {
- for idx := range m {
- shard := (m)[idx]
- shard.RLock()
- for key, value := range shard.items {
- fn(key, value)
- }
- shard.RUnlock()
- }
- }
- func (m ConMap) Keys() []string {
- count := m.Count()
- ch := make(chan string, count)
- go func() {
- wg := sync.WaitGroup{}
- wg.Add(SHARD_COUNT)
- for _, shard := range m {
- go func(shard *ConMapShared) {
- shard.RLock()
- for key := range shard.items {
- ch <- key
- }
- shard.RUnlock()
- wg.Done()
- }(shard)
- }
- wg.Wait()
- close(ch)
- }()
- keys := make([]string, 0, count)
- for k := range ch {
- keys = append(keys, k)
- }
- return keys
- }
- func (m ConMap) MarshalJSON() ([]byte, error) {
- tmp := make(map[string]interface{})
- for item := range m.IterBuffered() {
- tmp[item.Key] = item.Val
- }
- return json.Marshal(tmp)
- }
// fnv32 returns a 32-bit hash of key. For each byte the running hash is
// multiplied by the prime and then XORed with the byte (multiply first, XOR
// second); the order matters for which shard a key lands in.
func fnv32(key string) uint32 {
	const (
		offset32 = uint32(2166136261)
		prime32  = uint32(16777619)
	)
	h := offset32
	for _, b := range []byte(key) {
		h = (h * prime32) ^ uint32(b)
	}
	return h
}
|