Kaynağa Gözat

远程调度增强校验:1、调度中心与执行器时间偏移超过60秒拒绝任务执行;2、执行器校验调度uuid避免一次调度重复执行;

xueli.xue 9 yıl önce
ebeveyn
işleme
ee9eca0db7

+ 1 - 0
xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java

@@ -57,6 +57,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
 		HashMap<String, String> params = new HashMap<String, String>();
 		params.put(HandlerRepository.TRIGGER_LOG_URL, PropertiesUtil.getString(HandlerRepository.TRIGGER_LOG_URL));
 		params.put(HandlerRepository.TRIGGER_LOG_ID, String.valueOf(jobLog.getId()));
+		params.put(HandlerRepository.TRIGGER_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
 		params.put(HandlerRepository.HANDLER_NAME, jobDataMap.get(HandlerRepository.HANDLER_NAME));
 		params.put(HandlerRepository.HANDLER_PARAMS, jobDataMap.get(HandlerRepository.HANDLER_PARAMS));
 

+ 8 - 0
xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java

@@ -22,6 +22,7 @@ public class HandlerRepository {
 	
 	public static final String TRIGGER_LOG_ID = "trigger_log_id";
 	public static final String TRIGGER_LOG_URL = "trigger_log_url";
+	public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
 	
 	public static ConcurrentHashMap<String, HandlerThread> handlerTreadMap = new ConcurrentHashMap<String, HandlerThread>();
 	
@@ -41,6 +42,13 @@ public class HandlerRepository {
 		RemoteCallBack callback = new RemoteCallBack();
 		callback.setStatus(RemoteCallBack.FAIL);
 		
+		// encryption check
+		long timestamp = _param.get(HandlerRepository.TRIGGER_TIMESTAMP)!=null?Long.valueOf(_param.get(HandlerRepository.TRIGGER_TIMESTAMP)):-1;
+		if (System.currentTimeMillis() - timestamp > 60000) {
+			callback.setMsg("Timestamp check failed.");
+			return JacksonUtil.writeValueAsString(callback);
+		}
+				
 		// push data to queue
 		String handler_name = _param.get(HandlerRepository.HANDLER_NAME);
 		if (handler_name!=null && handler_name.trim().length()>0) {

+ 11 - 4
xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java

@@ -7,6 +7,7 @@ import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,14 +24,18 @@ public class HandlerThread extends Thread{
 	
 	private IJobHandler handler;
 	private LinkedBlockingQueue<Map<String, String>> handlerDataQueue;
+	private ConcurrentHashSet<String> logIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_ID
 	
 	public HandlerThread(IJobHandler handler) {
 		this.handler = handler;
 		handlerDataQueue = new LinkedBlockingQueue<Map<String,String>>();
+		logIdSet = new ConcurrentHashSet<String>();
 	}
 	
 	public void pushData(Map<String, String> param) {
-		handlerDataQueue.offer(param);
+		if (param.get(HandlerRepository.TRIGGER_LOG_ID)!=null && !logIdSet.contains(param.get(HandlerRepository.TRIGGER_LOG_ID))) {
+			handlerDataQueue.offer(param);
+		}
 	}
 	
 	int i = 1;
@@ -38,12 +43,13 @@ public class HandlerThread extends Thread{
 	public void run() {
 		while(true){
 			try {
-				i++;
 				Map<String, String> handlerData = handlerDataQueue.poll();
 				if (handlerData!=null) {
+					i= 0;
 					String trigger_log_url = handlerData.get(HandlerRepository.TRIGGER_LOG_URL);
 					String trigger_log_id = handlerData.get(HandlerRepository.TRIGGER_LOG_ID);
 					String handler_params = handlerData.get(HandlerRepository.HANDLER_PARAMS);
+					logIdSet.remove(trigger_log_id);
 					
 					// parse param
 					String[] handlerParams = null; 
@@ -68,9 +74,8 @@ public class HandlerThread extends Thread{
 					// callback handler info
 					RemoteCallBack callback = null;
 					try {
-						
 						HashMap<String, String> params = new HashMap<String, String>();
-						params.put(HandlerRepository.TRIGGER_LOG_ID, trigger_log_id);
+						params.put("trigger_log_id", trigger_log_id);
 						params.put("status", _status.name());
 						params.put("msg", _msg);
 						callback = HttpUtil.post(trigger_log_url, params);
@@ -80,6 +85,8 @@ public class HandlerThread extends Thread{
 					logger.info("<<<<<<<<<<< xxl-job thread handle, handlerData:{}, callback_status:{}, callback_msg:{}, callback:{}, thread:{}", 
 							new Object[]{handlerData, _status, _msg, callback, this});
 				} else {
+					i++;
+					logIdSet.clear();
 					try {
 						TimeUnit.MILLISECONDS.sleep(i * 100);
 					} catch (InterruptedException e) {