Browse Source

重构通讯逻辑

xueli.xue 9 years ago
parent
commit
c1363c7567

+ 1 - 1
README.md

@@ -49,7 +49,7 @@ git.osc地址:http://git.oschina.net/xuxueli0323/xxl-job
 
 # 新版本 V1.3.x,新特性
 	1、遗弃“本地任务”模式,推荐使用“远程任务”,易于系统解耦,任务对应的JobHander统称为“执行器”;
-	2、遗弃“servlet”方式底层系统通讯,推荐使用JETTY方式,重构通讯逻辑;
+	2、遗弃“servlet”方式底层系统通讯,推荐使用JETTY方式,调度+回调双向通讯,重构通讯逻辑;
 	3、UI交互优化:左侧菜单展开状态优化,菜单项选中状态优化,任务列表打开表格有压缩优化;
 	4、【重要】“执行器”细分为:BEAN、GLUE两种开发模式,简介见下文:
 	

+ 2 - 3
xxl-job-admin/src/main/java/com/xxl/job/controller/JobLogController.java

@@ -19,7 +19,6 @@ import org.springframework.web.bind.annotation.ResponseBody;
 import com.xxl.job.client.handler.HandlerRepository;
 import com.xxl.job.client.util.HttpUtil;
 import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
-import com.xxl.job.controller.annotation.PermessionLimit;
 import com.xxl.job.client.util.JacksonUtil;
 import com.xxl.job.core.constant.Constants.JobGroupEnum;
 import com.xxl.job.core.model.ReturnT;
@@ -76,7 +75,7 @@ public class JobLogController {
 		return maps;
 	}
 	
-	@RequestMapping("/save")
+	/*@RequestMapping("/save")
 	@ResponseBody
 	@PermessionLimit(limit=false)
 	public RemoteCallBack triggerLog(int trigger_log_id, String status, String msg) {
@@ -92,7 +91,7 @@ public class JobLogController {
 			return callBack;
 		}
 		return callBack;
-	}
+	}*/
 	
 	@RequestMapping("/logDetail")
 	@ResponseBody

+ 62 - 0
xxl-job-admin/src/main/java/com/xxl/job/core/callback/XxlJobCallbackServer.java

@@ -0,0 +1,62 @@
+package com.xxl.job.core.callback;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.xxl.job.client.util.IpUtil;
+
+/**
+ * Created by xuxueli on 2016-5-22 11:15:42
+ */
+public class XxlJobCallbackServer {
+    private static final Logger logger = LoggerFactory.getLogger(XxlJobCallbackServer.class);
+
+    private static String trigger_log_address;
+    public static String getTrigger_log_address() {
+		return trigger_log_address;
+	}
+    
+    public void start(int callBackPort) throws Exception {
+    	// init address
+    	
+    	String ip = IpUtil.getIp();
+    	trigger_log_address = ip.concat(":").concat(String.valueOf(callBackPort));
+		
+    	final int port = Integer.valueOf(callBackPort);
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                Server server = new Server();
+                server.setThreadPool(new ExecutorThreadPool(200, 200, 30000));	// 非阻塞
+
+                // connector
+                SelectChannelConnector connector = new SelectChannelConnector();
+                connector.setPort(port);
+                connector.setMaxIdleTime(30000);
+                server.setConnectors(new Connector[] { connector });
+
+                // handler
+                HandlerCollection handlerc =new HandlerCollection();
+                handlerc.setHandlers(new Handler[]{new XxlJobCallbackServerHandler()});
+                server.setHandler(handlerc);
+
+                try {
+                    server.start();
+                    logger.info(">>>>>>>>>>>> xxl-job XxlJobCallbackServer start success at port:{}.", port);
+                    server.join();  // block until server ready
+                    logger.info(">>>>>>>>>>>> xxl-job XxlJobCallbackServer join success at port:{}.", port);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+
+    }
+
+}

+ 57 - 0
xxl-job-admin/src/main/java/com/xxl/job/core/callback/XxlJobCallbackServerHandler.java

@@ -0,0 +1,57 @@
+package com.xxl.job.core.callback;
+
+import java.io.IOException;
+import java.util.Date;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
+import com.xxl.job.client.util.JacksonUtil;
+import com.xxl.job.core.model.XxlJobLog;
+import com.xxl.job.core.util.DynamicSchedulerUtil;
+
+/**
+ * Created by xuxueli on 2016-5-22 11:15:42
+ */
+public class XxlJobCallbackServerHandler extends AbstractHandler {
+
+	@Override
+	public void handle(String s, Request baseRequest, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
+
+		httpServletRequest.setCharacterEncoding("UTF-8");
+		httpServletResponse.setCharacterEncoding("UTF-8");
+
+		// parse param
+		String trigger_log_id = httpServletRequest.getParameter("trigger_log_id");
+		String status = httpServletRequest.getParameter("status");
+		String msg = httpServletRequest.getParameter("msg");
+		
+		// process
+		RemoteCallBack callBack = new RemoteCallBack();
+		callBack.setStatus(RemoteCallBack.FAIL);
+		if (StringUtils.isNumeric(trigger_log_id) && StringUtils.isNotBlank(status)) {
+			XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(Integer.valueOf(trigger_log_id));
+			if (log!=null) {
+				log.setHandleTime(new Date());
+				log.setHandleStatus(status);
+				log.setHandleMsg(msg);
+				DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
+				callBack.setStatus(RemoteCallBack.SUCCESS);
+			}
+		}
+		String resp = JacksonUtil.writeValueAsString(callBack);
+
+		// response
+		httpServletResponse.setContentType("text/html;charset=utf-8");
+		httpServletResponse.setStatus(HttpServletResponse.SC_OK);
+		baseRequest.setHandled(true);
+		httpServletResponse.getWriter().println(resp);
+	}
+
+}

+ 17 - 0
xxl-job-admin/src/main/java/com/xxl/job/core/util/DynamicSchedulerUtil.java

@@ -32,6 +32,7 @@ import org.springframework.context.ApplicationContextAware;
 import org.springframework.util.Assert;
 
 import com.xxl.job.client.util.JacksonUtil;
+import com.xxl.job.core.callback.XxlJobCallbackServer;
 import com.xxl.job.core.model.XxlJobInfo;
 import com.xxl.job.dao.IXxlJobInfoDao;
 import com.xxl.job.dao.IXxlJobLogDao;
@@ -49,6 +50,22 @@ public final class DynamicSchedulerUtil implements ApplicationContextAware, Init
 		DynamicSchedulerUtil.scheduler = scheduler;
 	}
     
