Преглед изворни кода

功能完善:
1、jetty关闭优化(来自osc上好友QQ2575029833的pr);
2、任务终止时回调优化,执行队列中的调度进行回调;

xueli.xue пре 8 година
родитељ
комит
168050dcda

+ 8 - 0
README.md

@@ -71,6 +71,13 @@ git.osc地址:http://git.oschina.net/xuxueli0323/xxl-job
 	2、执行器异步回调执行日志;
 	3、【重要】在 “调度中心” 支持HA的基础上,扩展执行器的Failover支持,支持配置多执行期地址;
 
+# 规划中
+	1、任务终止时,任务队列中调度回调通过被终止的接口;
+	2、任务执行规则自定义:假如前一个任务正在执行,后续调度执行规则支持自定义;
+		串行(默认,当前逻辑):后续调度入调度队列;
+		并行:后续调度并行执行;
+		Pass:后续调度被Pass;
+
 # 源码目录说明
 	/xxl-job-admin					【调度中心】:负责管理调度信息,按照调度配置发出调度请求;
 	/xxl-job-core					公共依赖
@@ -87,3 +94,4 @@ git.osc地址:http://git.oschina.net/xuxueli0323/xxl-job
 	4、人人聚财金服;
 	5、……
 	更多接入公司,欢迎在https://github.com/xuxueli/xxl-job/issues/1 登记。
+	

+ 12 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java

@@ -22,6 +22,7 @@ public class XxlJobLogCallbackServer {
 		return trigger_log_address;
 	}
     
+    Server server = null;
     public void start(int callBackPort) throws Exception {
     	// init address
     	
@@ -32,7 +33,7 @@ public class XxlJobLogCallbackServer {
         new Thread(new Runnable() {
             @Override
             public void run() {
-                Server server = new Server();
+                server = new Server();
                 server.setThreadPool(new ExecutorThreadPool(200, 200, 30000));	// 非阻塞
 
                 // connector
@@ -59,4 +60,14 @@ public class XxlJobLogCallbackServer {
 
     }
 
+	public void destroy() {
+		if (server!=null) {
+			try {
+				server.stop();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
 }

+ 10 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java

@@ -55,15 +55,24 @@ public final class DynamicSchedulerUtil implements ApplicationContextAware, Init
 	}
     
     // init
+    XxlJobLogCallbackServer xxlJobLogCallbackServer = null;
     public void init(){
     	try {
     		// start callback server
-			new XxlJobLogCallbackServer().start(callBackPort);
+    		xxlJobLogCallbackServer = new XxlJobLogCallbackServer();
+    		xxlJobLogCallbackServer.start(callBackPort);
 		} catch (Exception e) {
 			e.printStackTrace();
 		}
     }
     
+    // destroy
+    public void destroy(){
+    	if (xxlJobLogCallbackServer!=null) {
+    		xxlJobLogCallbackServer.destroy();
+		}
+    }
+    
     // xxlJobLogDao、xxlJobInfoDao
     public static IXxlJobLogDao xxlJobLogDao;
     public static IXxlJobInfoDao xxlJobInfoDao;

+ 1 - 1
xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml

@@ -18,7 +18,7 @@
 	</bean>
 	
 	<!-- 协同-调度器 -->
-	<bean id="dynamicSchedulerUtil" class="com.xxl.job.admin.core.util.DynamicSchedulerUtil" init-method="init">
+	<bean id="dynamicSchedulerUtil" class="com.xxl.job.admin.core.util.DynamicSchedulerUtil" init-method="init" destroy-method="destroy" >
 		<!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) -->
         <property name="scheduler" ref="quartzScheduler"/>
         <property name="callBackPort" value="8888"/>

+ 14 - 1
xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java

@@ -29,12 +29,14 @@ public class XxlJobExecutor implements ApplicationContextAware {
         this.port = port;
     }
 
+    // ---------------------------------- job server ------------------------------------
+    Server server = null;
     public void start() throws Exception {
 
         new Thread(new Runnable() {
             @Override
             public void run() {
-                Server server = new Server();
+                server = new Server();
                 server.setThreadPool(new ExecutorThreadPool(200, 200, 30000));	// 非阻塞
 
                 // connector
@@ -60,7 +62,18 @@ public class XxlJobExecutor implements ApplicationContextAware {
         }).start();
 
     }
+    
+    public void destroy(){
+    	if (server!=null) {
+    		try {
+				server.stop();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+    }
 
+    // ---------------------------------- init job handler ------------------------------------
     public static ApplicationContext applicationContext;
 	@Override
 	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

+ 22 - 0
xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java

@@ -96,6 +96,12 @@ public class HandlerThread extends Thread{
 						params.put("status", _status.name());
 						params.put("msg", _msg);
 						HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params);
+					} else {
+						HashMap<String, String> params = new HashMap<String, String>();
+						params.put("log_id", log_id);
+						params.put("status", JobHandleStatus.FAIL.name());
+						params.put("msg", "人工手动终止[业务运行中,被强制终止]");
+						HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params);
 					}
 				} else {
 					i++;
@@ -113,6 +119,22 @@ public class HandlerThread extends Thread{
 				logger.info("HandlerThread Exception:", e);
 			}
 		}
+		
+		// callback trigger request in queue
+		while(handlerDataQueue!=null && handlerDataQueue.size()>0){
+			Map<String, String> handlerData = handlerDataQueue.poll();
+			if (handlerData!=null) {
+				String log_address = handlerData.get(HandlerParamEnum.LOG_ADDRESS.name());
+				String log_id = handlerData.get(HandlerParamEnum.LOG_ID.name());
+				
+				HashMap<String, String> params = new HashMap<String, String>();
+				params.put("log_id", log_id);
+				params.put("status", JobHandleStatus.FAIL.name());
+				params.put("msg", "人工手动终止[任务尚未执行,在调度队列中被终止]");
+				HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params);
+			}
+		}
+		
 		logger.info(">>>>>>>>>>>> xxl-job handlerThrad stoped, hashCode:{}", Thread.currentThread());
 	}
 }

+ 1 - 1
xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml

@@ -12,7 +12,7 @@
 	<context:component-scan base-package="com.xxl.job.executor" />
 
 	<!-- 执行器 -->
-	<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.jetty.XxlJobExecutor" init-method="start">
+	<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.jetty.XxlJobExecutor" init-method="start" destroy-method="destroy" >
 		<property name="port" value="9999" />
 	</bean>