scheduler.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. package gocron
  2. import (
  3. "math"
  4. "reflect"
  5. "sort"
  6. "strings"
  7. "time"
  8. )
  9. // Scheduler struct stores a list of Jobs and the location of time Scheduler
  10. // Scheduler implements the sort.Interface{} for sorting Jobs, by the time of nextRun
  11. type Scheduler struct {
  12. jobs []*Job
  13. loc *time.Location
  14. running bool
  15. stopChan chan struct{} // signal to stop scheduling
  16. time timeWrapper // wrapper around time.Time
  17. }
  18. // NewScheduler creates a new Scheduler
  19. func NewScheduler(loc *time.Location) *Scheduler {
  20. return &Scheduler{
  21. jobs: make([]*Job, 0),
  22. loc: loc,
  23. running: false,
  24. stopChan: make(chan struct{}),
  25. time: &trueTime{},
  26. }
  27. }
  28. // StartBlocking starts all the pending jobs using a second-long ticker and blocks the current thread
  29. func (s *Scheduler) StartBlocking() {
  30. <-s.StartAsync()
  31. }
  32. // StartAsync starts a goroutine that runs all the pending using a second-long ticker
  33. func (s *Scheduler) StartAsync() chan struct{} {
  34. if s.running {
  35. return s.stopChan
  36. }
  37. s.running = true
  38. s.scheduleAllJobs()
  39. ticker := s.time.NewTicker(1 * time.Second)
  40. go func() {
  41. for {
  42. select {
  43. case <-ticker.C:
  44. s.RunPending()
  45. case <-s.stopChan:
  46. ticker.Stop()
  47. s.running = false
  48. return
  49. }
  50. }
  51. }()
  52. return s.stopChan
  53. }
  54. // Jobs returns the list of Jobs from the Scheduler
  55. func (s *Scheduler) Jobs() []*Job {
  56. return s.jobs
  57. }
  58. // Len returns the number of Jobs in the Scheduler
  59. func (s *Scheduler) Len() int {
  60. return len(s.jobs)
  61. }
  62. // Swap
  63. func (s *Scheduler) Swap(i, j int) {
  64. s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i]
  65. }
  66. func (s *Scheduler) Less(i, j int) bool {
  67. return s.jobs[j].nextRun.Unix() >= s.jobs[i].nextRun.Unix()
  68. }
  69. // ChangeLocation changes the default time location
  70. func (s *Scheduler) ChangeLocation(newLocation *time.Location) {
  71. s.loc = newLocation
  72. }
  73. // scheduleNextRun Compute the instant when this Job should run next
  74. func (s *Scheduler) scheduleNextRun(j *Job) {
  75. j.Lock()
  76. defer j.Unlock()
  77. now := s.time.Now(s.loc)
  78. if j.startsImmediately {
  79. j.nextRun = now
  80. j.startsImmediately = false
  81. return
  82. }
  83. // delta represent the time slice used to calculate the next run
  84. // it can be the last time ran by a job, or time.Now() if the job never ran
  85. var delta time.Time
  86. if j.neverRan() {
  87. if !j.nextRun.IsZero() { // scheduled for future run, wait to run at least once
  88. return
  89. }
  90. delta = now
  91. } else {
  92. delta = j.lastRun
  93. }
  94. switch j.unit {
  95. case seconds, minutes, hours:
  96. j.nextRun = s.rescheduleDuration(j, delta)
  97. case days:
  98. j.nextRun = s.rescheduleDay(j, delta)
  99. case weeks:
  100. j.nextRun = s.rescheduleWeek(j, delta)
  101. case months:
  102. j.nextRun = s.rescheduleMonth(j, delta)
  103. }
  104. }
  105. func (s *Scheduler) rescheduleMonth(j *Job, delta time.Time) time.Time {
  106. if j.neverRan() { // calculate days to j.dayOfTheMonth
  107. jobDay := time.Date(delta.Year(), delta.Month(), j.dayOfTheMonth, 0, 0, 0, 0, s.loc).Add(j.atTime)
  108. daysDifference := int(math.Abs(delta.Sub(jobDay).Hours()) / 24)
  109. nextRun := s.roundToMidnight(delta)
  110. if jobDay.Before(delta) { // shouldn't run this month; schedule for next interval minus day difference
  111. nextRun = nextRun.AddDate(0, int(j.interval), -daysDifference)
  112. } else {
  113. if j.interval == 1 { // every month counts current month
  114. nextRun = nextRun.AddDate(0, int(j.interval)-1, daysDifference)
  115. } else { // should run next month interval
  116. nextRun = nextRun.AddDate(0, int(j.interval), daysDifference)
  117. }
  118. }
  119. return nextRun.Add(j.atTime)
  120. }
  121. return s.roundToMidnight(delta).AddDate(0, int(j.interval), 0).Add(j.atTime)
  122. }
  123. func (s *Scheduler) rescheduleWeek(j *Job, delta time.Time) time.Time {
  124. var days int
  125. if j.scheduledWeekday != nil { // weekday selected, Every().Monday(), for example
  126. days = s.calculateWeekdayDifference(delta, j)
  127. } else {
  128. days = int(j.interval) * 7
  129. }
  130. delta = s.roundToMidnight(delta)
  131. return delta.AddDate(0, 0, days).Add(j.atTime)
  132. }
  133. func (s *Scheduler) rescheduleDay(j *Job, delta time.Time) time.Time {
  134. if j.interval == 1 {
  135. atTime := time.Date(delta.Year(), delta.Month(), delta.Day(), 0, 0, 0, 0, s.loc).Add(j.atTime)
  136. if delta.Before(atTime) { // should run today
  137. return s.roundToMidnight(delta).Add(j.atTime)
  138. }
  139. }
  140. return s.roundToMidnight(delta).AddDate(0, 0, int(j.interval)).Add(j.atTime)
  141. }
  142. func (s *Scheduler) rescheduleDuration(j *Job, delta time.Time) time.Time {
  143. if j.neverRan() && j.atTime != 0 { // ugly. in order to avoid this we could prohibit setting .At() and allowing only .StartAt() when dealing with Duration types
  144. atTime := time.Date(delta.Year(), delta.Month(), delta.Day(), 0, 0, 0, 0, s.loc).Add(j.atTime)
  145. if delta.Before(atTime) || delta.Equal(atTime) {
  146. return s.roundToMidnight(delta).Add(j.atTime)
  147. }
  148. }
  149. var periodDuration time.Duration
  150. switch j.unit {
  151. case seconds:
  152. periodDuration = time.Duration(j.interval) * time.Second
  153. case minutes:
  154. periodDuration = time.Duration(j.interval) * time.Minute
  155. case hours:
  156. periodDuration = time.Duration(j.interval) * time.Hour
  157. }
  158. return delta.Add(periodDuration)
  159. }
  160. func (s *Scheduler) calculateWeekdayDifference(delta time.Time, j *Job) int {
  161. daysToWeekday := remainingDaysToWeekday(delta.Weekday(), *j.scheduledWeekday)
  162. if j.interval > 1 {
  163. return daysToWeekday + int(j.interval-1)*7 // minus a week since to compensate daysToWeekday
  164. }
  165. if daysToWeekday > 0 { // within the next following days, but not today
  166. return daysToWeekday
  167. }
  168. // following paths are on same day
  169. if j.atTime.Seconds() == 0 && j.neverRan() { // .At() not set, run today
  170. return 0
  171. }
  172. atJobTime := time.Date(delta.Year(), delta.Month(), delta.Day(), 0, 0, 0, 0, s.loc).Add(j.atTime)
  173. if delta.Before(atJobTime) || delta.Equal(atJobTime) { // .At() set and should run today
  174. return 0
  175. }
  176. return 7
  177. }
  178. func remainingDaysToWeekday(from time.Weekday, to time.Weekday) int {
  179. daysUntilScheduledDay := int(to) - int(from)
  180. if daysUntilScheduledDay < 0 {
  181. daysUntilScheduledDay += 7
  182. }
  183. return daysUntilScheduledDay
  184. }
  185. // roundToMidnight truncates time to midnight
  186. func (s *Scheduler) roundToMidnight(t time.Time) time.Time {
  187. return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, s.loc)
  188. }
  189. // Get the current runnable Jobs, which shouldRun is True
  190. func (s *Scheduler) runnableJobs() []*Job {
  191. var runnableJobs []*Job
  192. sort.Sort(s)
  193. for _, job := range s.jobs {
  194. if s.shouldRun(job) {
  195. runnableJobs = append(runnableJobs, job)
  196. } else {
  197. break
  198. }
  199. }
  200. return runnableJobs
  201. }
  202. // NextRun datetime when the next Job should run.
  203. func (s *Scheduler) NextRun() (*Job, time.Time) {
  204. if len(s.jobs) <= 0 {
  205. return nil, s.time.Now(s.loc)
  206. }
  207. sort.Sort(s)
  208. return s.jobs[0], s.jobs[0].nextRun
  209. }
  210. // Every schedules a new periodic Job with interval
  211. func (s *Scheduler) Every(interval uint64) *Scheduler {
  212. job := NewJob(interval)
  213. s.jobs = append(s.jobs, job)
  214. return s
  215. }
  216. // RunPending runs all the Jobs that are scheduled to run.
  217. func (s *Scheduler) RunPending() {
  218. for _, job := range s.runnableJobs() {
  219. s.runAndReschedule(job) // we should handle this error somehow
  220. }
  221. }
  222. func (s *Scheduler) runAndReschedule(job *Job) error {
  223. if err := s.run(job); err != nil {
  224. return err
  225. }
  226. s.scheduleNextRun(job)
  227. return nil
  228. }
  229. func (s *Scheduler) run(job *Job) error {
  230. job.lastRun = s.time.Now(s.loc)
  231. go job.run()
  232. return nil
  233. }
  234. // RunAll run all Jobs regardless if they are scheduled to run or not
  235. func (s *Scheduler) RunAll() {
  236. s.RunAllWithDelay(0)
  237. }
  238. // RunAllWithDelay runs all Jobs with delay seconds
  239. func (s *Scheduler) RunAllWithDelay(d int) {
  240. for _, job := range s.jobs {
  241. err := s.run(job)
  242. if err != nil {
  243. continue
  244. }
  245. s.time.Sleep(time.Duration(d) * time.Second)
  246. }
  247. }
  248. // Remove specific Job j by function
  249. func (s *Scheduler) Remove(j interface{}) {
  250. s.removeByCondition(func(someJob *Job) bool {
  251. return someJob.jobFunc == getFunctionName(j)
  252. })
  253. }
  254. // RemoveByReference removes specific Job j by reference
  255. func (s *Scheduler) RemoveByReference(j *Job) {
  256. s.removeByCondition(func(someJob *Job) bool {
  257. return someJob == j
  258. })
  259. }
  260. func (s *Scheduler) removeByCondition(shouldRemove func(*Job) bool) {
  261. retainedJobs := make([]*Job, 0)
  262. for _, job := range s.jobs {
  263. if !shouldRemove(job) {
  264. retainedJobs = append(retainedJobs, job)
  265. }
  266. }
  267. s.jobs = retainedJobs
  268. }
  269. // RemoveJobByTag will Remove Jobs by Tag
  270. func (s *Scheduler) RemoveJobByTag(tag string) error {
  271. jobindex, err := s.findJobsIndexByTag(tag)
  272. if err != nil {
  273. return err
  274. }
  275. // Remove job if jobindex is valid
  276. s.jobs = removeAtIndex(s.jobs, jobindex)
  277. return nil
  278. }
  279. // Find first job index by given string
  280. func (s *Scheduler) findJobsIndexByTag(tag string) (int, error) {
  281. for i, job := range s.jobs {
  282. if strings.Contains(strings.Join(job.Tags(), " "), tag) {
  283. return i, nil
  284. }
  285. }
  286. return -1, ErrJobNotFoundWithTag
  287. }
  288. func removeAtIndex(jobs []*Job, i int) []*Job {
  289. if i == len(jobs)-1 {
  290. return jobs[:i]
  291. }
  292. jobs = append(jobs[:i], jobs[i+1:]...)
  293. return jobs
  294. }
  295. // Scheduled checks if specific Job j was already added
  296. func (s *Scheduler) Scheduled(j interface{}) bool {
  297. for _, job := range s.jobs {
  298. if job.jobFunc == getFunctionName(j) {
  299. return true
  300. }
  301. }
  302. return false
  303. }
  304. // Clear clear all Jobs from this scheduler
  305. func (s *Scheduler) Clear() {
  306. s.jobs = make([]*Job, 0)
  307. }
  308. // Stop stops the scheduler. This is a no-op if the scheduler is already stopped .
  309. func (s *Scheduler) Stop() {
  310. if s.running {
  311. s.stopScheduler()
  312. }
  313. }
  314. func (s *Scheduler) stopScheduler() {
  315. s.stopChan <- struct{}{}
  316. }
  317. // Do specifies the jobFunc that should be called every time the Job runs
  318. func (s *Scheduler) Do(jobFun interface{}, params ...interface{}) (*Job, error) {
  319. j := s.getCurrentJob()
  320. if j.err != nil {
  321. return nil, j.err
  322. }
  323. typ := reflect.TypeOf(jobFun)
  324. if typ.Kind() != reflect.Func {
  325. return nil, ErrNotAFunction
  326. }
  327. fname := getFunctionName(jobFun)
  328. j.funcs[fname] = jobFun
  329. j.fparams[fname] = params
  330. j.jobFunc = fname
  331. // we should not schedule if not running since we cant foresee how long it will take for the scheduler to start
  332. if s.running {
  333. s.scheduleNextRun(j)
  334. }
  335. return j, nil
  336. }
  337. // At schedules the Job at a specific time of day in the form "HH:MM:SS" or "HH:MM"
  338. func (s *Scheduler) At(t string) *Scheduler {
  339. j := s.getCurrentJob()
  340. hour, min, sec, err := parseTime(t)
  341. if err != nil {
  342. j.err = ErrTimeFormat
  343. return s
  344. }
  345. // save atTime start as duration from midnight
  346. j.atTime = time.Duration(hour)*time.Hour + time.Duration(min)*time.Minute + time.Duration(sec)*time.Second
  347. return s
  348. }
  349. // SetTag will add tag when creating a job
  350. func (s *Scheduler) SetTag(t []string) *Scheduler {
  351. job := s.getCurrentJob()
  352. job.tags = t
  353. return s
  354. }
  355. // StartAt schedules the next run of the Job
  356. func (s *Scheduler) StartAt(t time.Time) *Scheduler {
  357. s.getCurrentJob().nextRun = t
  358. return s
  359. }
  360. // StartImmediately sets the Jobs next run as soon as the scheduler starts
  361. func (s *Scheduler) StartImmediately() *Scheduler {
  362. job := s.getCurrentJob()
  363. job.startsImmediately = true
  364. return s
  365. }
  366. // shouldRun returns true if the Job should be run now
  367. func (s *Scheduler) shouldRun(j *Job) bool {
  368. return j.shouldRun() && s.time.Now(s.loc).Unix() >= j.nextRun.Unix()
  369. }
  370. // setUnit sets the unit type
  371. func (s *Scheduler) setUnit(unit timeUnit) {
  372. currentJob := s.getCurrentJob()
  373. currentJob.unit = unit
  374. }
  375. // Second sets the unit with seconds
  376. func (s *Scheduler) Second() *Scheduler {
  377. return s.Seconds()
  378. }
  379. // Seconds sets the unit with seconds
  380. func (s *Scheduler) Seconds() *Scheduler {
  381. s.setUnit(seconds)
  382. return s
  383. }
  384. // Minute sets the unit with minutes
  385. func (s *Scheduler) Minute() *Scheduler {
  386. return s.Minutes()
  387. }
  388. // Minutes sets the unit with minutes
  389. func (s *Scheduler) Minutes() *Scheduler {
  390. s.setUnit(minutes)
  391. return s
  392. }
  393. // Hour sets the unit with hours
  394. func (s *Scheduler) Hour() *Scheduler {
  395. return s.Hours()
  396. }
  397. // Hours sets the unit with hours
  398. func (s *Scheduler) Hours() *Scheduler {
  399. s.setUnit(hours)
  400. return s
  401. }
  402. // Day sets the unit with days
  403. func (s *Scheduler) Day() *Scheduler {
  404. s.setUnit(days)
  405. return s
  406. }
  407. // Days set the unit with days
  408. func (s *Scheduler) Days() *Scheduler {
  409. s.setUnit(days)
  410. return s
  411. }
  412. // Week sets the unit with weeks
  413. func (s *Scheduler) Week() *Scheduler {
  414. s.setUnit(weeks)
  415. return s
  416. }
  417. // Weeks sets the unit with weeks
  418. func (s *Scheduler) Weeks() *Scheduler {
  419. s.setUnit(weeks)
  420. return s
  421. }
  422. // Month sets the unit with months
  423. func (s *Scheduler) Month(dayOfTheMonth int) *Scheduler {
  424. return s.Months(dayOfTheMonth)
  425. }
  426. // Months sets the unit with months
  427. func (s *Scheduler) Months(dayOfTheMonth int) *Scheduler {
  428. s.getCurrentJob().dayOfTheMonth = dayOfTheMonth
  429. s.setUnit(months)
  430. return s
  431. }
