xiatian 4 жил өмнө
parent
commit
90383c2bd8

+ 14 - 42
themis-business/src/main/java/com/qmth/themis/business/util/RedisUtil.java

@@ -1,17 +1,17 @@
 package com.qmth.themis.business.util;
 
-import com.qmth.themis.business.constant.SystemConstant;
-import org.springframework.data.redis.core.RedisCallback;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Resource;
+
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import com.qmth.themis.business.constant.SystemConstant;
+
 /**
  * @Description: redis util
  * @Param:
@@ -247,35 +247,13 @@ public class RedisUtil {
      * 分布式锁
      *
      * @param key
-     * @param hashKey
-     * @param time
-     * @param timeUnit
+     * @param timeout SECONDS
      * @return
      */
-    public boolean lock(String key, String hashKey, long time, TimeUnit timeUnit) {
-        String lock = key + hashKey;
-        // 利用lambda表达式
-        return (Boolean) redisTemplate.execute((RedisCallback) connection -> {
-            long expireAt = System.currentTimeMillis() + time + 1;
-            Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(expireAt).getBytes());
-            if (acquire) {
-                redisTemplate.expire(lock, time, timeUnit);
-                return true;
-            } else {
-                byte[] value = connection.get(lock.getBytes());
-                if (Objects.nonNull(value) && value.length > 0) {
-                    long expireTime = Long.parseLong(new String(value));
-                    // 如果锁已经过期
-                    if (expireTime < System.currentTimeMillis()) {
-                        // 重新加锁,防止死锁
-                        byte[] oldValue = connection.getSet(lock.getBytes(), String.valueOf(System.currentTimeMillis() + time + 1).getBytes());
-                        redisTemplate.expire(lock, time, timeUnit);
-                        return Long.parseLong(new String(oldValue)) < System.currentTimeMillis();
-                    }
-                }
-            }
-            return false;
-        });
+    public boolean lock(String key,long timeout) {
+    	long expireAt = System.currentTimeMillis() + (timeout*1000) + 1;
+    	Boolean b = redisTemplate.opsForValue().setIfAbsent(key, expireAt, timeout, TimeUnit.SECONDS);
+		return b;
     }
 
     /**
@@ -284,14 +262,8 @@ public class RedisUtil {
      * @param key
      * @return
      */
-    public boolean releaseLock(String key) {
-        Object result = redisTemplate.opsForValue().get(key);
-        if (Objects.nonNull(result)) {
-            delete(key);
-            return true;
-        } else {
-            return false;
-        }
+    public void releaseLock(String key) {
+    	redisTemplate.expire(key, 100, TimeUnit.MILLISECONDS);
     }
 
     /**

+ 21 - 16
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketServer.java

@@ -1,5 +1,24 @@
 package com.qmth.themis.exam.websocket;//package com.qmth.themis.backend.websocket;
 
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
 import com.alibaba.fastjson.JSONObject;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
@@ -11,20 +30,6 @@ import com.qmth.themis.common.signature.SignatureInfo;
 import com.qmth.themis.common.signature.SignatureType;
 import com.qmth.themis.exam.enums.WebsocketTypeEnum;
 import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import javax.websocket.*;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 /**
  * @Description: websocker服务端
@@ -213,7 +218,7 @@ public class WebSocketServer
      * 在线人数加一
      */
     public synchronized void addOnlineCount() {
-        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX, this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT, TimeUnit.SECONDS)) {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX+this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
             Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
             int count = 0;
             if (Objects.nonNull(o)) {
@@ -228,7 +233,7 @@ public class WebSocketServer
      * 在线人数减一
      */
     public synchronized void subOnlineCount() {
-        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX, this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT, TimeUnit.SECONDS)) {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX+this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
             Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
             int count = 0;
             if (Objects.nonNull(o)) {

+ 1 - 2
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java

@@ -2,7 +2,6 @@ package com.qmth.themis.mq.listener;
 
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Resource;
 
@@ -75,7 +74,7 @@ public class RocketSessionConsumer implements
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
 //                log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
+                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX+mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                     log.info(":{}-:{} 更新db", threadId, threadName);
                     tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
                     mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);

+ 24 - 23
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketTaskConsumer.java

@@ -1,20 +1,12 @@
 package com.qmth.themis.mq.listener;
 
-import com.google.gson.Gson;
-import com.qmth.themis.business.constant.SpringContextHolder;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.templete.TaskExportTemplete;
-import com.qmth.themis.business.templete.TaskImportTemplete;
-import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
-import com.qmth.themis.business.templete.impl.TaskRoomCodeExportTemplete;
-import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
-import com.qmth.themis.business.threadPool.MyThreadPool;
-import com.qmth.themis.business.util.JacksonUtil;
-import com.qmth.themis.business.util.RedisUtil;
-import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.mq.dto.MqDto;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.annotation.Resource;
+
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -30,12 +22,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.Resource;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.templete.TaskExportTemplete;
+import com.qmth.themis.business.templete.TaskImportTemplete;
+import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
+import com.qmth.themis.business.templete.impl.TaskRoomCodeExportTemplete;
+import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
+import com.qmth.themis.business.threadPool.MyThreadPool;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
 
 /**
  * @Description: 普通消息监听 task
@@ -75,7 +76,7 @@ public class RocketTaskConsumer {
                     mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                     log.info(":{}-:{} task import Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
 //                    log.info(":{}-:{} task import Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX+mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
                         String tag = mqDto.getTag();
                         myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
@@ -175,7 +176,7 @@ public class RocketTaskConsumer {
                     mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                     log.info(":{}-:{} task export Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
 //                    log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX+ mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
                         String tag = mqDto.getTag();
                         myThreadPool.arbitratePoolTaskExecutor.execute(() -> {

+ 18 - 17
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketUserLogConsumer.java

@@ -1,17 +1,10 @@
 package com.qmth.themis.mq.listener;
 
-import com.google.gson.Gson;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.enums.MqEnum;
-import com.qmth.themis.business.enums.SystemOperationEnum;
-import com.qmth.themis.business.service.TEExamStudentLogService;
-import com.qmth.themis.business.service.TEUserLogService;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.util.JacksonUtil;
-import com.qmth.themis.business.util.RedisUtil;
-import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.mq.dto.MqDto;
+import java.util.List;
+import java.util.Objects;
+
+import javax.annotation.Resource;
+
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -28,10 +21,18 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.service.TEExamStudentLogService;
+import com.qmth.themis.business.service.TEUserLogService;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
 
 /**
  * @Description: 普通消息监听 用户日志
@@ -79,7 +80,7 @@ public class RocketUserLogConsumer implements MessageListenerConcurrently {
                     o = redisUtil.get(SystemConstant.STUDENTLOG_TOPIC_BUFFER_LIST, mqDto.getId());
                     mqTopic = SystemConstant.STUDENTLOG_TOPIC_BUFFER_LIST;
                 }
-                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(o) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX, mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT, TimeUnit.SECONDS)) {
+                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(o) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX+mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                     log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
                     if (tag.contains("user")) {
                         teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));