pool.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package utils
  2. import (
  3. "container/list"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. type Job struct {
  11. F func(...interface{}) interface{}
  12. Args []interface{}
  13. Result interface{}
  14. Err error
  15. added chan bool
  16. Worker_id uint
  17. Job_id uint64
  18. }
  19. type stats struct {
  20. Submitted int
  21. Running int
  22. Completed int
  23. }
  24. type Pool struct {
  25. next_job_id uint64
  26. workers_started bool
  27. supervisor_started bool
  28. num_workers int
  29. job_wanted_pipe chan chan *Job
  30. done_pipe chan *Job
  31. add_pipe chan *Job
  32. result_wanted_pipe chan chan *Job
  33. jobs_ready_to_run *list.List
  34. num_jobs_submitted int
  35. num_jobs_running int
  36. num_jobs_completed int
  37. jobs_completed *list.List
  38. interval time.Duration
  39. working_wanted_pipe chan chan bool
  40. stats_wanted_pipe chan chan stats
  41. worker_kill_pipe chan bool
  42. supervisor_kill_pipe chan bool
  43. worker_wg sync.WaitGroup
  44. supervisor_wg sync.WaitGroup
  45. }
  46. func (pool *Pool) subworker(job *Job) {
  47. defer func() {
  48. if err := recover(); err != nil {
  49. log.Println("panic while running job:", err)
  50. job.Result = nil
  51. job.Err = fmt.Errorf(err.(string))
  52. }
  53. }()
  54. job.Result = job.F(job.Args...)
  55. }
  56. func (pool *Pool) worker(worker_id uint) {
  57. job_pipe := make(chan *Job)
  58. WORKER_LOOP:
  59. for {
  60. pool.job_wanted_pipe <- job_pipe
  61. job := <-job_pipe
  62. if job == nil {
  63. time.Sleep(pool.interval * time.Millisecond)
  64. } else {
  65. job.Worker_id = worker_id
  66. pool.subworker(job)
  67. pool.done_pipe <- job
  68. }
  69. select {
  70. case <-pool.worker_kill_pipe:
  71. break WORKER_LOOP
  72. default:
  73. }
  74. }
  75. pool.worker_wg.Done()
  76. }
  77. func NewPool(workers int) (pool *Pool) {
  78. pool = new(Pool)
  79. pool.num_workers = workers
  80. pool.job_wanted_pipe = make(chan chan *Job)
  81. pool.done_pipe = make(chan *Job)
  82. pool.add_pipe = make(chan *Job)
  83. pool.result_wanted_pipe = make(chan chan *Job)
  84. pool.jobs_ready_to_run = list.New()
  85. pool.jobs_completed = list.New()
  86. pool.working_wanted_pipe = make(chan chan bool)
  87. pool.stats_wanted_pipe = make(chan chan stats)
  88. pool.worker_kill_pipe = make(chan bool)
  89. pool.supervisor_kill_pipe = make(chan bool)
  90. pool.interval = 1000
  91. pool.next_job_id = 0
  92. pool.startSupervisor()
  93. return
  94. }
  95. func (pool *Pool) supervisor() {
  96. SUPERVISOR_LOOP:
  97. for {
  98. select {
  99. case job := <-pool.add_pipe:
  100. pool.jobs_ready_to_run.PushBack(job)
  101. pool.num_jobs_submitted++
  102. job.added <- true
  103. case job_pipe := <-pool.job_wanted_pipe:
  104. element := pool.jobs_ready_to_run.Front()
  105. var job *Job = nil
  106. if element != nil {
  107. job = element.Value.(*Job)
  108. pool.num_jobs_running++
  109. pool.jobs_ready_to_run.Remove(element)
  110. }
  111. job_pipe <- job
  112. case job := <-pool.done_pipe:
  113. pool.num_jobs_running--
  114. pool.jobs_completed.PushBack(job)
  115. pool.num_jobs_completed++
  116. case result_pipe := <-pool.result_wanted_pipe:
  117. close_pipe := false
  118. job := (*Job)(nil)
  119. element := pool.jobs_completed.Front()
  120. if element != nil {
  121. job = element.Value.(*Job)
  122. pool.jobs_completed.Remove(element)
  123. } else {
  124. if pool.num_jobs_running == 0 && pool.num_jobs_completed == pool.num_jobs_submitted {
  125. close_pipe = true
  126. }
  127. }
  128. if close_pipe {
  129. close(result_pipe)
  130. } else {
  131. result_pipe <- job
  132. }
  133. case working_pipe := <-pool.working_wanted_pipe:
  134. working := true
  135. if pool.jobs_ready_to_run.Len() == 0 && pool.num_jobs_running == 0 {
  136. working = false
  137. }
  138. working_pipe <- working
  139. case stats_pipe := <-pool.stats_wanted_pipe:
  140. pool_stats := stats{pool.num_jobs_submitted, pool.num_jobs_running, pool.num_jobs_completed}
  141. stats_pipe <- pool_stats
  142. case <-pool.supervisor_kill_pipe:
  143. break SUPERVISOR_LOOP
  144. }
  145. }
  146. pool.supervisor_wg.Done()
  147. }
  148. func (pool *Pool) Run() {
  149. if pool.workers_started {
  150. panic("trying to start a pool that's already running")
  151. }
  152. for i := uint(0); i < uint(pool.num_workers); i++ {
  153. pool.worker_wg.Add(1)
  154. go pool.worker(i)
  155. }
  156. pool.workers_started = true
  157. if !pool.supervisor_started {
  158. pool.startSupervisor()
  159. }
  160. }
  161. func (pool *Pool) Stop() {
  162. if !pool.workers_started {
  163. panic("trying to stop a pool that's already stopped")
  164. }
  165. for i := 0; i < pool.num_workers; i++ {
  166. pool.worker_kill_pipe <- true
  167. }
  168. pool.worker_wg.Wait()
  169. pool.workers_started = false
  170. if pool.supervisor_started {
  171. pool.stopSupervisor()
  172. }
  173. }
  174. func (pool *Pool) startSupervisor() {
  175. pool.supervisor_wg.Add(1)
  176. go pool.supervisor()
  177. pool.supervisor_started = true
  178. }
  179. func (pool *Pool) stopSupervisor() {
  180. pool.supervisor_kill_pipe <- true
  181. pool.supervisor_wg.Wait()
  182. pool.supervisor_started = false
  183. }
  184. func (pool *Pool) Add(f func(...interface{}) interface{}, args ...interface{}) {
  185. job := &Job{f, args, nil, nil, make(chan bool), 0, pool.getNextJobId()}
  186. pool.add_pipe <- job
  187. <-job.added
  188. }
  189. func (pool *Pool) getNextJobId() uint64 {
  190. return atomic.AddUint64(&pool.next_job_id, 1)
  191. }
  192. func (pool *Pool) Wait() {
  193. working_pipe := make(chan bool)
  194. for {
  195. pool.working_wanted_pipe <- working_pipe
  196. if !<-working_pipe {
  197. break
  198. }
  199. time.Sleep(pool.interval * time.Millisecond)
  200. }
  201. }
  202. func (pool *Pool) Results() (res []*Job) {
  203. res = make([]*Job, pool.jobs_completed.Len())
  204. i := 0
  205. for e := pool.jobs_completed.Front(); e != nil; e = e.Next() {
  206. res[i] = e.Value.(*Job)
  207. i++
  208. }
  209. pool.jobs_completed = list.New()
  210. return
  211. }
  212. func (pool *Pool) WaitForJob() *Job {
  213. result_pipe := make(chan *Job)
  214. var job *Job
  215. var ok bool
  216. for {
  217. pool.result_wanted_pipe <- result_pipe
  218. job, ok = <-result_pipe
  219. if !ok {
  220. return nil
  221. }
  222. if job == (*Job)(nil) {
  223. time.Sleep(pool.interval * time.Millisecond)
  224. } else {
  225. break
  226. }
  227. }
  228. return job
  229. }
  230. func (pool *Pool) Status() stats {
  231. stats_pipe := make(chan stats)
  232. if pool.supervisor_started {
  233. pool.stats_wanted_pipe <- stats_pipe
  234. return <-stats_pipe
  235. }
  236. return stats{}
  237. }