+    // trigger callback port
+    private int callBackPort = 8888;
+    public void setCallBackPort(int callBackPort) {
+		this.callBackPort = callBackPort;
+	}
+    
+    // init
+    public void init(){
+    	try {
+    		// start callback server
+			new XxlJobCallbackServer().start(callBackPort);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+    }
+    
     // xxlJobLogDao、xxlJobInfoDao
     public static IXxlJobLogDao xxlJobLogDao;
     public static IXxlJobInfoDao xxlJobInfoDao;

+ 3 - 6
xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java

@@ -16,11 +16,11 @@ import com.xxl.job.client.handler.HandlerRepository;
 import com.xxl.job.client.util.HttpUtil;
 import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
 import com.xxl.job.client.util.JacksonUtil;
+import com.xxl.job.core.callback.XxlJobCallbackServer;
 import com.xxl.job.core.model.XxlJobInfo;
 import com.xxl.job.core.model.XxlJobLog;
 import com.xxl.job.core.thread.JobMonitorHelper;
 import com.xxl.job.core.util.DynamicSchedulerUtil;
-import com.xxl.job.core.util.PropertiesUtil;
 
 /**
  * http job bean
@@ -58,8 +58,8 @@ public class RemoteHttpJobBean extends QuartzJobBean {
 		params.put(HandlerRepository.TRIGGER_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
 		params.put(HandlerRepository.NAMESPACE, HandlerRepository.NameSpaceEnum.RUN.name());
 		
-		params.put(HandlerRepository.TRIGGER_LOG_URL, PropertiesUtil.getString(HandlerRepository.TRIGGER_LOG_URL));
 		params.put(HandlerRepository.TRIGGER_LOG_ID, String.valueOf(jobLog.getId()));
+		params.put(HandlerRepository.TRIGGER_LOG_ADDRESS, XxlJobCallbackServer.getTrigger_log_address());
 		
 		params.put(HandlerRepository.HANDLER_NAME, jobDataMap.get(HandlerRepository.HANDLER_NAME));
 		params.put(HandlerRepository.HANDLER_PARAMS, jobDataMap.get(HandlerRepository.HANDLER_PARAMS));
@@ -71,11 +71,8 @@ public class RemoteHttpJobBean extends QuartzJobBean {
 
 		// handler address, jetty (servlet dead)
 		String handler_address = jobDataMap.get(HandlerRepository.HANDLER_ADDRESS);
-		if (!handler_address.startsWith("http")){
-			handler_address = "http://" + handler_address + "/";
-		}
 
-		RemoteCallBack callback = HttpUtil.post(handler_address, params);
+		RemoteCallBack callback = HttpUtil.post(HttpUtil.addressToUrl(handler_address), params);
 		logger.info(">>>>>>>>>>> xxl-job trigger http response, jobLog.id:{}, jobLog:{}, callback:{}", jobLog.getId(), jobLog, callback);
 
 		// update trigger info

+ 2 - 1
xxl-job-admin/src/main/resources/applicationcontext-trigger-db.xml

@@ -18,9 +18,10 @@
 	</bean>
 	
 	<!-- 协同-调度器 -->
-	<bean id="dynamicSchedulerUtil" class="com.xxl.job.core.util.DynamicSchedulerUtil">
+	<bean id="dynamicSchedulerUtil" class="com.xxl.job.core.util.DynamicSchedulerUtil" init-method="init">
 		<!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) -->
         <property name="scheduler" ref="quartzScheduler"/>
+        <property name="callBackPort" value="8888"/>
     </bean>
 
 </beans>

+ 0 - 3
xxl-job-admin/src/main/resources/config.properties

@@ -1,6 +1,3 @@
-# for trigger log callback
-trigger_log_url=http://localhost:8080/xxl-job-admin/joblog/save
-
 # for email
 mail.host=smtp.163.com
 mail.port=25

+ 1 - 1
xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java

@@ -31,7 +31,7 @@ public class HandlerRepository {
 	public static final String HANDLER_JOB_NAME = "handler_job_name";
 	
 	public static final String TRIGGER_LOG_ID = "trigger_log_id";
-	public static final String TRIGGER_LOG_URL = "trigger_log_url";
+	public static final String TRIGGER_LOG_ADDRESS = "trigger_log_address";
 	public static final String TRIGGER_TIMESTAMP = "trigger_timestamp";
 	
 	public static ConcurrentHashMap<String, HandlerThread> handlerTreadMap = new ConcurrentHashMap<String, HandlerThread>();

+ 2 - 2
xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java

@@ -60,7 +60,7 @@ public class HandlerThread extends Thread{
 				Map<String, String> handlerData = handlerDataQueue.poll();
 				if (handlerData!=null) {
 					i= 0;
-					String trigger_log_url = handlerData.get(HandlerRepository.TRIGGER_LOG_URL);
+					String trigger_log_address = handlerData.get(HandlerRepository.TRIGGER_LOG_ADDRESS);
 					String trigger_log_id = handlerData.get(HandlerRepository.TRIGGER_LOG_ID);
 					String handler_params = handlerData.get(HandlerRepository.HANDLER_PARAMS);
 					logIdSet.remove(trigger_log_id);
@@ -97,7 +97,7 @@ public class HandlerThread extends Thread{
 					RemoteCallBack callback = null;
 					logger.info(">>>>>>>>>>> xxl-job callback start.");
 					try {
-						callback = HttpUtil.post(trigger_log_url, params);
+						callback = HttpUtil.post(HttpUtil.addressToUrl(trigger_log_address), params);
 					} catch (Exception e) {
 						logger.info("HandlerThread Exception:", e);
 					}

+ 10 - 0
xxl-job-client/src/main/java/com/xxl/job/client/util/HttpUtil.java

@@ -116,4 +116,14 @@ public class HttpUtil {
 		
 		return callback;
 	}
+	
+	/**
+	 * parse address ip:port to url http://.../ 
+	 * @param address
+	 * @return
+	 */
+	public static String addressToUrl(String address){
+		return "http://" + address + "/";
+	}
+	
 }

