123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- // Copyright 2021 ByteDance Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package gopool
- import (
- "context"
- "sync"
- "sync/atomic"
- )
- type Pool interface {
- // Name returns the corresponding pool name.
- Name() string
- // SetCap sets the goroutine capacity of the pool.
- SetCap(cap int32)
- // Go executes f.
- Go(f func())
- // CtxGo executes f and accepts the context.
- CtxGo(ctx context.Context, f func())
- // SetPanicHandler sets the panic handler.
- SetPanicHandler(f func(context.Context, interface{}))
- // WorkerCount returns the number of running workers
- WorkerCount() int32
- }
- var taskPool sync.Pool
- func init() {
- taskPool.New = newTask
- }
- type task struct {
- ctx context.Context
- f func()
- next *task
- }
- func (t *task) zero() {
- t.ctx = nil
- t.f = nil
- t.next = nil
- }
- func (t *task) Recycle() {
- t.zero()
- taskPool.Put(t)
- }
- func newTask() interface{} {
- return &task{}
- }
- type taskList struct {
- sync.Mutex
- taskHead *task
- taskTail *task
- }
- type pool struct {
- // The name of the pool
- name string
- // capacity of the pool, the maximum number of goroutines that are actually working
- cap int32
- // Configuration information
- config *Config
- // linked list of tasks
- taskHead *task
- taskTail *task
- taskLock sync.Mutex
- taskCount int32
- // Record the number of running workers
- workerCount int32
- // This method will be called when the worker panic
- panicHandler func(context.Context, interface{})
- }
- // NewPool creates a new pool with the given name, cap and config.
- func NewPool(name string, cap int32, config *Config) Pool {
- p := &pool{
- name: name,
- cap: cap,
- config: config,
- }
- return p
- }
- func (p *pool) Name() string {
- return p.name
- }
- func (p *pool) SetCap(cap int32) {
- atomic.StoreInt32(&p.cap, cap)
- }
- func (p *pool) Go(f func()) {
- p.CtxGo(context.Background(), f)
- }
- func (p *pool) CtxGo(ctx context.Context, f func()) {
- t := taskPool.Get().(*task)
- t.ctx = ctx
- t.f = f
- p.taskLock.Lock()
- if p.taskHead == nil {
- p.taskHead = t
- p.taskTail = t
- } else {
- p.taskTail.next = t
- p.taskTail = t
- }
- p.taskLock.Unlock()
- atomic.AddInt32(&p.taskCount, 1)
- // The following two conditions are met:
- // 1. the number of tasks is greater than the threshold.
- // 2. The current number of workers is less than the upper limit p.cap.
- // or there are currently no workers.
- if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
- p.incWorkerCount()
- w := workerPool.Get().(*worker)
- w.pool = p
- w.run()
- }
- }
- // SetPanicHandler the func here will be called after the panic has been recovered.
- func (p *pool) SetPanicHandler(f func(context.Context, interface{})) {
- p.panicHandler = f
- }
- func (p *pool) WorkerCount() int32 {
- return atomic.LoadInt32(&p.workerCount)
- }
- func (p *pool) incWorkerCount() {
- atomic.AddInt32(&p.workerCount, 1)
- }
- func (p *pool) decWorkerCount() {
- atomic.AddInt32(&p.workerCount, -1)
- }
|