123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- package utils
- import (
- "container/list"
- "fmt"
- "log"
- "sync"
- "sync/atomic"
- "time"
- )
// Job describes one unit of work submitted to a Pool, together with the
// outcome recorded once a worker has executed it.
//
// The field order is significant: Pool.Add builds a Job with an unkeyed
// composite literal.
type Job struct {
	// F is the function a worker calls; Args is expanded into its variadic
	// parameter list.
	F func(...interface{}) interface{}
	// Args holds the arguments handed to F.
	Args []interface{}
	// Result receives the value returned by F. It is reset to nil when F
	// panics.
	Result interface{}
	// Err is populated when F panics.
	Err error
	// added is signalled by the supervisor after the job has been placed on
	// the ready queue; Add blocks on it.
	added chan bool
	// Worker_id is the id of the worker that picked the job up.
	Worker_id uint
	// Job_id is the id assigned to the job when it was created by Add.
	Job_id uint64
}
// stats is a snapshot of a Pool's job counters as reported by Pool.Status.
type stats struct {
	// Submitted counts jobs accepted onto the ready queue, Running counts
	// jobs handed to a worker and not yet reported done, and Completed
	// counts jobs reported done. The order matters: the supervisor fills
	// this struct with an unkeyed literal.
	Submitted, Running, Completed int
}
- type Pool struct {
- next_job_id uint64
- workers_started bool
- supervisor_started bool
- num_workers int
- job_wanted_pipe chan chan *Job
- done_pipe chan *Job
- add_pipe chan *Job
- result_wanted_pipe chan chan *Job
- jobs_ready_to_run *list.List
- num_jobs_submitted int
- num_jobs_running int
- num_jobs_completed int
- jobs_completed *list.List
- interval time.Duration
- working_wanted_pipe chan chan bool
- stats_wanted_pipe chan chan stats
- worker_kill_pipe chan bool
- supervisor_kill_pipe chan bool
- worker_wg sync.WaitGroup
- supervisor_wg sync.WaitGroup
- }
- func (pool *Pool) subworker(job *Job) {
- defer func() {
- if err := recover(); err != nil {
- log.Println("panic while running job:", err)
- job.Result = nil
- job.Err = fmt.Errorf(err.(string))
- }
- }()
- job.Result = job.F(job.Args...)
- }
- func (pool *Pool) worker(worker_id uint) {
- job_pipe := make(chan *Job)
- WORKER_LOOP:
- for {
- pool.job_wanted_pipe <- job_pipe
- job := <-job_pipe
- if job == nil {
- time.Sleep(pool.interval * time.Millisecond)
- } else {
- job.Worker_id = worker_id
- pool.subworker(job)
- pool.done_pipe <- job
- }
- select {
- case <-pool.worker_kill_pipe:
- break WORKER_LOOP
- default:
- }
- }
- pool.worker_wg.Done()
- }
- func NewPool(workers int) (pool *Pool) {
- pool = new(Pool)
- pool.num_workers = workers
- pool.job_wanted_pipe = make(chan chan *Job)
- pool.done_pipe = make(chan *Job)
- pool.add_pipe = make(chan *Job)
- pool.result_wanted_pipe = make(chan chan *Job)
- pool.jobs_ready_to_run = list.New()
- pool.jobs_completed = list.New()
- pool.working_wanted_pipe = make(chan chan bool)
- pool.stats_wanted_pipe = make(chan chan stats)
- pool.worker_kill_pipe = make(chan bool)
- pool.supervisor_kill_pipe = make(chan bool)
- pool.interval = 1000
- pool.next_job_id = 0
- pool.startSupervisor()
- return
- }
- func (pool *Pool) supervisor() {
- SUPERVISOR_LOOP:
- for {
- select {
- case job := <-pool.add_pipe:
- pool.jobs_ready_to_run.PushBack(job)
- pool.num_jobs_submitted++
- job.added <- true
- case job_pipe := <-pool.job_wanted_pipe:
- element := pool.jobs_ready_to_run.Front()
- var job *Job = nil
- if element != nil {
- job = element.Value.(*Job)
- pool.num_jobs_running++
- pool.jobs_ready_to_run.Remove(element)
- }
- job_pipe <- job
- case job := <-pool.done_pipe:
- pool.num_jobs_running--
- pool.jobs_completed.PushBack(job)
- pool.num_jobs_completed++
- case result_pipe := <-pool.result_wanted_pipe:
- close_pipe := false
- job := (*Job)(nil)
- element := pool.jobs_completed.Front()
- if element != nil {
- job = element.Value.(*Job)
- pool.jobs_completed.Remove(element)
- } else {
- if pool.num_jobs_running == 0 && pool.num_jobs_completed == pool.num_jobs_submitted {
- close_pipe = true
- }
- }
- if close_pipe {
- close(result_pipe)
- } else {
- result_pipe <- job
- }
- case working_pipe := <-pool.working_wanted_pipe:
- working := true
- if pool.jobs_ready_to_run.Len() == 0 && pool.num_jobs_running == 0 {
- working = false
- }
- working_pipe <- working
- case stats_pipe := <-pool.stats_wanted_pipe:
- pool_stats := stats{pool.num_jobs_submitted, pool.num_jobs_running, pool.num_jobs_completed}
- stats_pipe <- pool_stats
- case <-pool.supervisor_kill_pipe:
- break SUPERVISOR_LOOP
- }
- }
- pool.supervisor_wg.Done()
- }
- func (pool *Pool) Run() {
- if pool.workers_started {
- panic("trying to start a pool that's already running")
- }
- for i := uint(0); i < uint(pool.num_workers); i++ {
- pool.worker_wg.Add(1)
- go pool.worker(i)
- }
- pool.workers_started = true
- if !pool.supervisor_started {
- pool.startSupervisor()
- }
- }
- func (pool *Pool) Stop() {
- if !pool.workers_started {
- panic("trying to stop a pool that's already stopped")
- }
- for i := 0; i < pool.num_workers; i++ {
- pool.worker_kill_pipe <- true
- }
- pool.worker_wg.Wait()
- pool.workers_started = false
- if pool.supervisor_started {
- pool.stopSupervisor()
- }
- }
- func (pool *Pool) startSupervisor() {
- pool.supervisor_wg.Add(1)
- go pool.supervisor()
- pool.supervisor_started = true
- }
- func (pool *Pool) stopSupervisor() {
- pool.supervisor_kill_pipe <- true
- pool.supervisor_wg.Wait()
- pool.supervisor_started = false
- }
- func (pool *Pool) Add(f func(...interface{}) interface{}, args ...interface{}) {
- job := &Job{f, args, nil, nil, make(chan bool), 0, pool.getNextJobId()}
- pool.add_pipe <- job
- <-job.added
- }
- func (pool *Pool) getNextJobId() uint64 {
- return atomic.AddUint64(&pool.next_job_id, 1)
- }
- func (pool *Pool) Wait() {
- working_pipe := make(chan bool)
- for {
- pool.working_wanted_pipe <- working_pipe
- if !<-working_pipe {
- break
- }
- time.Sleep(pool.interval * time.Millisecond)
- }
- }
- func (pool *Pool) Results() (res []*Job) {
- res = make([]*Job, pool.jobs_completed.Len())
- i := 0
- for e := pool.jobs_completed.Front(); e != nil; e = e.Next() {
- res[i] = e.Value.(*Job)
- i++
- }
- pool.jobs_completed = list.New()
- return
- }
- func (pool *Pool) WaitForJob() *Job {
- result_pipe := make(chan *Job)
- var job *Job
- var ok bool
- for {
- pool.result_wanted_pipe <- result_pipe
- job, ok = <-result_pipe
- if !ok {
- return nil
- }
- if job == (*Job)(nil) {
- time.Sleep(pool.interval * time.Millisecond)
- } else {
- break
- }
- }
- return job
- }
- func (pool *Pool) Status() stats {
- stats_pipe := make(chan stats)
- if pool.supervisor_started {
- pool.stats_wanted_pipe <- stats_pipe
- return <-stats_pipe
- }
- return stats{}
- }
|