Răsfoiți Sursa

加入腾讯云事件监控

wangliang 3 ani în urmă
părinte
comite
7b09a13545

+ 19 - 9
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -331,15 +331,25 @@ public class MqLogicServiceImpl implements MqLogicService {
                 for (int i = 0; i < mediaInfos.length; i++) {
                     MediaBasicInfo mediaBasicInfo = mediaInfos[i].getBasicInfo();
                     String videoSource = SystemConstant.getMonitorRecordStreamId(mediaBasicInfo.getName());
-                    boolean lock = redisUtil.lock(SystemConstant.REDIS_LOCK_TENCENT_VIDEO_PREFIX + videoSource, SystemConstant.REDIS_LOCK_TENCENT_VIDEO_TIME_OUT);
-                    if (lock && Objects.isNull(map.get(videoSource))) {
-                        try {
-                            JSONObject jsonObject = new JSONObject();
-                            jsonObject.put(SystemConstant.VIDEO_SOURCE, videoSource);
-                            jsonObject.put(SystemConstant.VIDEO_URL, mediaBasicInfo.getMediaUrl());
-                            jsonArray.add(jsonObject);
-                        } finally {
-                            redisUtil.releaseLock(SystemConstant.REDIS_LOCK_TENCENT_VIDEO_PREFIX + videoSource);
+                    boolean lock = false;
+                    for (int y = 0; y < SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE; y++) {
+                        lock = redisUtil.lock(SystemConstant.REDIS_LOCK_TENCENT_VIDEO_PREFIX + videoSource, SystemConstant.REDIS_LOCK_TENCENT_VIDEO_TIME_OUT);
+                        if (lock && Objects.isNull(map.get(videoSource))) {
+                            try {
+                                JSONObject jsonObject = new JSONObject();
+                                jsonObject.put(SystemConstant.VIDEO_SOURCE, videoSource);
+                                jsonObject.put(SystemConstant.VIDEO_URL, mediaBasicInfo.getMediaUrl());
+                                jsonArray.add(jsonObject);
+                                break;
+                            } finally {
+                                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_TENCENT_VIDEO_PREFIX + videoSource);
+                            }
+                        } else {
+                            try {
+                                Thread.sleep(500);
+                            } catch (InterruptedException e) {
+                                e.printStackTrace();
+                            }
                         }
                     }
                 }