wangliang 4 年 前
コミット
a7fe8845f8
23 ファイル変更927 行追加299 行削除
  1. 3 0
      themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java
  2. 3 3
      themis-business/src/main/java/com/qmth/themis/business/dao/TIeExamInvigilateCallLogMapper.java
  3. 56 44
      themis-business/src/main/java/com/qmth/themis/business/entity/TIeExamInvigilateCallLog.java
  4. 12 8
      themis-business/src/main/java/com/qmth/themis/business/enums/FieldUniqueEnum.java
  5. 3 1
      themis-business/src/main/java/com/qmth/themis/business/enums/MonitorStatusSourceEnum.java
  6. 5 0
      themis-business/src/main/java/com/qmth/themis/business/enums/MqGroupEnum.java
  7. 2 1
      themis-business/src/main/java/com/qmth/themis/business/enums/MqTagEnum.java
  8. 9 1
      themis-business/src/main/java/com/qmth/themis/business/enums/WebsocketTypeEnum.java
  9. 15 0
      themis-business/src/main/java/com/qmth/themis/business/service/TIeExamInvigilateCallLogService.java
  10. 0 16
      themis-business/src/main/java/com/qmth/themis/business/service/TIeExamInvigilateCallService.java
  11. 19 0
      themis-business/src/main/java/com/qmth/themis/business/service/impl/TIeExamInvigilateCallLogServiceImpl.java
  12. 0 19
      themis-business/src/main/java/com/qmth/themis/business/service/impl/TIeExamInvigilateCallServiceImpl.java
  13. 1 1
      themis-business/src/main/resources/mapper/TIeExamInvigilateCallLogMapper.xml
  14. 2 2
      themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java
  15. 0 158
      themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallController.java
  16. 177 0
      themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallLogMobileController.java
  17. 177 0
      themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallLogOeController.java
  18. 4 0
      themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java
  19. 309 0
      themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java
  20. 1 1
      themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java
  21. 22 10
      themis-mq/src/main/java/com/qmth/themis/mq/service/MqLogicService.java
  22. 39 34
      themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java
  23. 68 0
      themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/LogConcurrentlyImpl.java