+ 50 - 0
xxl-job-client/src/main/java/com/xxl/job/client/util/IpUtil.java

@@ -0,0 +1,50 @@
+package com.xxl.job.client.util;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.UnknownHostException;
+import java.util.Enumeration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * get ip
+ * @author xuxueli 2016-5-22 11:38:05
+ */
+public class IpUtil {
+	private static final Logger logger = LoggerFactory.getLogger(IpUtil.class);
+
+	/**
+	 * 获取本机ip
+	 * @return
+	 */
+	public static String getIp() {
+		try {
+			Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+			InetAddress address = null;
+			while (interfaces.hasMoreElements()) {
+				NetworkInterface ni = interfaces.nextElement();
+				Enumeration<InetAddress> addresses = ni.getInetAddresses();
+				while (addresses.hasMoreElements()) {
+					address = addresses.nextElement();
+					if (!address.isLoopbackAddress() && address.getHostAddress().indexOf(":") == -1) {
+						return address.getHostAddress();
+					}
+				}
+			}
+			logger.info("xxl job getHostAddress fail");
+			return null;
+		} catch (Throwable t) {
+			logger.error("xxl job getHostAddress error, {}", t);
+			return null;
+		}
+	}
+
+	public static void main(String[] args) throws UnknownHostException {
+		System.out.println(InetAddress.getLocalHost().getCanonicalHostName());
+		System.out.println(InetAddress.getLocalHost().getHostName());
+		System.out.println(getIp());
+	}
+
+}