Ver Fonte

任务告警逻辑调整,改为通过扫描失败日志方式触发。一方面精确扫描失败任务,降低扫描范围;另一方面取消内存队列,降低线程内存消耗;

xuxueli há 6 anos atrás
pai
commit
1aa17c9f9a

+ 1 - 1
doc/XXL-JOB官方文档.md

@@ -1389,7 +1389,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
 
 ### 6.24 版本 v2.0.2 Release Notes[迭代中]
 - 1、调度中心告警邮件发送组件改为 “spring-boot-starter-mail”;
-- 2、[迭代中]任务告警逻辑调整:任务调度,以及任务回调失败时,均推送监控队列。考虑通过任务Log字段控制告警状态
+- 2、任务告警逻辑调整,改为通过扫描失败日志方式触发。一方面精确扫描失败任务,降低扫描范围;另一方面取消内存队列,降低线程内存消耗
 
 
 ### TODO LIST

+ 1 - 0
doc/db/tables_xxl_job.sql

@@ -187,6 +187,7 @@ CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOG` (
   `handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
   `handle_code` int(11) NOT NULL COMMENT '执行-状态',
   `handle_msg` text COMMENT '执行-日志',
+  `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
   PRIMARY KEY (`id`),
   KEY `I_trigger_time` (`trigger_time`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

+ 12 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobLog.java

@@ -31,6 +31,9 @@ public class XxlJobLog {
 	private int handleCode;
 	private String handleMsg;
 
+	// alarm info
+	private int alarmStatus;
+
 	public int getId() {
 		return id;
 	}
@@ -142,4 +145,13 @@ public class XxlJobLog {
 	public void setHandleMsg(String handleMsg) {
 		this.handleMsg = handleMsg;
 	}
+
+	public int getAlarmStatus() {
+		return alarmStatus;
+	}
+
+	public void setAlarmStatus(int alarmStatus) {
+		this.alarmStatus = alarmStatus;
+	}
+
 }

+ 44 - 65
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java

@@ -7,7 +7,6 @@ import com.xxl.job.admin.core.model.XxlJobLog;
 import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
 import com.xxl.job.admin.core.util.I18nUtil;
 import com.xxl.job.core.biz.model.ReturnT;
-import com.xxl.job.core.handler.IJobHandler;
 import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,12 +16,15 @@ import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 import java.io.UnsupportedEncodingException;
 import java.text.MessageFormat;
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
  * job monitor instance
+ *
  * @author xuxueli 2015-9-1 18:05:56
  */
 public class JobFailMonitorHelper {
@@ -35,8 +37,6 @@ public class JobFailMonitorHelper {
 
 	// ---------------------- monitor ----------------------
 
-	private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(0xfff8);
-
 	private Thread monitorThread;
 	private volatile boolean toStop = false;
 	public void start(){
@@ -44,52 +44,47 @@ public class JobFailMonitorHelper {
 
 			@Override
 			public void run() {
+
 				// monitor
 				while (!toStop) {
 					try {
-						List<Integer> jobLogIdList = new ArrayList<Integer>();
-						int drainToNum = JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);
 
-						if (CollectionUtils.isNotEmpty(jobLogIdList)) {
-							for (Integer jobLogId : jobLogIdList) {
-								if (jobLogId==null || jobLogId==0) {
+						List<Integer> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
+						if (CollectionUtils.isNotEmpty(failLogIds)) {
+							for (int failLogId: failLogIds) {
+
+								// lock log
+								int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
+								if (lockRet < 1) {
 									continue;
 								}
-								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(jobLogId);
-								if (log == null) {
-									continue;
+								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
+								XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
+
+								// 1、fail retry monitor
+								if (log.getExecutorFailRetryCount() > 0) {
+									JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), null);
+									String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
+									log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
+									XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
 								}
-								if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
-									// job running
-									JobFailMonitorHelper.monitor(jobLogId);
-									logger.debug(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
-								} else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {
-									// job success, pass
-									logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
-								} else /*if (IJobHandler.FAIL.getCode() == log.getTriggerCode()
-										|| IJobHandler.FAIL.getCode() == log.getHandleCode()
-										|| IJobHandler.FAIL_RETRY.getCode() == log.getHandleCode() )*/ {
-
-									// job fail,
-
-									// 1、fail retry
-									XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
-
-									if (log.getExecutorFailRetryCount() > 0) {
-										JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), null);
-										String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
-										log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
-										XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
-									}
 
-									// 2、fail alarm
-									failAlarm(info, log);
+								// 2、fail alarm monitor
+								int newAlarmStatus = 0;		// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
+								if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
+									boolean alarmResult = true;
+									try {
+										alarmResult = failAlarm(info, log);
+									} catch (Exception e) {
+										alarmResult = false;
+										logger.error(e.getMessage(), e);
+									}
+									newAlarmStatus = alarmResult?2:3;
+								} else {
+									newAlarmStatus = 1;
+								}
 
-									logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
-								}/* else {
-									JobFailMonitorHelper.monitor(jobLogId);
-									logger.info(">>>>>>>>>>> job monitor, job status unknown, JobLogId:{}", jobLogId);
-								}*/
+								XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
 							}
 						}
 
@@ -99,22 +94,6 @@ public class JobFailMonitorHelper {
 					}
 				}
 
-				// monitor all clear
-				List<Integer> jobLogIdList = new ArrayList<Integer>();
-				int drainToNum = getInstance().queue.drainTo(jobLogIdList);
-				if (jobLogIdList!=null && jobLogIdList.size()>0) {
-					for (Integer jobLogId: jobLogIdList) {
-						XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(jobLogId);
-						if (ReturnT.FAIL_CODE == log.getTriggerCode()|| ReturnT.FAIL_CODE==log.getHandleCode()) {
-							// job fail,
-							XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
-
-							failAlarm(info, log);
-							logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId);
-						}
-					}
-				}
-
 			}
 		});
 		monitorThread.setDaemon(true);
@@ -131,11 +110,6 @@ public class JobFailMonitorHelper {
 			logger.error(e.getMessage(), e);
 		}
 	}
-	
-	// producer
-	public static void monitor(int jobLogId){
-		getInstance().queue.offer(jobLogId);
-	}
 
 
 	// ---------------------- alarm ----------------------
@@ -168,7 +142,8 @@ public class JobFailMonitorHelper {
 	 *
 	 * @param jobLog
 	 */
-	private void failAlarm(XxlJobInfo info, XxlJobLog jobLog){
+	private boolean failAlarm(XxlJobInfo info, XxlJobLog jobLog){
+		boolean alarmResult = true;
 
 		// send monitor email
 		if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
@@ -205,8 +180,10 @@ public class JobFailMonitorHelper {
 					helper.setText(content, true);
 
 					XxlJobAdminConfig.getAdminConfig().getMailSender().send(mimeMessage);
-				} catch (UnsupportedEncodingException | MessagingException e) {
+				} catch (Exception e) {
 					logger.error(">>>>>>>>>>> job monitor alarm email send error, JobLogId:{}", jobLog.getId(), e);
+
+					alarmResult = false;
 				}
 
 			}
@@ -214,6 +191,8 @@ public class JobFailMonitorHelper {
 
 		// TODO, custom alarm strategy, such as sms
 
+
+		return alarmResult;
 	}
 
 }

+ 0 - 3
xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java

@@ -6,7 +6,6 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
 import com.xxl.job.admin.core.model.XxlJobLog;
 import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
 import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
-import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
 import com.xxl.job.admin.core.util.I18nUtil;
 import com.xxl.job.core.biz.ExecutorBiz;
 import com.xxl.job.core.biz.model.ReturnT;
@@ -173,8 +172,6 @@ public class XxlJobTrigger {
         jobLog.setTriggerMsg(triggerMsgSb.toString());
         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
 
-        // 7、monitor trigger
-        JobFailMonitorHelper.monitor(jobLog.getId());
         logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
     }
 

+ 6 - 0
xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobLogDao.java

@@ -50,4 +50,10 @@ public interface XxlJobLogDao {
 						@Param("clearBeforeTime") Date clearBeforeTime,
 						@Param("clearBeforeNum") int clearBeforeNum);
 
+	public List<Integer> findFailJobLogIds(@Param("pagesize") int pagesize);
+
+	public int updateAlarmStatus(@Param("logId") int logId,
+								 @Param("oldAlarmStatus") int oldAlarmStatus,
+								 @Param("newAlarmStatus") int newAlarmStatus);
+
 }

+ 23 - 3
xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml

@@ -22,7 +22,8 @@
 	    <result column="handle_time" property="handleTime" />
 	    <result column="handle_code" property="handleCode" />
 	    <result column="handle_msg" property="handleMsg" />
-	    
+
+		<result column="alarm_status" property="alarmStatus" />
 	</resultMap>
 
 	<sql id="Base_Column_List">
@@ -39,7 +40,8 @@
 		t.trigger_msg,
 		t.handle_time,
 		t.handle_code,
-		t.handle_msg
+		t.handle_msg,
+		t.alarm_status
 	</sql>
 	
 	<select id="pageList" resultMap="XxlJobLog">
@@ -176,7 +178,7 @@
 		SELECT
 			DATE_FORMAT(trigger_time,'%Y-%m-%d') triggerDay,
 			COUNT(handle_code) triggerDayCount,
-			SUM(CASE WHEN (trigger_code = 200 and handle_code = 0) then 1 else 0 end) as triggerDayCountRunning,
+			SUM(CASE WHEN (trigger_code in (0, 200) and handle_code = 0) then 1 else 0 end) as triggerDayCountRunning,
 			SUM(CASE WHEN handle_code = 200 then 1 else 0 end) as triggerDayCountSuc
 		FROM XXL_JOB_QRTZ_TRIGGER_LOG
 		WHERE trigger_time BETWEEN #{from} and #{to}
@@ -214,5 +216,23 @@
 			</if>
 		</trim>
 	</delete>
+
+	<select id="findFailJobLogIds" resultType="int" >
+		SELECT id FROM `XXL_JOB_QRTZ_TRIGGER_LOG`
+		WHERE !(
+			(trigger_code in (0, 200) and handle_code = 0)
+			OR
+			(handle_code = 200)
+		)
+		AND `alarm_status` = 0
+		ORDER BY id ASC
+	</select>
+
+	<update id="updateAlarmStatus" >
+		UPDATE XXL_JOB_QRTZ_TRIGGER_LOG
+		SET
+			`alarm_status` = #{newAlarmStatus}
+		WHERE `id`= #{logId} AND `alarm_status` = #{oldAlarmStatus}
+	</update>
 	
 </mapper>