Sfoglia il codice sorgente

ClearExpireDataJobHandler

deason 11 mesi fa
parent
commit
4915deacff

+ 138 - 0
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/job/ClearExpireDataJobHandler.java

@@ -0,0 +1,138 @@
+package cn.com.qmth.examcloud.core.oe.task.service.job;
+
+import cn.com.qmth.examcloud.support.CacheConstants;
+import cn.com.qmth.examcloud.web.exception.SequenceLockException;
+import cn.com.qmth.examcloud.web.helpers.SequenceLockHelper;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.BatchPreparedStatementSetter;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 清理过期数据任务
+ */
+@Component
+public class ClearExpireDataJobHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(ClearExpireDataJobHandler.class);
+
+    @Autowired
+    private JdbcTemplate jdbcTemplate;
+
+    public void run(int shardTotal, int shardIndex, String jobParam) throws Exception {
+        final String lockKey = CacheConstants.LOCK_EXAM_DATA_CLEAN;
+
+        try {
+            SequenceLockHelper.getLockSimple(lockKey);
+
+            this.handler(jobParam);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // 若线程终止,则抛出交由任务调度中心处理
+                log.warn("当前任务线程被终止!error:{}", e.getMessage());
+                throw e;
+            } else if (e instanceof SequenceLockException) {
+                // 若锁问题,下次会继续执行
+                log.warn("当前任务获取锁失败!redisKey:{}", lockKey);
+            } else {
+                // 若异常,下次会继续执行(需要排查原因)
+                log.error("当前任务处理失败!error:{}", e.getMessage());
+            }
+        } finally {
+            SequenceLockHelper.releaseLockSimple(lockKey);
+        }
+    }
+
+    private void handler(String jobParam) {
+        int days = 0;
+        if (StringUtils.isNotBlank(jobParam)) {
+            days = Integer.parseInt(jobParam.trim());
+        }
+        // 默认:最少30天
+        days = Math.max(days, 30);
+
+        Calendar c = Calendar.getInstance();
+        c.setTime(new Date());
+        c.add(Calendar.DATE, -days);
+        String expireTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(c.getTime());
+
+        // 获取总数
+        final String totalSql = "select count(1) from ec_oes_exam_record_data where sync_status = 'SYNCED' and creation_time <= '%s'";
+        Long totalSize = jdbcTemplate.queryForObject(String.format(totalSql, expireTime), Long.class);
+        log.warn("清理过期数据任务!jobParam:{} expireTime:{} totalSize:{}", jobParam, expireTime, totalSize);
+        if (totalSize == null || totalSize == 0) {
+            return;
+        }
+
+        long startId = 0L, finishCount = 0L;
+        final String querySql = new StringBuilder()
+                .append(" select id from ec_oes_exam_record_data")
+                .append(" where sync_status = 'SYNCED'")
+                .append(" and id > %s")
+                .append(" and creation_time <= '").append(expireTime).append("'")
+                .append(" order by id asc")
+                .append(" limit 5000")
+                .toString();
+
+        long startTime = System.currentTimeMillis();
+        while (true) {
+            List<Long> tempIds = jdbcTemplate.queryForList(String.format(querySql, startId), Long.class);
+            if (CollectionUtils.isEmpty(tempIds)) {
+                break;
+            }
+
+            this.doClear(tempIds);
+
+            finishCount += tempIds.size();
+            float finishRate = finishCount * 100f / totalSize;
+            long cost = (System.currentTimeMillis() - startTime) / 1000L;
+            log.warn("totalSize:{} finishCount:{} finishRate:{}% startId:{} cost:{}s", totalSize, finishCount, finishRate, startId, cost);
+            startId = tempIds.get(tempIds.size() - 1);
+        }
+    }
+
+    private void doClear(List<Long> tempIds) {
+        // this.batchDelete("delete from ec_oes_exam_face_biopsy where exam_record_data_id in(?)", tempIds);
+        // this.batchDelete("delete from ec_oes_exam_face_biopsy_item where exam_record_data_id in(?)", tempIds);
+        // this.batchDelete("delete from ec_oes_exam_face_biopsy_item_step where exam_record_data_id in(?)", tempIds);
+        this.batchDelete("delete from ec_oes_exam_face_liveness_verify where exam_record_data_id in(?)", tempIds);
+        this.batchDelete("delete from ec_oes_exam_face_live_verify where exam_record_data_id in(?)", tempIds);
+        this.batchDelete("delete from ec_oet_exam_capture where exam_record_data_id in(?)", tempIds);
+        this.batchDelete("delete from ec_oet_exam_sync_capture where exam_record_data_id in(?)", tempIds);
+
+        this.batchDelete("delete from ec_oes_exam_continued_record where exam_record_data_id in(?)", tempIds);
+        this.batchDelete("delete from ec_oes_exam_process_record where exam_record_data_id in(?)", tempIds);
+        this.batchDelete("delete from ec_oe_exam_record_data_sync where cache_id in(?)", tempIds);
+
+        this.batchDelete("delete from ec_oes_exam_record_data where id in(?)", tempIds);
+    }
+
+    private void batchDelete(String deleteSql, List<Long> tempIds) {
+        long start = System.currentTimeMillis();
+        jdbcTemplate.batchUpdate(deleteSql, new BatchPreparedStatementSetter() {
+            @Override
+            public void setValues(PreparedStatement ps, int n) throws SQLException {
+                ps.setLong(1, tempIds.get(n));
+            }
+
+            @Override
+            public int getBatchSize() {
+                return tempIds.size();
+            }
+        });
+
+        log.debug("{} cost:{}ms", deleteSql, System.currentTimeMillis() - start);
+    }
+
+}

