123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package util
- import (
- "fmt"
- "runtime"
- "sync/atomic"
- )
- type mlCache struct {
- putNo uint32
- getNo uint32
- value interface{}
- }
- type MlQueue struct {
- capaciity uint32
- capMod uint32
- putPos uint32
- getPos uint32
- cache []mlCache
- }
- func NewQueue(capaciity uint32) *MlQueue {
- q := new(MlQueue)
- q.capaciity = minQuantity(capaciity)
- q.capMod = q.capaciity - 1
- q.putPos = 0
- q.getPos = 0
- q.cache = make([]mlCache, q.capaciity)
- for i := range q.cache {
- cache := &q.cache[i]
- cache.getNo = uint32(i)
- cache.putNo = uint32(i)
- }
- cache := &q.cache[0]
- cache.getNo = q.capaciity
- cache.putNo = q.capaciity
- return q
- }
- func (q *MlQueue) String() string {
- getPos := atomic.LoadUint32(&q.getPos)
- putPos := atomic.LoadUint32(&q.putPos)
- return fmt.Sprintf("Queue{capaciity: %v, capMod: %v, putPos: %v, getPos: %v}",
- q.capaciity, q.capMod, putPos, getPos)
- }
- func (q *MlQueue) Capaciity() uint32 {
- return q.capaciity
- }
- func (q *MlQueue) Quantity() uint32 {
- var putPos, getPos uint32
- var quantity uint32
- getPos = atomic.LoadUint32(&q.getPos)
- putPos = atomic.LoadUint32(&q.putPos)
- if putPos >= getPos {
- quantity = putPos - getPos
- } else {
- quantity = q.capMod + (putPos - getPos)
- }
- return quantity
- }
- func (q *MlQueue) Put(val interface{}) (ok bool, quantity uint32) {
- var putPos, putPosNew, getPos, posCnt uint32
- var cache *mlCache
- capMod := q.capMod
- getPos = atomic.LoadUint32(&q.getPos)
- putPos = atomic.LoadUint32(&q.putPos)
- if putPos >= getPos {
- posCnt = putPos - getPos
- } else {
- posCnt = capMod + (putPos - getPos)
- }
- if posCnt >= capMod-1 {
- runtime.Gosched()
- return false, posCnt
- }
- putPosNew = putPos + 1
- if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
- runtime.Gosched()
- return false, posCnt
- }
- cache = &q.cache[putPosNew&capMod]
- for {
- getNo := atomic.LoadUint32(&cache.getNo)
- putNo := atomic.LoadUint32(&cache.putNo)
- if putPosNew == putNo && getNo == putNo {
- cache.value = val
- atomic.AddUint32(&cache.putNo, q.capaciity)
- return true, posCnt + 1
- } else {
- runtime.Gosched()
- }
- }
- }
- func (q *MlQueue) Get() (val interface{}, ok bool, quantity uint32) {
- var putPos, getPos, getPosNew, posCnt uint32
- var cache *mlCache
- capMod := q.capMod
- putPos = atomic.LoadUint32(&q.putPos)
- getPos = atomic.LoadUint32(&q.getPos)
- if putPos >= getPos {
- posCnt = putPos - getPos
- } else {
- posCnt = capMod + (putPos - getPos)
- }
- if posCnt < 1 {
- runtime.Gosched()
- return nil, false, posCnt
- }
- getPosNew = getPos + 1
- if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
- runtime.Gosched()
- return nil, false, posCnt
- }
- cache = &q.cache[getPosNew&capMod]
- for {
- getNo := atomic.LoadUint32(&cache.getNo)
- putNo := atomic.LoadUint32(&cache.putNo)
- if getPosNew == getNo && getNo == putNo-q.capaciity {
- val = cache.value
- atomic.AddUint32(&cache.getNo, q.capaciity)
- return val, true, posCnt - 1
- } else {
- runtime.Gosched()
- }
- }
- }
- func minQuantity(v uint32) uint32 {
- v--
- v |= v >> 1
- v |= v >> 2
- v |= v >> 4
- v |= v >> 8
- v |= v >> 16
- v++
- return v
- }
|