Browse Source

执行器优雅停机优化,修复任务线程中断未join导致回调丢失的问题;

xuxueli 5 years ago
parent
commit
9f5267c246

+ 8 - 7
doc/XXL-JOB官方文档.md

@@ -1716,13 +1716,14 @@ public ReturnT<String> execute(String param) {
 - 10、移除旧类注解JobHandler,推荐使用基于方法注解 "@XxlJob" 的方式进行任务开发;(如需保留类注解JobHandler使用方式,可以参考旧版逻辑定制开发);
 - 11、修复bootstrap.min.css.map 404问题;
 - 12、XxlJob注解扫描方式优化,支持查找父类以及接口和基于类代理等常见情况;
-- 13、[迭代中]自定义失败重试时间间隔;
-- 14、[迭代中]任务复制功能;点击复制是弹出新建任务弹框,并初始化被复制任务信息;
-- 15、[迭代中]新增执行器描述、任务描述属性;
-- 16、[迭代中]任务执行一次的时候指定IP;
-- 17、[迭代中]任务日志支持单个清理和状态转移,方便触发子任务;
-- 18、[迭代中]任务结果丢失处理:针对长期处于运行中的任务(设置过期时间时,运行超过"过期时间+1min";未设置超时时间时,运行超过"30min"),主动检测该执行器是否在线,如果不在线主动标记失败;
-- 19、[迭代中]优雅停机回调丢失问题修复;
+- 13、执行器优雅停机优化,修复任务线程中断未join导致回调丢失的问题;
+- 14、[迭代中]自定义失败重试时间间隔;
+- 15、[迭代中]任务复制功能;点击复制是弹出新建任务弹框,并初始化被复制任务信息;
+- 16、[迭代中]新增执行器描述、任务描述属性;
+- 17、[迭代中]任务执行一次的时候指定IP;
+- 18、[迭代中]任务日志支持单个清理和状态转移,方便触发子任务;
+- 19、[迭代中]任务结果丢失处理:针对长期处于运行中的任务(设置过期时间时,运行超过"过期时间+1min";未设置超时时间时,运行超过"30min"),主动检测该执行器是否在线,如果不在线主动标记失败;
+- 20、[迭代中]优雅停机回调丢失问题修复;
 
 
 ### TODO LIST

+ 13 - 17
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java

@@ -84,33 +84,26 @@ public class XxlJobExecutor  {
         initRpcProvider(ip, port, appName, accessToken);
     }
     public void destroy(){
-
-        List<JobThread> runningThreads = new ArrayList<>(jobThreadRepository.values());
-        if (logger.isInfoEnabled()) {
-            logger.info("running threads {}", runningThreads);
-        }
-
         // destory executor-server
         stopRpcProvider();
 
         // destory jobThreadRepository
         if (jobThreadRepository.size() > 0) {
             for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
-                removeJobThread(item.getKey(), "web container destroy and kill the job.");
+                JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
+                // wait for job thread push result to callback queue
+                if (oldJobThread != null) {
+                    try {
+                        oldJobThread.join();
+                    } catch (InterruptedException e) {
+                        logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
+                    }
+                }
             }
             jobThreadRepository.clear();
         }
         jobHandlerRepository.clear();
 
-        for (JobThread runningThread : runningThreads) {
-            try {
-                // wait for all job thread push result to callback queue
-                runningThread.join();
-            } catch (InterruptedException e) {
-                logger.warn("interrupted while stopping {}", runningThread);
-                break;
-            }
-        }
 
         // destory JobLogFileCleanThread
         JobLogFileCleanThread.getInstance().toStop();
@@ -246,12 +239,15 @@ public class XxlJobExecutor  {
 
         return newJobThread;
     }
-    public static void removeJobThread(int jobId, String removeOldReason){
+    public static JobThread removeJobThread(int jobId, String removeOldReason){
         JobThread oldJobThread = jobThreadRepository.remove(jobId);
         if (oldJobThread != null) {
             oldJobThread.toStop(removeOldReason);
             oldJobThread.interrupt();
+
+            return oldJobThread;
         }
+        return null;
     }
     public static JobThread loadJobThread(int jobId){
         JobThread jobThread = jobThreadRepository.get(jobId);