Browse Source

任务注册功能:
1、设计注册表结构;
2、执行器,开发注册线程,15s注册一次,以 "执行器 + 地址" 为粒度进行注册刷新";

xueli.xue 8 years ago
parent
commit
1fd922f95b

+ 8 - 1
db/tables_xxl_job.sql

@@ -193,7 +193,14 @@ CREATE TABLE XXL_JOB_QRTZ_TRIGGER_LOGGLUE (
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
-
+CREATE TABLE XXL_JOB_QRTZ_TRIGGER_REGISTRY (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `registry_group` varchar(255) NOT NULL,
+  `registry_key` varchar(255) NOT NULL,
+  `registry_value` varchar(255) NOT NULL,
+  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
 commit;
 

+ 40 - 5
xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java

@@ -1,8 +1,10 @@
 package com.xxl.job.core.executor.jetty;
 
-import com.xxl.job.core.router.HandlerRouter;
 import com.xxl.job.core.handler.IJobHandler;
 import com.xxl.job.core.handler.annotation.JobHander;
+import com.xxl.job.core.registry.RegistHelper;
+import com.xxl.job.core.router.HandlerRouter;
+import com.xxl.job.core.util.IpUtil;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
@@ -16,6 +18,7 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created by xuxueli on 2016/3/2 21:14.
@@ -24,15 +27,23 @@ public class XxlJobExecutor implements ApplicationContextAware {
     private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
 
     private int port = 9999;
+    private String appName;
+    private RegistHelper registHelper;
     public void setPort(int port) {
         this.port = port;
     }
+    public void setAppName(String appName) {
+        this.appName = appName;
+    }
+    public void setRegistHelper(RegistHelper registHelper) {
+        this.registHelper = registHelper;
+    }
 
     // ---------------------------------- job server ------------------------------------
     Server server = null;
     public void start() throws Exception {
 
-        new Thread(new Runnable() {
+        Thread executorTnread = new Thread(new Runnable() {
             @Override
             public void run() {
                 server = new Server();
@@ -52,14 +63,16 @@ public class XxlJobExecutor implements ApplicationContextAware {
                 try {
                     server.start();
                     logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);
-                    server.join();  // block until server ready
+                    registryBeat();
+                    server.join();  // block until thread stopped
                     logger.info(">>>>>>>>>>>> xxl-job jetty server join success at port:{}.", port);
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
-        }).start();
-
+        });
+        executorTnread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
+        executorTnread.start();
     }
     
     public void destroy(){
@@ -72,6 +85,28 @@ public class XxlJobExecutor implements ApplicationContextAware {
 		}
     }
 
+    private void registryBeat(){
+        if (registHelper==null && appName==null || appName.trim().length()==0) {
+            return;
+        }
+        Thread registryThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    try {
+                        String address = IpUtil.getIp().concat(":").concat(String.valueOf(port));
+                        registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
+                        TimeUnit.SECONDS.sleep(15);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        });
+        registryThread.setDaemon(true);
+        registryThread.start();
+    }
+
     // ---------------------------------- init job handler ------------------------------------
     public static ApplicationContext applicationContext;
 	@Override

+ 12 - 0
xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java

@@ -0,0 +1,12 @@
+package com.xxl.job.core.registry;
+
+/**
+ * Created by xuxueli on 16/9/30.
+ */
+public interface RegistHelper {
+
+    public enum RegistType{ EXECUTOR, ADMIN }
+
+    public int registry(String registGroup, String registryKey, String registryValue);
+
+}

+ 28 - 0
xxl-job-core/src/main/java/com/xxl/job/core/registry/impl/DbRegistHelper.java

@@ -0,0 +1,28 @@
+package com.xxl.job.core.registry.impl;
+
+import com.xxl.job.core.registry.RegistHelper;
+import com.xxl.job.core.util.DBUtil;
+
+import javax.sql.DataSource;
+
+/**
+ * Created by xuxueli on 16/9/30.
+ */
+public class DbRegistHelper implements RegistHelper {
+
+    private DataSource dataSource;
+    public void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    @Override
+    public int registry(String registGroup, String registryKey, String registryValue) {
+        String updateSql = "UPDATE XXL_JOB_QRTZ_TRIGGER_REGISTRY SET `update_time` = NOW() WHERE `registry_group` = ? AND `registry_key` = ? AND `registry_value` = ?";
+        String insertSql = "INSERT INTO XXL_JOB_QRTZ_TRIGGER_REGISTRY( `registry_group` , `registry_key` , `registry_value`, `update_time`) VALUES(? , ? , ?, NOW())";
+        int ret = DBUtil.update(dataSource, updateSql, new Object[]{registGroup, registryKey, registryValue});
+        if (ret<1) {
+            ret = DBUtil.update(dataSource, insertSql, new Object[]{registGroup, registryKey, registryValue});
+        }
+        return ret;
+    }
+}

+ 10 - 1
xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml

@@ -16,6 +16,15 @@
 	<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.jetty.XxlJobExecutor" init-method="start" destroy-method="destroy" >
 		<!-- 执行器端口号 -->
 		<property name="port" value="9999" />
+        <property name="appName" value="xxl-job-executor-example" />
+        <!-- 执行器注册器,默认使用系统提供的 "DbRegistHelper", 推荐将其改为公共的RPC服务 -->
+        <property name="registHelper" >
+            <!-- DbRegistHelper, 依赖 "XXL-JOB公共数据源" -->
+            <bean class="com.xxl.job.core.registry.impl.DbRegistHelper" >
+                <!-- XXL-JOB公共数据源 -->
+                <property name="dataSource" ref="dataSource" />
+            </bean>
+        </property>
 	</bean>
 
     <!-- ********************************* "GlueFactory" 配置, 仅在启动 "GLUE模式任务" 时才需要, 否则可删除 ********************************* -->
@@ -34,7 +43,7 @@
 		</property>
 	</bean>
 
-    <!-- ********************************* "XXL-JOB公共数据源" 配置, 仅在启动 "GLUE模式任务" 的 "DbGlueLoader" 时才需要, 否则可删除 ********************************* -->
+    <!-- ********************************* "XXL-JOB公共数据源" 配置, 仅在启动 "DbRegistHelper" 或 "DbGlueLoader" 时才需要, 否则可删除 ********************************* -->
 
 	<!-- 配置04、XXL-JOB公共数据源 -->
 	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"  destroy-method="close">