Эх сурвалжийг харах

扩展core-concurrent,增加单一许可的信号量获取接口,并兼容JUC和redisson两套实现方案

Signed-off-by: luoshi <luoshi@qmth.com.cn>
luoshi 3 жил өмнө
parent
commit
de779cc0fa

+ 13 - 0
core-concurrent/src/main/java/com/qmth/boot/core/concurrent/model/Semaphore.java

@@ -0,0 +1,13 @@
+package com.qmth.boot.core.concurrent.model;
+
+/**
+ * 单一许可的抽象信号量,兼容JUC和Redisson等多种实现
+ */
+public interface Semaphore {
+
+    void acquire() throws InterruptedException;
+
+    boolean tryAcquire();
+
+    void release();
+}

+ 10 - 0
core-concurrent/src/main/java/com/qmth/boot/core/concurrent/service/ConcurrentService.java

@@ -1,5 +1,7 @@
 package com.qmth.boot.core.concurrent.service;
 
+import com.qmth.boot.core.concurrent.model.Semaphore;
+
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 
@@ -48,4 +50,12 @@ public interface ConcurrentService {
      */
     boolean isWriteLocked(String name);
 
+    /**
+     * 获取抽象信号量
+     *
+     * @param name
+     * @return
+     */
+    Semaphore getSemaphore(String name);
+
 }

+ 36 - 0
core-concurrent/src/main/java/com/qmth/boot/core/concurrent/service/impl/MemoryConcurrentService.java

@@ -1,8 +1,10 @@
 package com.qmth.boot.core.concurrent.service.impl;
 
+import com.qmth.boot.core.concurrent.model.Semaphore;
 import com.qmth.boot.core.concurrent.service.ConcurrentService;
 
 import javax.annotation.PreDestroy;
+import javax.validation.constraints.NotNull;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
@@ -16,15 +18,19 @@ public class MemoryConcurrentService implements ConcurrentService {
 
     private final Map<String, ReentrantReadWriteLock> readWriteMap;
 
+    private final Map<String, Semaphore> semaphoreMap;
+
     public MemoryConcurrentService() {
         this.lockMap = new HashMap<>();
         this.readWriteMap = new HashMap<>();
+        this.semaphoreMap = new HashMap<>();
     }
 
     @PreDestroy
     public void close() {
         this.lockMap.clear();
         this.readWriteMap.clear();
+        this.semaphoreMap.clear();
     }
 
     private ReentrantLock lock(String name) {
@@ -71,4 +77,34 @@ public class MemoryConcurrentService implements ConcurrentService {
     public boolean isWriteLocked(String name) {
         return readWriteLock(name).isWriteLocked();
     }
+
+    @Override
+    public Semaphore getSemaphore(@NotNull String name) {
+        Semaphore semaphore = semaphoreMap.get(name);
+        if (semaphore == null) {
+            synchronized (semaphoreMap) {
+                semaphore = semaphoreMap.computeIfAbsent(name, key -> {
+                    java.util.concurrent.Semaphore se = new java.util.concurrent.Semaphore(1, true);
+                    return new Semaphore() {
+
+                        @Override
+                        public void acquire() throws InterruptedException {
+                            se.acquire();
+                        }
+
+                        @Override
+                        public boolean tryAcquire() {
+                            return se.tryAcquire();
+                        }
+
+                        @Override
+                        public void release() {
+                            se.release();
+                        }
+                    };
+                });
+            }
+        }
+        return semaphore;
+    }
 }

+ 26 - 0
data-redis/src/main/java/com/qmth/boot/redis/concurrent/RedisConcurrentService.java

@@ -1,13 +1,16 @@
 package com.qmth.boot.redis.concurrent;
 
+import com.qmth.boot.core.concurrent.model.Semaphore;
 import com.qmth.boot.core.concurrent.service.ConcurrentService;
 import com.qmth.boot.redis.constant.RedisConstant;
 import com.qmth.boot.redis.rateLimit.RedisRateLimitService;
+import org.redisson.api.RSemaphore;
 import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+import javax.validation.constraints.NotNull;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 
@@ -55,4 +58,27 @@ public class RedisConcurrentService implements ConcurrentService, RedisConstant
     public boolean isWriteLocked(String name) {
         return redissonClient.getReadWriteLock(getReadWriteLockKey(name)).writeLock().isLocked();
     }
+
+    @Override
+    public Semaphore getSemaphore(@NotNull String name) {
+        RSemaphore semaphore = redissonClient.getSemaphore(name);
+        return new Semaphore() {
+
+            @Override
+            public void acquire() throws InterruptedException {
+                semaphore.acquire();
+            }
+
+            @Override
+            public boolean tryAcquire() {
+                return semaphore.tryAcquire();
+            }
+
+            @Override
+            public void release() {
+                semaphore.release();
+            }
+        };
+    }
+
 }