+ 3 - 0
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -72,6 +72,9 @@ public class SystemConstant {
     public static final String OK = "ok";
     public static String FILES_DIR;
     public static String TEMP_FILES_DIR;
+    public static final String MONITOR_LIVE_URL_ = "monitorLiveUrl_";
+    public static final String MONITOR_STATUS_ = "monitorStatus_";
+
     /**
      * session过期时间
      */

+ 3 - 3
themis-business/src/main/java/com/qmth/themis/business/dao/TIeExamInvigilateCallMapper.java → themis-business/src/main/java/com/qmth/themis/business/dao/TIeExamInvigilateCallLogMapper.java

@@ -1,17 +1,17 @@
 package com.qmth.themis.business.dao;
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.qmth.themis.business.entity.TIeExamInvigilateCall;
+import com.qmth.themis.business.entity.TIeExamInvigilateCallLog;
 import org.apache.ibatis.annotations.Mapper;
 
 /**
- * @Description: 监考 监控通话申请 Mapper 接口
+ * @Description: 监考 监控通话申请日志 Mapper 接口
  * @Param:
  * @return:
  * @Author: wangliang
  * @Date: 2020/6/25
  */
 @Mapper
-public interface TIeExamInvigilateCallMapper extends BaseMapper<TIeExamInvigilateCall> {
+public interface TIeExamInvigilateCallLogMapper extends BaseMapper<TIeExamInvigilateCallLog> {
 
 }

+ 56 - 44
themis-business/src/main/java/com/qmth/themis/business/entity/TIeExamInvigilateCall.java → themis-business/src/main/java/com/qmth/themis/business/entity/TIeExamInvigilateCallLog.java

@@ -2,19 +2,25 @@ package com.qmth.themis.business.entity;
 
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.qmth.themis.business.base.BaseEntity;
+import com.qmth.themis.business.enums.MonitorStatusSourceEnum;
+import com.qmth.themis.business.enums.MonitorVideoSourceEnum;
 import com.qmth.themis.common.contanst.Constants;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
 /**
- * @Description: 监考 监控通话申请
+ * @Description: 监考 监控通话申请日志
  * @Param:
  * @return:
  * @Author: wangliang
  * @Date: 2020/8/7
  */
-@ApiModel(value = "t_ie_exam_invigilate_call", description = "监控观看地址和通话申请")
-public class TIeExamInvigilateCall extends BaseEntity {
+@ApiModel(value = "t_ie_exam_invigilate_call_log", description = "监控观看地址和通话申请日志")
+public class TIeExamInvigilateCallLog extends BaseEntity {
+
+    @ApiModelProperty(value = "考试记录ID")
+    @TableField(value = "exam_record_id")
+    private Long examRecordId;
 
     @ApiModelProperty(value = "考试ID")
     @TableField(value = "exam_id")
@@ -24,17 +30,13 @@ public class TIeExamInvigilateCall extends BaseEntity {
     @TableField(value = "exam_activity_id")
     private Long examActivityId;
 
-    @ApiModelProperty(value = "考试记录ID")
-    @TableField(value = "exam_record_id")
-    private Long examRecordId;
-
     @ApiModelProperty(value = "考生ID")
     @TableField(value = "exam_student_id")
     private Long examStudentId;
 
     @ApiModelProperty(value = "监考视频源")
     @TableField(value = "source")
-    private String source;
+    private MonitorVideoSourceEnum source;
 
     @ApiModelProperty(value = "观看地址")
     @TableField(value = "live_url")
@@ -42,37 +44,31 @@ public class TIeExamInvigilateCall extends BaseEntity {
 
     @ApiModelProperty(value = "状态")
     @TableField(value = "status")
-    private String status;
+    private MonitorStatusSourceEnum status;
 
-    @ApiModelProperty(value = "类型")
-    @TableField(value = "type")
-    private String type;
+    @ApiModelProperty(value = "备注")
+    @TableField(value = "remark")
+    private String remark;
 
-    public TIeExamInvigilateCall() {
+    public TIeExamInvigilateCallLog() {
 
     }
 
-    public TIeExamInvigilateCall(Long examRecordId, String source, String liveUrl) {
+    public TIeExamInvigilateCallLog(Long examRecordId, MonitorVideoSourceEnum source, String liveUrl, MonitorStatusSourceEnum status) {
         setId(Constants.idGen.next());
         this.examRecordId = examRecordId;
         this.source = source;
         this.liveUrl = liveUrl;
+        this.status = status;
     }
 
-    public Long getExamId() {
-        return examId;
-    }
-
-    public void setExamId(Long examId) {
-        this.examId = examId;
-    }
-
-    public Long getExamActivityId() {
-        return examActivityId;
-    }
-
-    public void setExamActivityId(Long examActivityId) {
-        this.examActivityId = examActivityId;
+    public TIeExamInvigilateCallLog(Long examRecordId, MonitorVideoSourceEnum source, String liveUrl, MonitorStatusSourceEnum status, String remark) {
+        setId(Constants.idGen.next());
+        this.examRecordId = examRecordId;
+        this.source = source;
+        this.liveUrl = liveUrl;
+        this.remark = remark;
+        this.status = status;
     }
 
     public Long getExamRecordId() {
@@ -83,19 +79,11 @@ public class TIeExamInvigilateCall extends BaseEntity {
         this.examRecordId = examRecordId;
     }
 
-    public Long getExamStudentId() {
-        return examStudentId;
-    }
-
-    public void setExamStudentId(Long examStudentId) {
-        this.examStudentId = examStudentId;
-    }
-
-    public String getSource() {
+    public MonitorVideoSourceEnum getSource() {
         return source;
     }
 
-    public void setSource(String source) {
+    public void setSource(MonitorVideoSourceEnum source) {
         this.source = source;
     }
 
@@ -107,19 +95,43 @@ public class TIeExamInvigilateCall extends BaseEntity {
         this.liveUrl = liveUrl;
     }
 
-    public String getStatus() {
+    public MonitorStatusSourceEnum getStatus() {
         return status;
     }
 
-    public void setStatus(String status) {
+    public void setStatus(MonitorStatusSourceEnum status) {
         this.status = status;
     }
 
-    public String getType() {
-        return type;
+    public String getRemark() {
+        return remark;
+    }
+
+    public void setRemark(String remark) {
+        this.remark = remark;
+    }
+
+    public Long getExamId() {
+        return examId;
+    }
+
+    public void setExamId(Long examId) {
+        this.examId = examId;
+    }
+
+    public Long getExamActivityId() {
+        return examActivityId;
     }
 
-    public void setType(String type) {
-        this.type = type;
+    public void setExamActivityId(Long examActivityId) {
+        this.examActivityId = examActivityId;
+    }
+
+    public Long getExamStudentId() {
+        return examStudentId;
+    }
+
+    public void setExamStudentId(Long examStudentId) {
+        this.examStudentId = examStudentId;
     }
 }

+ 12 - 8
themis-business/src/main/java/com/qmth/themis/business/enums/FieldUniqueEnum.java

@@ -25,9 +25,9 @@ public enum FieldUniqueEnum {
 
     t_b_exam_invigilate_user_orgId_userId_roomCode_Idx("监考员"),
 
-    t_b_user_loginName_orgId_Idx("用户名"),
+    t_b_user_loginName_orgId_Idx("用户名");
 
-    t_ie_exam_invigilate_call_recordId_source_Idx("监考视频源");
+//    t_ie_exam_invigilate_call_recordId_source_Idx("监考视频源");
 
     private String code;
 
@@ -62,9 +62,11 @@ public enum FieldUniqueEnum {
             return t_b_exam_invigilate_user_orgId_userId_roomCode_Idx.name();
         } else if (Objects.equals(value.trim(), t_b_user_loginName_orgId_Idx.code)) {
             return t_b_user_loginName_orgId_Idx.name();
-        } else if (Objects.equals(value.trim(), t_ie_exam_invigilate_call_recordId_source_Idx.code)) {
-            return t_ie_exam_invigilate_call_recordId_source_Idx.name();
-        } else {
+        }
+//        else if (Objects.equals(value.trim(), t_ie_exam_invigilate_call_recordId_source_Idx.code)) {
+//            return t_ie_exam_invigilate_call_recordId_source_Idx.name();
+//        }
+        else {
             return t_e_exam_activity_examId_code_Idx.name();
         }
     }
@@ -92,9 +94,11 @@ public enum FieldUniqueEnum {
             return t_b_exam_invigilate_user_orgId_userId_roomCode_Idx.code;
         } else if (value.trim().contains(t_b_user_loginName_orgId_Idx.name())) {
             return t_b_user_loginName_orgId_Idx.code;
-        } else if (value.trim().contains(t_ie_exam_invigilate_call_recordId_source_Idx.name())) {
-            return t_ie_exam_invigilate_call_recordId_source_Idx.code;
-        } else {
+        }
+//        else if (value.trim().contains(t_ie_exam_invigilate_call_recordId_source_Idx.name())) {
+//            return t_ie_exam_invigilate_call_recordId_source_Idx.code;
+//        }
+        else {
             return t_e_exam_activity_examId_code_Idx.code;
         }
     }

+ 3 - 1
themis-business/src/main/java/com/qmth/themis/business/enums/MonitorStatusSourceEnum.java

@@ -13,7 +13,9 @@ public enum MonitorStatusSourceEnum {
 
     STOP("停止"),
 
-    START("正常");
+    START("正常"),
+
+    FINISH("通话完成");
 
     private String code;
 

+ 5 - 0
themis-business/src/main/java/com/qmth/themis/business/enums/MqGroupEnum.java

@@ -24,6 +24,11 @@ public enum MqGroupEnum {
      */
     taskConsumerGroup("themis-group-exam-task"),
 
+    /**
+     * 日志
+     */
+    logConsumerGroup("themis-group-exam-log"),
+
     /**
      * websocket超时退出 考生 group
      */

+ 2 - 1
themis-business/src/main/java/com/qmth/themis/business/enums/MqTagEnum.java

@@ -38,7 +38,8 @@ public enum MqTagEnum {
     examRecordInit("考试记录数据初始化", "考试日志", "normal", 26),
     examBreakHistory("考试断点记录", "考试日志", "normal", 27),
     warningLog("预警日志标签", "预警日志", "normal", 28),
-    exceptionLog("异常日志标签", "异常日志", "normal", 29);
+    exceptionLog("异常日志标签", "异常日志", "normal", 29),
+    monitorLog("监考监控日志标签", "监考监控日志", "normal", 30);
 
     private MqTagEnum(String desc, String code, String type, int id) {
         this.desc = desc;

+ 9 - 1
themis-business/src/main/java/com/qmth/themis/business/enums/WebsocketTypeEnum.java

@@ -27,7 +27,11 @@ public enum WebsocketTypeEnum {
 
     BREACH_STOP_EXAM("预警强制收卷", "breachStopExam"),
 
-    HAND_STOP_EXAM("手动收卷", "handStopExam");
+    HAND_STOP_EXAM("手动收卷", "handStopExam"),
+
+    EXAM_START("考试开始", "examStart"),
+
+    EXAM_STOP("考试结束", "examStop");
 
     private String code;
     private String desc;
@@ -68,6 +72,10 @@ public enum WebsocketTypeEnum {
             return INVIGILATE_STOP_EXAM.name();
         } else if (Objects.equals(value.trim(), BREACH_STOP_EXAM.getDesc())) {
             return BREACH_STOP_EXAM.name();
+        } else if (Objects.equals(value.trim(), EXAM_START.getDesc())) {
+            return EXAM_START.name();
+        } else if (Objects.equals(value.trim(), EXAM_STOP.getDesc())) {
+            return EXAM_STOP.name();
         } else {
             return HAND_STOP_EXAM.name();
         }

+ 15 - 0
themis-business/src/main/java/com/qmth/themis/business/service/TIeExamInvigilateCallLogService.java

@@ -0,0 +1,15 @@
+package com.qmth.themis.business.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.qmth.themis.business.entity.TIeExamInvigilateCallLog;
+
+/**
+ * @Description: 监考监控通话申请日志 服务类
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/6/25
+ */
+public interface TIeExamInvigilateCallLogService extends IService<TIeExamInvigilateCallLog> {
+
+}

+ 0 - 16
themis-business/src/main/java/com/qmth/themis/business/service/TIeExamInvigilateCallService.java

@@ -1,16 +0,0 @@
-package com.qmth.themis.business.service;
-
-import com.baomidou.mybatisplus.extension.service.IService;
-import com.qmth.themis.business.entity.TIeExamInvigilateCall;
-import com.qmth.themis.business.entity.TIeExamInvigilateNotice;
-
-/**
- * @Description: 监考监控通话申请 服务类
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/6/25
- */
-public interface TIeExamInvigilateCallService extends IService<TIeExamInvigilateCall> {
-
-}

+ 19 - 0
themis-business/src/main/java/com/qmth/themis/business/service/impl/TIeExamInvigilateCallLogServiceImpl.java

@@ -0,0 +1,19 @@
+package com.qmth.themis.business.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.qmth.themis.business.dao.TIeExamInvigilateCallLogMapper;
+import com.qmth.themis.business.entity.TIeExamInvigilateCallLog;
+import com.qmth.themis.business.service.TIeExamInvigilateCallLogService;
+import org.springframework.stereotype.Service;
+
+/**
+ * @Description: 监考监控通话申请日志 服务实现类
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/6/25
+ */
+@Service
+public class TIeExamInvigilateCallLogServiceImpl extends ServiceImpl<TIeExamInvigilateCallLogMapper, TIeExamInvigilateCallLog> implements TIeExamInvigilateCallLogService {
+
+}

+ 0 - 19
themis-business/src/main/java/com/qmth/themis/business/service/impl/TIeExamInvigilateCallServiceImpl.java

@@ -1,19 +0,0 @@
-package com.qmth.themis.business.service.impl;
-
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.qmth.themis.business.dao.TIeExamInvigilateCallMapper;
-import com.qmth.themis.business.entity.TIeExamInvigilateCall;
-import com.qmth.themis.business.service.TIeExamInvigilateCallService;
-import org.springframework.stereotype.Service;
-
-/**
- * @Description: 监考通知消息记录 服务实现类
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/6/25
- */
-@Service
-public class TIeExamInvigilateCallServiceImpl extends ServiceImpl<TIeExamInvigilateCallMapper, TIeExamInvigilateCall> implements TIeExamInvigilateCallService {
-
-}

+ 1 - 1
themis-business/src/main/resources/mapper/TIeExamInvigilateCallMapper.xml → themis-business/src/main/resources/mapper/TIeExamInvigilateCallLogMapper.xml

@@ -1,5 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="com.qmth.themis.business.dao.TIeExamInvigilateCallMapper">
+<mapper namespace="com.qmth.themis.business.dao.TIeExamInvigilateCallLogMapper">
 
 </mapper>

+ 2 - 2
themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java

@@ -179,7 +179,7 @@ public class TEStudentController {
         TBSession tbSession = new TBSession(sessionId, String.valueOf(teStudent.getId()), authDto.getRoleCodes().toString(), source, platform.name(), deviceId, ServletUtil.getRequest().getLocalAddr(), token, expire);
         redisUtil.setUserSession(sessionId, tbSession, redisExpire);
         //mq发送消息start
-        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.themisTopic.getCode(), platform.name(), tbSession,platform.name(), tbSession.getId(), teStudent.getIdentity());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.themisTopic.getCode(), platform.name(), tbSession, platform.name(), tbSession.getId(), teStudent.getIdentity());
         mqDtoService.assembleSendOneWayMsg(MqTopicEnum.themisTopic.getCode(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? MqTagEnum.student.name() : MqTagEnum.user.name(), SystemOperationEnum.LOGIN, MqTagEnum.student.name(), teStudent.getId(), teStudent.getIdentity());
         //mq发送消息end
         //测试
@@ -241,7 +241,7 @@ public class TEStudentController {
                         objectMap.put("lastBreakTime", new Date());
                         objectMap.put("leftBreakResumeCount", leftBreakResumeCount);
                         objectMap.put("lastStartTime", new Date());
-
+                        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
                         //发送mq,增加断点次数记录
                         MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.examBreakHistory.name(), JacksonUtil.parseJson(objectMap), MqTagEnum.examBreakHistory, String.valueOf(recordId), "增加断点记录");
                         mqDtoService.assembleSendOneWayMsg(mqDto);

+ 0 - 158
themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallController.java

@@ -1,158 +0,0 @@
-package com.qmth.themis.exam.api;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.qmth.themis.business.annotation.ApiJsonObject;
-import com.qmth.themis.business.annotation.ApiJsonProperty;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TIeExamInvigilateCall;
-import com.qmth.themis.business.enums.FieldUniqueEnum;
-import com.qmth.themis.business.service.TIeExamInvigilateCallService;
-import com.qmth.themis.common.enums.ExceptionResultEnum;
-import com.qmth.themis.common.exception.BusinessException;
-import com.qmth.themis.common.util.Result;
-import com.qmth.themis.common.util.ResultUtil;
-import io.swagger.annotations.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.dao.DuplicateKeyException;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.bind.annotation.*;
-
-import javax.annotation.Resource;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * @Description: 监考监控通话信息 前端控制器
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/6/25
- */
-@Api(tags = "监考监控通话信息Controller")
-@RestController
-@RequestMapping("/${prefix.url.exam}/monitor")
-public class TIeInvigilateCallController {
-    private final static Logger log = LoggerFactory.getLogger(TIeInvigilateCallController.class);
-
-    @Resource
-    TIeExamInvigilateCallService tIeExamInvigilateCallService;
-
-    @ApiOperation(value = "监控观看地址更新接口")
-    @RequestMapping(value = "/live_url", method = RequestMethod.POST)
-    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
-    @Transactional
-    public Result liveUrl(@ApiJsonObject(name = "liveUrl", value = {
-            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
-            @ApiJsonProperty(key = "source", description = "监考视频源", required = true),
-            @ApiJsonProperty(key = "liveUrl", description = "观看地址", required = true)
-    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
-        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
-            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
-        }
-        Long recordId = null;
-        try {
-            recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
-            if (Objects.isNull(mapParameter.get("source")) || Objects.equals(mapParameter.get("source"), "")) {
-                throw new BusinessException("监考视频源不能为空");
-            }
-            String source = String.valueOf(mapParameter.get("source"));
-            if (Objects.isNull(mapParameter.get("liveUrl")) || Objects.equals(mapParameter.get("liveUrl"), "")) {
-                throw new BusinessException("观看地址不能为空");
-            }
-            String liveUrl = String.valueOf(mapParameter.get("liveUrl"));
-            QueryWrapper<TIeExamInvigilateCall> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
-            tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCall::getExamRecordId, recordId).eq(TIeExamInvigilateCall::getSource, source);
-            TIeExamInvigilateCall tIeExamInvigilateCall = tIeExamInvigilateCallService.getOne(tIeExamInvigilateCallQueryWrapper);
-            if (Objects.isNull(tIeExamInvigilateCall)) {
-                tIeExamInvigilateCall = new TIeExamInvigilateCall(recordId, source, liveUrl);
-                tIeExamInvigilateCallService.save(tIeExamInvigilateCall);
-            } else {
-                tIeExamInvigilateCall.setLiveUrl(liveUrl);
-                tIeExamInvigilateCallService.updateById(tIeExamInvigilateCall);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (e instanceof DuplicateKeyException) {
-                String errorColumn = e.getCause().toString();
-                String columnStr = errorColumn.substring(errorColumn.lastIndexOf("key") + 3, errorColumn.length()).replaceAll("'", "");
-                throw new BusinessException("考试记录id[" + recordId + "]下的" + FieldUniqueEnum.convertToCode(columnStr) + "数据不允许重复插入");
-            } else if (e instanceof BusinessException) {
-                throw new BusinessException(e.getMessage());
-            } else {
-                throw new RuntimeException(e);
-            }
-        }
-        return ResultUtil.ok(SystemConstant.SUCCESS);
-    }
-
-    @ApiOperation(value = "发送通话申请接口")
-    @RequestMapping(value = "/call/apply", method = RequestMethod.POST)
-    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
-    @Transactional
-    public Result callApply(@ApiJsonObject(name = "callApply", value = {
-            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
-    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
-        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
-            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
-        }
-        Long recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
-        QueryWrapper<TIeExamInvigilateCall> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
-        tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCall::getExamRecordId, recordId);
-        TIeExamInvigilateCall tIeExamInvigilateCall = tIeExamInvigilateCallService.getOne(tIeExamInvigilateCallQueryWrapper);
-        if (Objects.isNull(tIeExamInvigilateCall)) {
-            throw new BusinessException("监控通话信息为空");
-        }
-        tIeExamInvigilateCall.setStatus("apply");
-        tIeExamInvigilateCallService.updateById(tIeExamInvigilateCall);
-        return ResultUtil.ok(SystemConstant.SUCCESS);
-    }
-
-    @ApiOperation(value = "撤销通话申请接口")
-    @RequestMapping(value = "/call/cancel", method = RequestMethod.POST)
-    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
-    @Transactional
-    public Result callCancel(@ApiJsonObject(name = "callCancel", value = {
-            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
-    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
-        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
-            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
-        }
-        Long recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
-        QueryWrapper<TIeExamInvigilateCall> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
-        tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCall::getExamRecordId, recordId);
-        TIeExamInvigilateCall tIeExamInvigilateCall = tIeExamInvigilateCallService.getOne(tIeExamInvigilateCallQueryWrapper);
-        if (Objects.isNull(tIeExamInvigilateCall)) {
-            throw new BusinessException("监控通话信息为空");
-        }
-        tIeExamInvigilateCall.setStatus("cancel");
-        tIeExamInvigilateCallService.updateById(tIeExamInvigilateCall);
-        return ResultUtil.ok(SystemConstant.SUCCESS);
-    }
-
-    @ApiOperation(value = "监考监控通话查询接口")
-    @RequestMapping(value = "/call/list", method = RequestMethod.POST)
-    @ApiResponses({@ApiResponse(code = 200, message = "监考监控信息", response = TIeExamInvigilateCall.class)})
-    public Result callList() {
-        QueryWrapper<TIeExamInvigilateCall> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
-        tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCall::getSource, "apply");
-        List<TIeExamInvigilateCall> tIeExamInvigilateCallList = tIeExamInvigilateCallService.list(tIeExamInvigilateCallQueryWrapper);
-        Map map = new HashMap<>();
-        map.put(SystemConstant.RECORDS, tIeExamInvigilateCallList);
-        return ResultUtil.ok(map);
-    }
-
-    @ApiOperation(value = "监考监控通话查询来源接口")
-    @RequestMapping(value = "/call/query", method = RequestMethod.POST)
-    @ApiResponses({@ApiResponse(code = 200, message = "监考监控信息", response = TIeExamInvigilateCall.class)})
-    public Result callQuery(@ApiParam(value = "考试记录id", required = true) @RequestParam(required = true) Long recordId) {
-        QueryWrapper<TIeExamInvigilateCall> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
-        tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCall::getExamRecordId, recordId);
-        List<TIeExamInvigilateCall> tIeExamInvigilateCallList = tIeExamInvigilateCallService.list(tIeExamInvigilateCallQueryWrapper);
-        Map map = new HashMap<>();
-        map.put(SystemConstant.RECORDS, tIeExamInvigilateCallList);
-        return ResultUtil.ok(map);
-    }
-}

+ 177 - 0
themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallLogMobileController.java

@@ -0,0 +1,177 @@
+package com.qmth.themis.exam.api;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.qmth.themis.business.annotation.ApiJsonObject;
+import com.qmth.themis.business.annotation.ApiJsonProperty;
+import com.qmth.themis.business.cache.RedisKeyHelper;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.entity.TIeExamInvigilateCallLog;
+import com.qmth.themis.business.enums.MonitorStatusSourceEnum;
+import com.qmth.themis.business.enums.MonitorVideoSourceEnum;
+import com.qmth.themis.business.enums.MqTagEnum;
+import com.qmth.themis.business.enums.MqTopicEnum;
+import com.qmth.themis.business.service.MqDtoService;
+import com.qmth.themis.business.service.TIeExamInvigilateCallLogService;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.enums.ExceptionResultEnum;
+import com.qmth.themis.common.exception.BusinessException;
+import com.qmth.themis.common.util.Result;
+import com.qmth.themis.common.util.ResultUtil;
+import io.swagger.annotations.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @Description: mobile监考监控通话信息 前端控制器
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/6/25
+ */
+@Api(tags = "mobile监考监控通话信息Controller")
+@RestController
+@RequestMapping("/${prefix.url.mobile}/monitor")
+public class TIeInvigilateCallLogMobileController {
+    private final static Logger log = LoggerFactory.getLogger(TIeInvigilateCallLogOeController.class);
+
+    @Resource
+    TIeExamInvigilateCallLogService tIeExamInvigilateCallLogService;
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Resource
+    MqDtoService mqDtoService;
+
+    @ApiOperation(value = "监控观看地址更新接口")
+    @RequestMapping(value = "/live_url", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
+    @Transactional
+    public Result liveUrl(@ApiJsonObject(name = "liveUrlOe", value = {
+            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
+            @ApiJsonProperty(key = "source", description = "监考视频源", required = true),
+            @ApiJsonProperty(key = "liveUrl", description = "观看地址", required = true)
+    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
+        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
+        }
+        Long recordId = null;
+        recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
+        if (Objects.isNull(mapParameter.get("source")) || Objects.equals(mapParameter.get("source"), "")) {
+            throw new BusinessException("监考视频源不能为空");
+        }
+        MonitorVideoSourceEnum source = MonitorVideoSourceEnum.valueOf(String.valueOf(mapParameter.get("source")).toUpperCase());
+        if (Objects.isNull(mapParameter.get("liveUrl")) || Objects.equals(mapParameter.get("liveUrl"), "")) {
+            throw new BusinessException("观看地址不能为空");
+        }
+        String liveUrl = String.valueOf(mapParameter.get("liveUrl"));
+        TIeExamInvigilateCallLog tIeExamInvigilateCallLog = new TIeExamInvigilateCallLog(recordId, source, liveUrl, MonitorStatusSourceEnum.INIT);
+        //获取考试记录缓存
+        Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
+        objectMap.put(SystemConstant.MONITOR_LIVE_URL_ + source.name(), liveUrl);
+        objectMap.put(SystemConstant.MONITOR_STATUS_ + source.name(), tIeExamInvigilateCallLog.getStatus().name());
+        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
+        //监考监控通话信息 发送mq start
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.monitorLog.name(), tIeExamInvigilateCallLog, MqTagEnum.monitorLog, String.valueOf(tIeExamInvigilateCallLog.getId()), source.name());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+        //监考监控通话信息 发送mq end
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
+    @ApiOperation(value = "发送通话申请接口")
+    @RequestMapping(value = "/call/apply", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
+    @Transactional
+    public Result callApply(@ApiJsonObject(name = "callApplyOe", value = {
+            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
+    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
+        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
+        }
+        Long recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
+        //获取考试记录缓存
+        Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
+        String liveUrl = null;
+        MonitorVideoSourceEnum source = null;
+        if (Objects.nonNull(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
+            source = MonitorVideoSourceEnum.MOBILE_FIRST;
+            liveUrl = String.valueOf(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_FIRST.name()));
+        } else if (Objects.nonNull(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
+            source = MonitorVideoSourceEnum.MOBILE_SECOND;
+            liveUrl = String.valueOf(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_SECOND.name()));
+        }
+        objectMap.put(SystemConstant.MONITOR_STATUS_ + source, MonitorStatusSourceEnum.START.name());
+        TIeExamInvigilateCallLog tIeExamInvigilateCallLog = new TIeExamInvigilateCallLog(recordId, source, liveUrl, MonitorStatusSourceEnum.START);
+        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
+        //监考监控通话信息 发送mq start
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.monitorLog.name(), tIeExamInvigilateCallLog, MqTagEnum.monitorLog, String.valueOf(tIeExamInvigilateCallLog.getId()), source.name());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+        //监考监控通话信息 发送mq end
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
+    @ApiOperation(value = "撤销通话申请接口")
+    @RequestMapping(value = "/call/cancel", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
+    @Transactional
+    public Result callCancel(@ApiJsonObject(name = "callCancelOe", value = {
+            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
+    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
+        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
+        }
+        Long recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
+        //获取考试记录缓存
+        Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
+        String liveUrl = null;
+        MonitorVideoSourceEnum source = null;
+        if (Objects.nonNull(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
+            source = MonitorVideoSourceEnum.MOBILE_FIRST;
+            liveUrl = String.valueOf(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_FIRST.name()));
+        } else if (Objects.nonNull(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
+            source = MonitorVideoSourceEnum.MOBILE_SECOND;
+            liveUrl = String.valueOf(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_SECOND.name()));
+        }
+        objectMap.put(SystemConstant.MONITOR_STATUS_ + source, MonitorStatusSourceEnum.STOP.name());
+        TIeExamInvigilateCallLog tIeExamInvigilateCallLog = new TIeExamInvigilateCallLog(recordId, source, liveUrl, MonitorStatusSourceEnum.STOP);
+        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
+        //监考监控通话信息 发送mq start
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.monitorLog.name(), tIeExamInvigilateCallLog, MqTagEnum.monitorLog, String.valueOf(tIeExamInvigilateCallLog.getId()), source.name());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+        //监考监控通话信息 发送mq end
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
+    @ApiOperation(value = "监考监控通话查询接口")
+    @RequestMapping(value = "/call/list", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "监考监控信息", response = TIeExamInvigilateCallLog.class)})
+    public Result callList(@ApiParam(value = "场次id", required = true) @RequestParam Long examActivityId) {
+        QueryWrapper<TIeExamInvigilateCallLog> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
+        tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCallLog::getSource, "apply");
+        List<TIeExamInvigilateCallLog> tIeExamInvigilateCallList = tIeExamInvigilateCallLogService.list(tIeExamInvigilateCallQueryWrapper);
+        Map map = new HashMap<>();
+        map.put(SystemConstant.RECORDS, tIeExamInvigilateCallList);
+        return ResultUtil.ok(map);
+    }
+
+    @ApiOperation(value = "监考监控通话查询来源接口")
+    @RequestMapping(value = "/call/query", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "监考监控信息", response = TIeExamInvigilateCallLog.class)})
+    public Result callQuery(@ApiParam(value = "考试记录id", required = true) @RequestParam(required = true) Long recordId) {
+        QueryWrapper<TIeExamInvigilateCallLog> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
+        tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCallLog::getExamRecordId, recordId);
+        List<TIeExamInvigilateCallLog> tIeExamInvigilateCallList = tIeExamInvigilateCallLogService.list(tIeExamInvigilateCallQueryWrapper);
+        Map map = new HashMap<>();
+        map.put(SystemConstant.RECORDS, tIeExamInvigilateCallList);
+        return ResultUtil.ok(map);
+    }
+}

+ 177 - 0
themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallLogOeController.java

@@ -0,0 +1,177 @@
+package com.qmth.themis.exam.api;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.qmth.themis.business.annotation.ApiJsonObject;
+import com.qmth.themis.business.annotation.ApiJsonProperty;
+import com.qmth.themis.business.cache.RedisKeyHelper;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.entity.TIeExamInvigilateCallLog;
+import com.qmth.themis.business.enums.MonitorStatusSourceEnum;
+import com.qmth.themis.business.enums.MonitorVideoSourceEnum;
+import com.qmth.themis.business.enums.MqTagEnum;
+import com.qmth.themis.business.enums.MqTopicEnum;
+import com.qmth.themis.business.service.MqDtoService;
+import com.qmth.themis.business.service.TIeExamInvigilateCallLogService;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.enums.ExceptionResultEnum;
+import com.qmth.themis.common.exception.BusinessException;
+import com.qmth.themis.common.util.Result;
+import com.qmth.themis.common.util.ResultUtil;
+import io.swagger.annotations.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @Description: oe监考监控通话信息 前端控制器
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/6/25
+ */
+@Api(tags = "oe监考监控通话信息Controller")
+@RestController
+@RequestMapping("/${prefix.url.exam}/monitor")
+public class TIeInvigilateCallLogOeController {
+    private final static Logger log = LoggerFactory.getLogger(TIeInvigilateCallLogOeController.class);
+
+    @Resource
+    TIeExamInvigilateCallLogService tIeExamInvigilateCallLogService;
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Resource
+    MqDtoService mqDtoService;
+
+    @ApiOperation(value = "监控观看地址更新接口")
+    @RequestMapping(value = "/live_url", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
+    @Transactional
+    public Result liveUrl(@ApiJsonObject(name = "liveUrlOe", value = {
+            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
+            @ApiJsonProperty(key = "source", description = "监考视频源", required = true),
+            @ApiJsonProperty(key = "liveUrl", description = "观看地址", required = true)
+    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
+        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
+        }
+        Long recordId = null;
+        recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
+        if (Objects.isNull(mapParameter.get("source")) || Objects.equals(mapParameter.get("source"), "")) {
+            throw new BusinessException("监考视频源不能为空");
+        }
+        MonitorVideoSourceEnum source = MonitorVideoSourceEnum.valueOf(String.valueOf(mapParameter.get("source")).toUpperCase());
+        if (Objects.isNull(mapParameter.get("liveUrl")) || Objects.equals(mapParameter.get("liveUrl"), "")) {
+            throw new BusinessException("观看地址不能为空");
+        }
+        String liveUrl = String.valueOf(mapParameter.get("liveUrl"));
+        TIeExamInvigilateCallLog tIeExamInvigilateCallLog = new TIeExamInvigilateCallLog(recordId, source, liveUrl, MonitorStatusSourceEnum.INIT);
+        //获取考试记录缓存
+        Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
+        objectMap.put(SystemConstant.MONITOR_LIVE_URL_ + source.name(), liveUrl);
+        objectMap.put(SystemConstant.MONITOR_STATUS_ + source.name(), tIeExamInvigilateCallLog.getStatus().name());
+        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
+        //监考监控通话信息 发送mq start
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.monitorLog.name(), tIeExamInvigilateCallLog, MqTagEnum.monitorLog, String.valueOf(tIeExamInvigilateCallLog.getId()), source.name());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+        //监考监控通话信息 发送mq end
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
+    @ApiOperation(value = "发送通话申请接口")
+    @RequestMapping(value = "/call/apply", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
+    @Transactional
+    public Result callApply(@ApiJsonObject(name = "callApplyOe", value = {
+            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
+    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
+        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
+        }
+        Long recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
+        //获取考试记录缓存
+        Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
+        String liveUrl = null;
+        MonitorVideoSourceEnum source = null;
+        if (Objects.nonNull(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
+            source = MonitorVideoSourceEnum.MOBILE_FIRST;
+            liveUrl = String.valueOf(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_FIRST.name()));
+        } else if (Objects.nonNull(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
+            source = MonitorVideoSourceEnum.MOBILE_SECOND;
+            liveUrl = String.valueOf(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_SECOND.name()));
+        }
+        objectMap.put(SystemConstant.MONITOR_STATUS_ + source, MonitorStatusSourceEnum.START.name());
+        TIeExamInvigilateCallLog tIeExamInvigilateCallLog = new TIeExamInvigilateCallLog(recordId, source, liveUrl, MonitorStatusSourceEnum.START);
+        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
+        //监考监控通话信息 发送mq start
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.monitorLog.name(), tIeExamInvigilateCallLog, MqTagEnum.monitorLog, String.valueOf(tIeExamInvigilateCallLog.getId()), source.name());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+        //监考监控通话信息 发送mq end
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
+    @ApiOperation(value = "撤销通话申请接口")
+    @RequestMapping(value = "/call/cancel", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "{\"success\":true}", response = Result.class)})
+    @Transactional
+    public Result callCancel(@ApiJsonObject(name = "callCancelOe", value = {
+            @ApiJsonProperty(key = "recordId", type = "long", example = "1", description = "考试记录id", required = true),
+    }) @ApiParam(value = "监控信息", required = true) @RequestBody Map<String, Object> mapParameter) {
+        if (Objects.isNull(mapParameter.get("recordId")) || Objects.equals(mapParameter.get("recordId"), "")) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
+        }
+        Long recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId")));
+        //获取考试记录缓存
+        Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
+        String liveUrl = null;
+        MonitorVideoSourceEnum source = null;
+        if (Objects.nonNull(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
+            source = MonitorVideoSourceEnum.MOBILE_FIRST;
+            liveUrl = String.valueOf(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_FIRST.name()));
+        } else if (Objects.nonNull(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
+            source = MonitorVideoSourceEnum.MOBILE_SECOND;
+            liveUrl = String.valueOf(objectMap.get(SystemConstant.MONITOR_LIVE_URL_ + MonitorVideoSourceEnum.MOBILE_SECOND.name()));
+        }
+        objectMap.put(SystemConstant.MONITOR_STATUS_ + source, MonitorStatusSourceEnum.STOP.name());
+        TIeExamInvigilateCallLog tIeExamInvigilateCallLog = new TIeExamInvigilateCallLog(recordId, source, liveUrl, MonitorStatusSourceEnum.STOP);
+        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
+        //监考监控通话信息 发送mq start
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.monitorLog.name(), tIeExamInvigilateCallLog, MqTagEnum.monitorLog, String.valueOf(tIeExamInvigilateCallLog.getId()), source.name());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+        //监考监控通话信息 发送mq end
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
+    @ApiOperation(value = "监考监控通话查询接口")
+    @RequestMapping(value = "/call/list", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "监考监控信息", response = TIeExamInvigilateCallLog.class)})
+    public Result callList(@ApiParam(value = "场次id", required = true) @RequestParam Long examActivityId) {
+        QueryWrapper<TIeExamInvigilateCallLog> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
+        tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCallLog::getSource, "apply");
+        List<TIeExamInvigilateCallLog> tIeExamInvigilateCallList = tIeExamInvigilateCallLogService.list(tIeExamInvigilateCallQueryWrapper);
+        Map map = new HashMap<>();
+        map.put(SystemConstant.RECORDS, tIeExamInvigilateCallList);
+        return ResultUtil.ok(map);
+    }
+
+    @ApiOperation(value = "监考监控通话查询来源接口")
+    @RequestMapping(value = "/call/query", method = RequestMethod.POST)
+    @ApiResponses({@ApiResponse(code = 200, message = "监考监控信息", response = TIeExamInvigilateCallLog.class)})
+    public Result callQuery(@ApiParam(value = "考试记录id", required = true) @RequestParam(required = true) Long recordId) {
+        QueryWrapper<TIeExamInvigilateCallLog> tIeExamInvigilateCallQueryWrapper = new QueryWrapper<>();
+        tIeExamInvigilateCallQueryWrapper.lambda().eq(TIeExamInvigilateCallLog::getExamRecordId, recordId);
+        List<TIeExamInvigilateCallLog> tIeExamInvigilateCallList = tIeExamInvigilateCallLogService.list(tIeExamInvigilateCallQueryWrapper);
+        Map map = new HashMap<>();
+        map.put(SystemConstant.RECORDS, tIeExamInvigilateCallList);
+        return ResultUtil.ok(map);
+    }
+}

+ 4 - 0
themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java

@@ -49,6 +49,10 @@ public class StartRunning implements CommandLineRunner {
          * task
          */
         rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.examStudentImport.name() + "||" + MqTagEnum.roomCodeImport.name() + "||" + MqTagEnum.roomCodeExport.name() + "||" + MqTagEnum.examPaperImport.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(TaskConcurrentlyImpl.class));
+        /**
+         * log
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.logConsumerGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.exceptionLog.name() + "||" + MqTagEnum.warningLog.name() + "||" + MqTagEnum.monitorLog.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(LogConcurrentlyImpl.class));
         /**
          * websocket mq start
          */

+ 309 - 0
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -0,0 +1,309 @@
+package com.qmth.themis.exam.websocket;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.dto.WebsocketDto;
+import com.qmth.themis.business.entity.TBSession;
+import com.qmth.themis.business.enums.MqTagEnum;
+import com.qmth.themis.business.enums.MqTopicEnum;
+import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.enums.WebsocketTypeEnum;
+import com.qmth.themis.business.service.MqDtoService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.business.util.WebsocketUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.common.enums.ExceptionResultEnum;
+import com.qmth.themis.common.exception.BusinessException;
+import com.qmth.themis.common.signature.SignatureInfo;
+import com.qmth.themis.common.signature.SignatureType;
+import com.qmth.themis.exam.listener.service.MqOeLogicService;
+import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
+import com.qmth.themis.mq.templete.Concurrently;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @Description: websocket移动端
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/10
+ */
+@ServerEndpoint("/mobile")
+@Component
+public class WebSocketMobileServer
+//        implements Concurrently
+{
+    private final static Logger log = LoggerFactory.getLogger(WebSocketMobileServer.class);
+    private volatile static ConcurrentHashMap<Long, WebSocketMobileServer> webSocketMap = new ConcurrentHashMap<>();
+    /**
+     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+     */
+    private Session session = null;
+    private String sessionId = null, ip = null;
+    private Long recordId = null;
+    private String platform = null, deviceId = null, Authorization = null;
+    private Long time = null;
+    private RedisUtil redisUtil;
+    private Long updateTime = null;
+    private Map<String, Object> tranMap = null;
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session) {
+        Map<String, List<String>> mapParameter = session.getRequestParameterMap();
+        if (Objects.isNull(mapParameter)) {
+            throw new BusinessException(ExceptionResultEnum.PARAMS_ILLEGALITY);
+        }
+        log.info("mapParameter:{}", JacksonUtil.parseJson(mapParameter));
+        log.info("uri:{}", session.getRequestURI());
+        if (Objects.isNull(mapParameter.get("platform")) || mapParameter.get("platform").size() == 0) {
+            throw new BusinessException(ExceptionResultEnum.PLATFORM_INVALID);
+        }
+        this.platform = String.valueOf(mapParameter.get("platform").get(0));
+        if (Objects.isNull(mapParameter.get("deviceId")) || mapParameter.get("deviceId").size() == 0) {
+            throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
+        }
+        this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
+        this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
+        this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
+        this.recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId").get(0)));
+        String method = SystemConstant.GET;
+        final SignatureInfo info = SignatureInfo
+                .parse(Authorization);
+        if (Objects.nonNull(info) && info.getType() == SignatureType.TOKEN) {
+            String sessionId = info.getInvoker();
+            redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+            TBSession tbSession = (TBSession) redisUtil.getUserSession(sessionId);
+            if (Objects.isNull(tbSession)) {
+                throw new BusinessException(ExceptionResultEnum.LOGIN_NO);
+            } else {
+                if (info.validate(tbSession.getAccessToken()) && info.getTimestamp() < tbSession.getExpireTime().getTime()
+                        && platform.equalsIgnoreCase(tbSession.getPlatform()) && Objects.equals(deviceId, tbSession.getDeviceId())) {
+                    this.session = session;
+                    session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
+                    this.sessionId = tbSession.getId();
+                    if (webSocketMap.containsKey(this.recordId)) {
+                        webSocketMap.remove(this.recordId);
+                        webSocketMap.put(this.recordId, this);
+                        //加入set中
+                    } else {
+                        webSocketMap.put(this.recordId, this);
+                        //加入set中
+                        addOnlineCount();
+                        //在线数加1
+                    }
+                    log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
+                    InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
+                    this.ip = addr.toString().replace("/", "").split(":")[0];
+                    this.sendMessage("ip[" + this.ip + "]连接成功");
+                    tranMap = new HashMap<>();
+                    tranMap.put("recordId", this.recordId);
+                    tranMap.put("deviceId", this.deviceId);
+                    tranMap.put("ip", this.ip);
+                    tranMap.put("updateTime", this.updateTime);
+                } else {
+                    throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
+                }
+            }
+        } else {
+            throw new BusinessException(ExceptionResultEnum.AUTHORIZATION_ERROR);
+        }
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        log.info("onClose is come in");
+        if (webSocketMap.containsKey(this.recordId)) {
+            webSocketMap.remove(this.recordId);
+            //从set中删除
+            subOnlineCount();
+            //判断是否是正常退出
+            Date now = new Date();
+            //大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
+            if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
+                log.info("超时退出");
+                //发送延时mq消息start
+                MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+                String level = "2m";
+//                String level = "30s";
+                Integer time = SystemConstant.mqDelayLevel.get(level);
+                LocalDateTime dt = LocalDateTime.now();
+                dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
+//                dt = dt.plusSeconds(Long.parseLong(level.replace("s", "")));
+                tranMap.put("timeOut", time);
+                tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+                MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.oeUnNormal.name(), SystemOperationEnum.OE_NET_UN_NORMAL, MqTagEnum.oeUnNormal, String.valueOf(this.recordId), this.tranMap, this.sessionId);
+                mqDtoService.assembleSendAsyncDelayMsg(mqDto);
+                //发送延时mq消息end
+            }
+        }
+        log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message
+     * @param session
+     */
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        //可以群发消息
+        //消息保存到数据库、redis
+        if (Objects.nonNull(message)) {
+            try {
+                //解析发送的报文
+                JSONObject jsonObject = JSONObject.parseObject(message);
+                log.info("onMessage:{}", jsonObject.toJSONString());
+                if (Objects.nonNull(jsonObject)) {
+                    WebSocketOeMessageTemplete webSocketOeMessageTemplete = SpringContextHolder.getBean(WebSocketOeMessageTemplete.class);
+                    Gson gson = new Gson();
+                    WebsocketDto websocketDto = gson.fromJson(gson.toJson(jsonObject), WebsocketDto.class);
+                    //todo 加入当前时间和time比较的校验
+                    jsonObject.getJSONObject("body").put("recordId", this.recordId);
+                    websocketDto.setBody(jsonObject.getJSONObject("body"));
+                    Method method = webSocketOeMessageTemplete.getClass().getDeclaredMethod(WebsocketTypeEnum.valueOf(websocketDto.getType()).getDesc(), String.class);
+                    WebsocketDto result = (WebsocketDto) method.invoke(webSocketOeMessageTemplete, String.valueOf(websocketDto.getBody()));
+                    this.sendMessage(JSONObject.toJSONString(result));
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * 错误
+     *
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.error("用户错误:{},原因:{}", this.sessionId, error.getMessage());
+        error.printStackTrace();
+        throw new BusinessException(error.getMessage());
+    }
+
+    /**
+     * 实现服务器主动推送
+     *
+     * @param message
+     * @throws IOException
+     */
+    public void sendMessage(Object message) {
+        log.info("message:{}", JacksonUtil.parseJson(message));
+        this.session.getAsyncRemote().sendText(JacksonUtil.parseJson(message));
+        this.updateTime = System.currentTimeMillis();
+    }
+
+    /**
+     * 获取在线人数
+     *
+     * @return
+     */
+    public synchronized int getOnlineCount() {
+        Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
+        return Objects.isNull(o) ? 0 : (int) o;
+    }
+
+    /**
+     * 在线人数加一
+     */
+    public synchronized void addOnlineCount() {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
+            Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
+            int count = 0;
+            if (Objects.nonNull(o)) {
+                count = (int) o;
+            }
+            count++;
+            redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
+        }
+    }
+
+    /**
+     * 在线人数减一
+     */
+    public synchronized void subOnlineCount() {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
+            Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
+            int count = 0;
+            if (Objects.nonNull(o)) {
+                count = (int) o;
+            }
+            count--;
+            redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
+        }
+    }
+
+//    @Override
+//    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+//        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+//        MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
+//        MqDto mqDto = null;
+//        try {
+//            long threadId = Thread.currentThread().getId();
+//            String threadName = Thread.currentThread().getName();
+//            for (MessageExt messageExt : msgs) {
+//                log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+//                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+//                log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+//                int reconsumeTime = messageExt.getReconsumeTimes();
+//                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+//                    mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+//                } else {
+//                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+//                        mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+//                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+//                    } else {
+//                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+//                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+//                    }
+//                }
+//            }
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+//
+//        } finally {
+//            if (Objects.nonNull(mqDto)) {
+//                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+//            }
+//        }
+//        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+//    }
+
+    public static ConcurrentHashMap<Long, WebSocketMobileServer> getWebSocketMap() {
+        return webSocketMap;
+    }
+
+    public Long getRecordId() {
+        return recordId;
+    }
+}
+

+ 1 - 1
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -41,7 +41,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * @Description: websocker考生端
+ * @Description: websocket考生端
  * @Param:
  * @return:
  * @Author: wangliang

+ 22 - 10
themis-mq/src/main/java/com/qmth/themis/mq/service/MqLogicService.java

@@ -91,15 +91,27 @@ public interface MqLogicService {
      */
     public void execMqExamBreakHistoryLogic(MqDto mqDto, String key);
 
-	/**考试记录数据更新
-	 * @param mqDto
-	 * @param key
-	 */
-	void execMqRecordUpdateLogic(MqDto mqDto, String key);
+    /**
+     * 考试记录数据更新
+     *
+     * @param mqDto
+     * @param key
+     */
+    void execMqRecordUpdateLogic(MqDto mqDto, String key);
+
+    /**
+     * 考试记录初始化
+     *
+     * @param mqDto
+     * @param key
+     */
+    void execMqRecordInitLogic(MqDto mqDto, String key);
 
-	/**考试记录初始化
-	 * @param mqDto
-	 * @param key
-	 */
-	void execMqRecordInitLogic(MqDto mqDto, String key);
+    /**
+     * 监控通话申请逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqLogLogic(MqDto mqDto, String key);
 }

+ 39 - 34
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -1,40 +1,21 @@
 package com.qmth.themis.mq.service.impl;
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import javax.annotation.Resource;
-
-import com.qmth.themis.business.enums.MqTagEnum;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.google.gson.Gson;
 import com.qmth.themis.business.cache.ExamRecordCacheUtil;
 import com.qmth.themis.business.cache.RedisKeyHelper;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.MqDto;
 import com.qmth.themis.business.entity.TBSession;
+import com.qmth.themis.business.entity.TIeExamInvigilateCallLog;
 import com.qmth.themis.business.entity.TMRocketMessage;
 import com.qmth.themis.business.entity.TOeExamBreakHistory;
 import com.qmth.themis.business.enums.BreakReasonEnum;
 import com.qmth.themis.business.enums.ExamRecordStatusEnum;
+import com.qmth.themis.business.enums.MqTagEnum;
 import com.qmth.themis.business.enums.SystemOperationEnum;
-import com.qmth.themis.business.service.CommonService;
-import com.qmth.themis.business.service.TBSessionService;
-import com.qmth.themis.business.service.TEExamService;
-import com.qmth.themis.business.service.TEExamStudentLogService;
-import com.qmth.themis.business.service.TEUserLogService;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.service.TOeExamBreakHistoryService;
-import com.qmth.themis.business.service.TOeExamRecordService;
-import com.qmth.themis.business.service.TOeFaceVerifyHistoryService;
-import com.qmth.themis.business.service.TOeLivenessVerifyHistoryService;
+import com.qmth.themis.business.service.*;
 import com.qmth.themis.business.templete.TaskExportTemplete;
 import com.qmth.themis.business.templete.TaskImportTemplete;
 import com.qmth.themis.business.templete.impl.TaskExamPaperImportTemplete;
@@ -45,8 +26,13 @@ import com.qmth.themis.business.threadPool.MyThreadPool;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.util.SimpleBeanUtil;
-import com.qmth.themis.business.dto.MqDto;
 import com.qmth.themis.mq.service.MqLogicService;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.util.*;
 
 /**
  * @Description: mq执行逻辑 impl
@@ -76,9 +62,6 @@ public class MqLogicServiceImpl implements MqLogicService {
     @Resource
     MyThreadPool myThreadPool;
 
-    @Resource
-    TOeExamRecordService tOeExamRecordService;
-
     @Resource
     TOeExamBreakHistoryService tOeExamBreakHistoryService;
 
@@ -92,12 +75,13 @@ public class MqLogicServiceImpl implements MqLogicService {
     TOeLivenessVerifyHistoryService livenessVerifyHistoryService;
 
     @Resource
-    TEExamService teExamService;
+    CommonService commonService;
 
     @Resource
-    CommonService commonService;
+    TIeExamInvigilateCallLogService tIeExamInvigilateCallLogService;
 
-    /*** mq最大重试次数逻辑
+    /**
+     * mq最大重试次数逻辑
      *
      * @param mqDto
      * @param key
@@ -367,8 +351,8 @@ public class MqLogicServiceImpl implements MqLogicService {
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
-    
-    
+
+
     @Override
     @Transactional
     public void execMqRecordUpdateLogic(MqDto mqDto, String key) {
@@ -378,14 +362,14 @@ public class MqLogicServiceImpl implements MqLogicService {
         String colName = (String) param.get("colName");
         Object colValue = (Object) param.get("colValue");
         Integer isDate = (Integer) param.get("isDate");
-        examRecordService.dataUpdate(recordId, colName, colValue,isDate);
+        examRecordService.dataUpdate(recordId, colName, colValue, isDate);
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
         tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
-    
+
     @Override
     @Transactional
     public void execMqRecordInitLogic(MqDto mqDto, String key) {
@@ -398,4 +382,25 @@ public class MqLogicServiceImpl implements MqLogicService {
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
+
+    /**
+     * 监控通话申请逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqLogLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        String tag = mqDto.getTag();
+        if (tag.contains(MqTagEnum.monitorLog.name())) {
+            tIeExamInvigilateCallLogService.saveOrUpdate(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TIeExamInvigilateCallLog.class));
+        }
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
 }

+ 68 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/LogConcurrentlyImpl.java

@@ -0,0 +1,68 @@
+package com.qmth.themis.mq.templete.impl;
+
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.service.MqLogicService;
+import com.qmth.themis.mq.templete.Concurrently;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * @Description: mq 日志并行消费监听
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/8/14
+ */
+@Service
+public class LogConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(LogConcurrentlyImpl.class);
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+        MqLogicService mqLogicService = SpringContextHolder.getBean(MqLogicService.class);
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            for (MessageExt messageExt : msgs) {
+                log.info(":{}-:{} Log Consumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.info(":{}-:{} Log Consumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                } else {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.info(":{}-:{} 更新db", threadId, threadName);
+                        mqLogicService.execMqLogLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+    }
+}