timed_task.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package timer
  2. import (
  3. "github.com/robfig/cron/v3"
  4. "sync"
  5. )
  6. type Timer interface {
  7. // 寻找所有Cron
  8. FindCronList() map[string]*taskManager
  9. // 添加Task 方法形式以秒的形式加入
  10. AddTaskByFuncWithSecond(cronName string, spec string, fun func(), taskName string, option ...cron.Option) (cron.EntryID, error) // 添加Task Func以秒的形式加入
  11. // 添加Task 接口形式以秒的形式加入
  12. AddTaskByJobWithSeconds(cronName string, spec string, job interface{ Run() }, taskName string, option ...cron.Option) (cron.EntryID, error)
  13. // 通过函数的方法添加任务
  14. AddTaskByFunc(cronName string, spec string, task func(), taskName string, option ...cron.Option) (cron.EntryID, error)
  15. // 通过接口的方法添加任务 要实现一个带有 Run方法的接口触发
  16. AddTaskByJob(cronName string, spec string, job interface{ Run() }, taskName string, option ...cron.Option) (cron.EntryID, error)
  17. // 获取对应taskName的cron 可能会为空
  18. FindCron(cronName string) (*taskManager, bool)
  19. // 指定cron开始执行
  20. StartCron(cronName string)
  21. // 指定cron停止执行
  22. StopCron(cronName string)
  23. // 查找指定cron下的指定task
  24. FindTask(cronName string, taskName string) (*task, bool)
  25. // 根据id删除指定cron下的指定task
  26. RemoveTask(cronName string, id int)
  27. // 根据taskName删除指定cron下的指定task
  28. RemoveTaskByName(cronName string, taskName string)
  29. // 清理掉指定cronName
  30. Clear(cronName string)
  31. // 停止所有的cron
  32. Close()
  33. }
  34. type task struct {
  35. EntryID cron.EntryID
  36. Spec string
  37. TaskName string
  38. }
  39. type taskManager struct {
  40. corn *cron.Cron
  41. tasks map[cron.EntryID]*task
  42. }
  43. // timer 定时任务管理
  44. type timer struct {
  45. cronList map[string]*taskManager
  46. sync.Mutex
  47. }
  48. // AddTaskByFunc 通过函数的方法添加任务
  49. func (t *timer) AddTaskByFunc(cronName string, spec string, fun func(), taskName string, option ...cron.Option) (cron.EntryID, error) {
  50. t.Lock()
  51. defer t.Unlock()
  52. if _, ok := t.cronList[cronName]; !ok {
  53. tasks := make(map[cron.EntryID]*task)
  54. t.cronList[cronName] = &taskManager{
  55. corn: cron.New(option...),
  56. tasks: tasks,
  57. }
  58. }
  59. id, err := t.cronList[cronName].corn.AddFunc(spec, fun)
  60. t.cronList[cronName].corn.Start()
  61. t.cronList[cronName].tasks[id] = &task{
  62. EntryID: id,
  63. Spec: spec,
  64. TaskName: taskName,
  65. }
  66. return id, err
  67. }
  68. // AddTaskByFuncWithSeconds 通过函数的方法使用WithSeconds添加任务
  69. func (t *timer) AddTaskByFuncWithSecond(cronName string, spec string, fun func(), taskName string, option ...cron.Option) (cron.EntryID, error) {
  70. t.Lock()
  71. defer t.Unlock()
  72. option = append(option, cron.WithSeconds())
  73. if _, ok := t.cronList[cronName]; !ok {
  74. tasks := make(map[cron.EntryID]*task)
  75. t.cronList[cronName] = &taskManager{
  76. corn: cron.New(option...),
  77. tasks: tasks,
  78. }
  79. }
  80. id, err := t.cronList[cronName].corn.AddFunc(spec, fun)
  81. t.cronList[cronName].corn.Start()
  82. t.cronList[cronName].tasks[id] = &task{
  83. EntryID: id,
  84. Spec: spec,
  85. TaskName: taskName,
  86. }
  87. return id, err
  88. }
  89. // AddTaskByJob 通过接口的方法添加任务
  90. func (t *timer) AddTaskByJob(cronName string, spec string, job interface{ Run() }, taskName string, option ...cron.Option) (cron.EntryID, error) {
  91. t.Lock()
  92. defer t.Unlock()
  93. if _, ok := t.cronList[cronName]; !ok {
  94. tasks := make(map[cron.EntryID]*task)
  95. t.cronList[cronName] = &taskManager{
  96. corn: cron.New(option...),
  97. tasks: tasks,
  98. }
  99. }
  100. id, err := t.cronList[cronName].corn.AddJob(spec, job)
  101. t.cronList[cronName].corn.Start()
  102. t.cronList[cronName].tasks[id] = &task{
  103. EntryID: id,
  104. Spec: spec,
  105. TaskName: taskName,
  106. }
  107. return id, err
  108. }
  109. // AddTaskByJobWithSeconds 通过接口的方法添加任务
  110. func (t *timer) AddTaskByJobWithSeconds(cronName string, spec string, job interface{ Run() }, taskName string, option ...cron.Option) (cron.EntryID, error) {
  111. t.Lock()
  112. defer t.Unlock()
  113. option = append(option, cron.WithSeconds())
  114. if _, ok := t.cronList[cronName]; !ok {
  115. tasks := make(map[cron.EntryID]*task)
  116. t.cronList[cronName] = &taskManager{
  117. corn: cron.New(option...),
  118. tasks: tasks,
  119. }
  120. }
  121. id, err := t.cronList[cronName].corn.AddJob(spec, job)
  122. t.cronList[cronName].corn.Start()
  123. t.cronList[cronName].tasks[id] = &task{
  124. EntryID: id,
  125. Spec: spec,
  126. TaskName: taskName,
  127. }
  128. return id, err
  129. }
  130. // FindTask 获取对应cronName的cron 可能会为空
  131. func (t *timer) FindCron(cronName string) (*taskManager, bool) {
  132. t.Lock()
  133. defer t.Unlock()
  134. v, ok := t.cronList[cronName]
  135. return v, ok
  136. }
  137. // FindTask 获取对应cronName的cron 可能会为空
  138. func (t *timer) FindTask(cronName string, taskName string) (*task, bool) {
  139. t.Lock()
  140. defer t.Unlock()
  141. v, ok := t.cronList[cronName]
  142. if !ok {
  143. return nil, ok
  144. }
  145. for _, t2 := range v.tasks {
  146. if t2.TaskName == taskName {
  147. return t2, true
  148. }
  149. }
  150. return nil, false
  151. }
  152. // FindCronList 获取所有的任务列表
  153. func (t *timer) FindCronList() map[string]*taskManager {
  154. t.Lock()
  155. defer t.Unlock()
  156. return t.cronList
  157. }
  158. // StartCron 开始任务
  159. func (t *timer) StartCron(cronName string) {
  160. t.Lock()
  161. defer t.Unlock()
  162. if v, ok := t.cronList[cronName]; ok {
  163. v.corn.Start()
  164. }
  165. }
  166. // StopCron 停止任务
  167. func (t *timer) StopCron(cronName string) {
  168. t.Lock()
  169. defer t.Unlock()
  170. if v, ok := t.cronList[cronName]; ok {
  171. v.corn.Stop()
  172. }
  173. }
  174. // Remove 从cronName 删除指定任务
  175. func (t *timer) RemoveTask(cronName string, id int) {
  176. t.Lock()
  177. defer t.Unlock()
  178. if v, ok := t.cronList[cronName]; ok {
  179. v.corn.Remove(cron.EntryID(id))
  180. delete(v.tasks, cron.EntryID(id))
  181. }
  182. }
  183. // RemoveTaskByName 从cronName 使用taskName 删除指定任务
  184. func (t *timer) RemoveTaskByName(cronName string, taskName string) {
  185. fTask, ok := t.FindTask(cronName, taskName)
  186. if !ok {
  187. return
  188. }
  189. t.RemoveTask(cronName, int(fTask.EntryID))
  190. }
  191. // Clear 清除任务
  192. func (t *timer) Clear(cronName string) {
  193. t.Lock()
  194. defer t.Unlock()
  195. if v, ok := t.cronList[cronName]; ok {
  196. v.corn.Stop()
  197. delete(t.cronList, cronName)
  198. }
  199. }
  200. // Close 释放资源
  201. func (t *timer) Close() {
  202. t.Lock()
  203. defer t.Unlock()
  204. for _, v := range t.cronList {
  205. v.corn.Stop()
  206. }
  207. }
  208. func NewTimerTask() Timer {
  209. return &timer{cronList: make(map[string]*taskManager)}
  210. }