|
@@ -23,6 +23,7 @@ import com.qmth.themis.business.util.MqUtil;
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
import com.qmth.themis.business.util.UidUtil;
|
|
import com.qmth.themis.business.util.UidUtil;
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
|
|
+import com.qmth.themis.common.exception.BusinessException;
|
|
import com.qmth.themis.common.util.SimpleBeanUtil;
|
|
import com.qmth.themis.common.util.SimpleBeanUtil;
|
|
import com.qmth.themis.mq.service.MqLogicService;
|
|
import com.qmth.themis.mq.service.MqLogicService;
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
@@ -173,7 +174,6 @@ public class MqLogicServiceImpl implements MqLogicService {
|
|
}
|
|
}
|
|
mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
- tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
|
|
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
redisUtil.delete(key, mqDto.getId());
|
|
redisUtil.delete(key, mqDto.getId());
|
|
}
|
|
}
|
|
@@ -188,11 +188,14 @@ public class MqLogicServiceImpl implements MqLogicService {
|
|
@Transactional
|
|
@Transactional
|
|
public void execMqSessionLogic(MqDto mqDto, String key) {
|
|
public void execMqSessionLogic(MqDto mqDto, String key) {
|
|
Gson gson = new Gson();
|
|
Gson gson = new Gson();
|
|
- tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class),
|
|
|
|
- mqDto.getTimestamp());
|
|
|
|
|
|
+ String sessionId = mqDto.getObjId();
|
|
|
|
+ TBSession tbSession = (TBSession) redisUtil.getUserSession(sessionId);
|
|
|
|
+ if (Objects.isNull(tbSession)) {
|
|
|
|
+ throw new BusinessException("缓存session为空");
|
|
|
|
+ }
|
|
|
|
+ tbSessionService.saveOrUpdate(tbSession);
|
|
mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
- tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
|
|
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
redisUtil.delete(key, mqDto.getId());
|
|
redisUtil.delete(key, mqDto.getId());
|
|
}
|
|
}
|