Browse Source

调度中心注册:
1、调度中心自动注册;
2、调度时,加载在线的所有调度中心地址,push给执行器,执行器获取多个执行器地址,failover方式进行回调;

xueli.xue 8 years ago
parent
commit
caf817124e

+ 11 - 3
xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java

@@ -30,7 +30,7 @@ import java.util.*;
 //@DisallowConcurrentExecution
 public class RemoteHttpJobBean extends QuartzJobBean {
 	private static Logger logger = LoggerFactory.getLogger(RemoteHttpJobBean.class);
-	
+
 	@Override
 	protected void executeInternal(JobExecutionContext context)
 			throws JobExecutionException {
@@ -43,7 +43,15 @@ public class RemoteHttpJobBean extends QuartzJobBean {
 		jobLog.setJobName(jobInfo.getJobName());
 		DynamicSchedulerUtil.xxlJobLogDao.save(jobLog);
 		logger.info(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
-		
+
+        // admin address
+        List<String> adminAddressList = JobRegistryHelper.discover(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name());
+		Set<String> adminAddressSet = new HashSet<String>();
+        if (adminAddressList!=null) {
+            adminAddressSet.addAll(adminAddressList);
+        }
+        adminAddressSet.add(XxlJobLogCallbackServer.getTrigger_log_address());
+
 		// trigger request
 		RequestModel requestModel = new RequestModel();
 		requestModel.setTimestamp(System.currentTimeMillis());
@@ -53,7 +61,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
 		requestModel.setExecutorHandler(jobInfo.getExecutorHandler());
 		requestModel.setExecutorParams(jobInfo.getExecutorParam());
 		requestModel.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true);
-		requestModel.setLogAddress(XxlJobLogCallbackServer.getTrigger_log_address());
+		requestModel.setLogAddress(adminAddressSet);
 		requestModel.setLogId(jobLog.getId());
 
 		// parse address

+ 7 - 5
xxl-job-core/src/main/java/com/xxl/job/core/router/model/RequestModel.java

@@ -1,5 +1,7 @@
 package com.xxl.job.core.router.model;
 
+import java.util.Set;
+
 /**
  * Created by xuxueli on 16/7/22.
  */
@@ -16,13 +18,14 @@ public class RequestModel {
 
     private boolean glueSwitch;
 
-    private String logAddress;
+    private Set<String> logAddress;
     private int logId;
     private long logDateTim;
 
     private String status;
     private String msg;
 
+
     public long getTimestamp() {
         return timestamp;
     }
@@ -79,11 +82,11 @@ public class RequestModel {
         this.glueSwitch = glueSwitch;
     }
 
-    public String getLogAddress() {
+    public Set<String> getLogAddress() {
         return logAddress;
     }
 
-    public void setLogAddress(String logAddress) {
+    public void setLogAddress(Set<String> logAddress) {
         this.logAddress = logAddress;
     }
 
@@ -129,12 +132,11 @@ public class RequestModel {
                 ", executorHandler='" + executorHandler + '\'' +
                 ", executorParams='" + executorParams + '\'' +
                 ", glueSwitch=" + glueSwitch +
-                ", logAddress='" + logAddress + '\'' +
+                ", logAddress=" + logAddress +
                 ", logId=" + logId +
                 ", logDateTim=" + logDateTim +
                 ", status='" + status + '\'' +
                 ", msg='" + msg + '\'' +
                 '}';
     }
-
 }

+ 10 - 5
xxl-job-core/src/main/java/com/xxl/job/core/router/thread/TriggerCallbackThread.java

@@ -23,11 +23,16 @@ public class TriggerCallbackThread {
                     try {
                         RequestModel callback = callBackQueue.take();
                         if (callback != null) {
-                            try {
-                                ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(callback.getLogAddress()), callback);
-                                logger.info(">>>>>>>>>>> xxl-job callback , RequestModel:{}, ResponseModel:{}", new Object[]{callback.toString(), responseModel.toString()});
-                            } catch (Exception e) {
-                                logger.info("JobThread Exception:", e);
+                            for (String address : callback.getLogAddress()) {
+                                try {
+                                    ResponseModel responseModel = XxlJobNetCommUtil.postHex(XxlJobNetCommUtil.addressToUrl(address), callback);
+                                    logger.info(">>>>>>>>>>> xxl-job callback , RequestModel:{}, ResponseModel:{}", new Object[]{callback.toString(), responseModel.toString()});
+                                    if (ResponseModel.SUCCESS.equals(responseModel.getStatus())) {
+                                        break;
+                                    }
+                                } catch (Exception e) {
+                                    logger.info("JobThread Exception:", e);
+                                }
                             }
                         }
                     } catch (Exception e) {