queue.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package util
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync/atomic"
  6. )
  7. type mlCache struct {
  8. putNo uint32
  9. getNo uint32
  10. value interface{}
  11. }
  12. type MlQueue struct {
  13. capaciity uint32
  14. capMod uint32
  15. putPos uint32
  16. getPos uint32
  17. cache []mlCache
  18. }
  19. func NewQueue(capaciity uint32) *MlQueue {
  20. q := new(MlQueue)
  21. q.capaciity = minQuantity(capaciity)
  22. q.capMod = q.capaciity - 1
  23. q.putPos = 0
  24. q.getPos = 0
  25. q.cache = make([]mlCache, q.capaciity)
  26. for i := range q.cache {
  27. cache := &q.cache[i]
  28. cache.getNo = uint32(i)
  29. cache.putNo = uint32(i)
  30. }
  31. cache := &q.cache[0]
  32. cache.getNo = q.capaciity
  33. cache.putNo = q.capaciity
  34. return q
  35. }
  36. func (q *MlQueue) String() string {
  37. getPos := atomic.LoadUint32(&q.getPos)
  38. putPos := atomic.LoadUint32(&q.putPos)
  39. return fmt.Sprintf("Queue{capaciity: %v, capMod: %v, putPos: %v, getPos: %v}",
  40. q.capaciity, q.capMod, putPos, getPos)
  41. }
// Capaciity returns the number of slots in the queue's ring buffer, i.e. the
// value stored in the capaciity field by NewQueue.
func (q *MlQueue) Capaciity() uint32 {
	return q.capaciity
}
  45. func (q *MlQueue) Quantity() uint32 {
  46. var putPos, getPos uint32
  47. var quantity uint32
  48. getPos = atomic.LoadUint32(&q.getPos)
  49. putPos = atomic.LoadUint32(&q.putPos)
  50. if putPos >= getPos {
  51. quantity = putPos - getPos
  52. } else {
  53. quantity = q.capMod + (putPos - getPos)
  54. }
  55. return quantity
  56. }
  57. func (q *MlQueue) Put(val interface{}) (ok bool, quantity uint32) {
  58. var putPos, putPosNew, getPos, posCnt uint32
  59. var cache *mlCache
  60. capMod := q.capMod
  61. getPos = atomic.LoadUint32(&q.getPos)
  62. putPos = atomic.LoadUint32(&q.putPos)
  63. if putPos >= getPos {
  64. posCnt = putPos - getPos
  65. } else {
  66. posCnt = capMod + (putPos - getPos)
  67. }
  68. if posCnt >= capMod-1 {
  69. runtime.Gosched()
  70. return false, posCnt
  71. }
  72. putPosNew = putPos + 1
  73. if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
  74. runtime.Gosched()
  75. return false, posCnt
  76. }
  77. cache = &q.cache[putPosNew&capMod]
  78. for {
  79. getNo := atomic.LoadUint32(&cache.getNo)
  80. putNo := atomic.LoadUint32(&cache.putNo)
  81. if putPosNew == putNo && getNo == putNo {
  82. cache.value = val
  83. atomic.AddUint32(&cache.putNo, q.capaciity)
  84. return true, posCnt + 1
  85. } else {
  86. runtime.Gosched()
  87. }
  88. }
  89. }
  90. func (q *MlQueue) Get() (val interface{}, ok bool, quantity uint32) {
  91. var putPos, getPos, getPosNew, posCnt uint32
  92. var cache *mlCache
  93. capMod := q.capMod
  94. putPos = atomic.LoadUint32(&q.putPos)
  95. getPos = atomic.LoadUint32(&q.getPos)
  96. if putPos >= getPos {
  97. posCnt = putPos - getPos
  98. } else {
  99. posCnt = capMod + (putPos - getPos)
  100. }
  101. if posCnt < 1 {
  102. runtime.Gosched()
  103. return nil, false, posCnt
  104. }
  105. getPosNew = getPos + 1
  106. if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
  107. runtime.Gosched()
  108. return nil, false, posCnt
  109. }
  110. cache = &q.cache[getPosNew&capMod]
  111. for {
  112. getNo := atomic.LoadUint32(&cache.getNo)
  113. putNo := atomic.LoadUint32(&cache.putNo)
  114. if getPosNew == getNo && getNo == putNo-q.capaciity {
  115. val = cache.value
  116. atomic.AddUint32(&cache.getNo, q.capaciity)
  117. return val, true, posCnt - 1
  118. } else {
  119. runtime.Gosched()
  120. }
  121. }
  122. }
  123. func minQuantity(v uint32) uint32 {
  124. v--
  125. v |= v >> 1
  126. v |= v >> 2
  127. v |= v >> 4
  128. v |= v >> 8
  129. v |= v >> 16
  130. v++
  131. return v
  132. }