+ 17 - 5
examcloud-core-oe-task-starter/src/main/java/cn/com/qmth/examcloud/core/oe/task/starter/config/OeTaskExecutor.java

@@ -1,9 +1,6 @@
 package cn.com.qmth.examcloud.core.oe.task.starter.config;
 
-import cn.com.qmth.examcloud.core.oe.task.service.job.AfterHandInExamJobHandler;
-import cn.com.qmth.examcloud.core.oe.task.service.job.BeforeHandInExamJobHandler;
-import cn.com.qmth.examcloud.core.oe.task.service.job.FaceVerifyJobHandler;
-import cn.com.qmth.examcloud.core.oe.task.service.job.SyncExamRecordDataJobHandler;
+import cn.com.qmth.examcloud.core.oe.task.service.job.*;
 import com.xxl.job.core.context.XxlJobHelper;
 import com.xxl.job.core.handler.annotation.XxlJob;
 import org.slf4j.Logger;
@@ -31,6 +28,9 @@ public class OeTaskExecutor {
     @Autowired
     private FaceVerifyJobHandler faceVerifyJobHandler;
 
+    @Autowired
+    private ClearExpireDataJobHandler clearExpireDataJobHandler;
+
     /**
      * 1、处理交卷前考试数据任务
      */
@@ -75,8 +75,20 @@ public class OeTaskExecutor {
         int shardIndex = XxlJobHelper.getShardIndex();
         int shardTotal = XxlJobHelper.getShardTotal();
         String jobParam = XxlJobHelper.getJobParam();
-        XxlJobHelper.log("shardTotal:{}, shardIndex:{}", shardTotal, shardIndex);
+        XxlJobHelper.log("shardTotal:{}, shardIndex:{}, jobParam:{}", shardTotal, shardIndex, jobParam);
         faceVerifyJobHandler.run(shardTotal, shardIndex, jobParam);
     }
 
+    /**
+     * 清理过期数据任务
+     */
+    @XxlJob("clearExpireDataJobHandler")
+    public void clearExpireDataJobHandler() throws Exception {
+        int shardIndex = XxlJobHelper.getShardIndex();
+        int shardTotal = XxlJobHelper.getShardTotal();
+        String jobParam = XxlJobHelper.getJobParam();
+        XxlJobHelper.log("shardTotal:{}, shardIndex:{}, jobParam:{}", shardTotal, shardIndex, jobParam);
+        clearExpireDataJobHandler.run(shardTotal, shardIndex, jobParam);
+    }
+
 }