// NOTE: If the dayOfTheMonth passed to Month or Months is greater than the
// number of days in that month, the extra day(s) spill over into the next
// month. Similarly, if it is less than 1, the date rolls back into the
// previous month.
  436. // Weekday sets the start with a specific weekday weekday
  437. func (s *Scheduler) Weekday(startDay time.Weekday) *Scheduler {
  438. s.getCurrentJob().scheduledWeekday = &startDay
  439. s.setUnit(weeks)
  440. return s
  441. }
  442. // Monday sets the start day as Monday
  443. func (s *Scheduler) Monday() *Scheduler {
  444. return s.Weekday(time.Monday)
  445. }
  446. // Tuesday sets the start day as Tuesday
  447. func (s *Scheduler) Tuesday() *Scheduler {
  448. return s.Weekday(time.Tuesday)
  449. }
  450. // Wednesday sets the start day as Wednesday
  451. func (s *Scheduler) Wednesday() *Scheduler {
  452. return s.Weekday(time.Wednesday)
  453. }
  454. // Thursday sets the start day as Thursday
  455. func (s *Scheduler) Thursday() *Scheduler {
  456. return s.Weekday(time.Thursday)
  457. }
  458. // Friday sets the start day as Friday
  459. func (s *Scheduler) Friday() *Scheduler {
  460. return s.Weekday(time.Friday)
  461. }
  462. // Saturday sets the start day as Saturday
  463. func (s *Scheduler) Saturday() *Scheduler {
  464. return s.Weekday(time.Saturday)
  465. }
  466. // Sunday sets the start day as Sunday
  467. func (s *Scheduler) Sunday() *Scheduler {
  468. return s.Weekday(time.Sunday)
  469. }
  470. func (s *Scheduler) getCurrentJob() *Job {
  471. return s.jobs[len(s.jobs)-1]
  472. }
  473. func (s *Scheduler) scheduleAllJobs() {
  474. for _, j := range s.jobs {
  475. s.scheduleNextRun(j)
  476. }
  477. }