poll.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright 2021 ByteDance Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package gopool
  15. import (
  16. "context"
  17. "sync"
  18. "sync/atomic"
  19. )
  20. type Pool interface {
  21. // Name returns the corresponding pool name.
  22. Name() string
  23. // SetCap sets the goroutine capacity of the pool.
  24. SetCap(cap int32)
  25. // Go executes f.
  26. Go(f func())
  27. // CtxGo executes f and accepts the context.
  28. CtxGo(ctx context.Context, f func())
  29. // SetPanicHandler sets the panic handler.
  30. SetPanicHandler(f func(context.Context, interface{}))
  31. // WorkerCount returns the number of running workers
  32. WorkerCount() int32
  33. }
  34. var taskPool sync.Pool
  35. func init() {
  36. taskPool.New = newTask
  37. }
  38. type task struct {
  39. ctx context.Context
  40. f func()
  41. next *task
  42. }
  43. func (t *task) zero() {
  44. t.ctx = nil
  45. t.f = nil
  46. t.next = nil
  47. }
  48. func (t *task) Recycle() {
  49. t.zero()
  50. taskPool.Put(t)
  51. }
  52. func newTask() interface{} {
  53. return &task{}
  54. }
  55. type taskList struct {
  56. sync.Mutex
  57. taskHead *task
  58. taskTail *task
  59. }
  60. type pool struct {
  61. // The name of the pool
  62. name string
  63. // capacity of the pool, the maximum number of goroutines that are actually working
  64. cap int32
  65. // Configuration information
  66. config *Config
  67. // linked list of tasks
  68. taskHead *task
  69. taskTail *task
  70. taskLock sync.Mutex
  71. taskCount int32
  72. // Record the number of running workers
  73. workerCount int32
  74. // This method will be called when the worker panic
  75. panicHandler func(context.Context, interface{})
  76. }
  77. // NewPool creates a new pool with the given name, cap and config.
  78. func NewPool(name string, cap int32, config *Config) Pool {
  79. p := &pool{
  80. name: name,
  81. cap: cap,
  82. config: config,
  83. }
  84. return p
  85. }
  86. func (p *pool) Name() string {
  87. return p.name
  88. }
  89. func (p *pool) SetCap(cap int32) {
  90. atomic.StoreInt32(&p.cap, cap)
  91. }
  92. func (p *pool) Go(f func()) {
  93. p.CtxGo(context.Background(), f)
  94. }
  95. func (p *pool) CtxGo(ctx context.Context, f func()) {
  96. t := taskPool.Get().(*task)
  97. t.ctx = ctx
  98. t.f = f
  99. p.taskLock.Lock()
  100. if p.taskHead == nil {
  101. p.taskHead = t
  102. p.taskTail = t
  103. } else {
  104. p.taskTail.next = t
  105. p.taskTail = t
  106. }
  107. p.taskLock.Unlock()
  108. atomic.AddInt32(&p.taskCount, 1)
  109. // The following two conditions are met:
  110. // 1. the number of tasks is greater than the threshold.
  111. // 2. The current number of workers is less than the upper limit p.cap.
  112. // or there are currently no workers.
  113. if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
  114. p.incWorkerCount()
  115. w := workerPool.Get().(*worker)
  116. w.pool = p
  117. w.run()
  118. }
  119. }
  120. // SetPanicHandler the func here will be called after the panic has been recovered.
  121. func (p *pool) SetPanicHandler(f func(context.Context, interface{})) {
  122. p.panicHandler = f
  123. }
  124. func (p *pool) WorkerCount() int32 {
  125. return atomic.LoadInt32(&p.workerCount)
  126. }
  127. func (p *pool) incWorkerCount() {
  128. atomic.AddInt32(&p.workerCount, 1)
  129. }
  130. func (p *pool) decWorkerCount() {
  131. atomic.AddInt32(&p.workerCount, -1)
  132. }