Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Mar 27, 2024
1 parent fea2864 commit ee60359
Showing 1 changed file with 91 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import static com.alicp.jetcache.test.support.Tick.tick;
Expand Down Expand Up @@ -81,7 +82,7 @@ protected void baseTest() throws Exception {

asyncTest();

penetrationProtectTestWrapper(cache);
penetrationProtectTest(cache);
}

private void illegalArgTest() {
Expand Down Expand Up @@ -745,33 +746,35 @@ private void checkResult(String key, Integer value, CacheGetResult result) {

}

private static void penetrationProtectTestWrapper(Cache cache) throws Exception {
private static void penetrationProtectTestWrapper(Cache cache, Consumer<Cache> testFunc) {
boolean oldPenetrationProtect = cache.config().isCachePenetrationProtect();
Duration oldTime = cache.config().getPenetrationProtectTimeout();
long oldExpireAfterWriteInMillis = cache.config().getExpireAfterWriteInMillis();
long oldExpireAfterAccessInMillis = cache.config().getExpireAfterAccessInMillis();

cache.config().setCachePenetrationProtect(true);
cache.config().setPenetrationProtectTimeout(null);
cache.config().setExpireAfterWriteInMillis(Integer.MAX_VALUE);
cache.config().setExpireAfterAccessInMillis(Integer.MAX_VALUE);

penetrationProtectTest(cache);
testFunc.accept(cache);

cache.config().setCachePenetrationProtect(oldPenetrationProtect);
cache.config().setPenetrationProtectTimeout(oldTime);
cache.config().setExpireAfterWriteInMillis(oldExpireAfterWriteInMillis);
cache.config().setExpireAfterAccessInMillis(oldExpireAfterAccessInMillis);
}

public static void penetrationProtectTest(Cache cache) throws Exception {
boolean oldPenetrationProtect = cache.config().isCachePenetrationProtect();
Duration oldTime = cache.config().getPenetrationProtectTimeout();
cache.config().setCachePenetrationProtect(true);
public static void penetrationProtectTest(Cache cache) {
penetrationProtectTestWrapper(cache, AbstractCacheTest::penetrationProtectTestWithComputeIfAbsent);

penetrationProtectTestWithComputeIfAbsent(cache);
if (cache instanceof LoadingCache) {
penetrationProtectTestWithLoadingCache(cache);
penetrationProtectTestWrapper(cache, AbstractCacheTest::penetrationProtectTestWithLoadingCache);
}

penetrationProtectReEntryTest(cache);
penetrationProtectTestWrapper(cache, AbstractCacheTest::penetrationProtectReEntryTest);

penetrationProtectTimeoutTest(cache);

cache.config().setCachePenetrationProtect(oldPenetrationProtect);
cache.config().setPenetrationProtectTimeout(oldTime);
penetrationProtectTestWrapper(cache, AbstractCacheTest::penetrationProtectTimeoutTest);
}

/**
Expand All @@ -780,19 +783,8 @@ public static void penetrationProtectTest(Cache cache) throws Exception {
* @param cache
* @throws Exception
*/
private static void penetrationProtectTestWithComputeIfAbsent(Cache cache) throws Exception {
private static void penetrationProtectTestWithComputeIfAbsent(Cache cache) {
String keyPrefix = "penetrationProtect_";

boolean oldPenetrationProtect = cache.config().isCachePenetrationProtect();
Duration oldTime = cache.config().getPenetrationProtectTimeout();
long expireAfterWriteInMillis = cache.config().getExpireAfterWriteInMillis();
long expireAfterAccessInMillis = cache.config().getExpireAfterAccessInMillis();

cache.config().setCachePenetrationProtect(true);
cache.config().setPenetrationProtectTimeout(null);
cache.config().setExpireAfterWriteInMillis(Integer.MAX_VALUE);
cache.config().setExpireAfterAccessInMillis(Integer.MAX_VALUE);

AtomicInteger loadSuccess = new AtomicInteger(0);
Function loader = new Function() {
private AtomicInteger count1 = new AtomicInteger(0);
Expand Down Expand Up @@ -839,7 +831,11 @@ public Object apply(Object k) {
});
t.start();
}
countDownLatch.await();
try {
countDownLatch.await();
} catch (InterruptedException e) {
Assert.fail("CountDownLatch await was interrupted: " + e.getMessage());
}

Assert.assertFalse(fail.get());
Assert.assertEquals(3, loadSuccess.get());
Expand All @@ -849,14 +845,9 @@ public Object apply(Object k) {
Assert.assertTrue(cache.remove(keyPrefix + "0"));
Assert.assertTrue(cache.remove(keyPrefix + "1"));
Assert.assertTrue(cache.remove(keyPrefix + "2"));

cache.config().setCachePenetrationProtect(oldPenetrationProtect);
cache.config().setPenetrationProtectTimeout(oldTime);
cache.config().setExpireAfterWriteInMillis(expireAfterWriteInMillis);
cache.config().setExpireAfterAccessInMillis(expireAfterAccessInMillis);
}

private static void penetrationProtectTestWithLoadingCache(Cache cache) throws Exception {
private static void penetrationProtectTestWithLoadingCache(Cache cache) {
String failMsg[] = new String[1];

Function<Integer, Integer> loaderFunction = new Function<Integer, Integer>() {
Expand Down Expand Up @@ -921,7 +912,12 @@ public Integer apply(Integer key) {
}
countDownLatch.countDown();
}).start();
countDownLatch.await();

try {
countDownLatch.await();
} catch (InterruptedException e) {
Assert.fail("CountDownLatch await was interrupted: " + e.getMessage());
}

Assert.assertNull(failMsg[0]);

Expand All @@ -934,7 +930,7 @@ private static void penetrationProtectReEntryTest(Cache cache) {
Assert.assertEquals("V", v);
}

private static void penetrationProtectTimeoutTest(Cache cache) throws Exception {
private static void penetrationProtectTimeoutTest(Cache cache) {
String keyPrefix = "penetrationProtectTimeoutTest_";
final Duration SHORT_PROTECT_DURATION = Duration.ofMillis(1);
final Duration LONG_PROTECT_DURATION = Duration.ofSeconds(60);
Expand All @@ -957,60 +953,65 @@ private static void penetrationProtectTimeoutTest(Cache cache) throws Exception
duration.set(Duration.ofNanos(System.nanoTime() - start));
return normalLoader.apply(k);
};
// 2. assemble loader into thread.
Thread t1 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 1, firstBarrierLoader));
Thread t2 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 1, normalLoader));
// 3. start the blocking loader thread, and then verify that there is a block.
t1.start();
Thread.sleep(tick(25));
Assert.assertEquals(0, loadSuccess.intValue());
// 4. start the normal loader thread, and verify penetration protector expired after the delay.
t2.start();
Thread.sleep(tick(25));
Assert.assertEquals(1, loadSuccess.intValue());
// 5. release the barrier of blocking loader, and then verify all of the loaders execute.
firstBarrier.countDown();
t1.join();
t2.join();
Assert.assertEquals(2, loadSuccess.intValue());
Assert.assertTrue(SHORT_PROTECT_DURATION.compareTo(duration.get()) < 0);

// 6. reset the time of PenetrationProtectTimeout to a longer Duration.
// reset two threads to a new loader.
cache.config().setPenetrationProtectTimeout(LONG_PROTECT_DURATION);
CountDownLatch secondBarrier = new CountDownLatch(1);
loadSuccess.set(0);
duration.set(Duration.ZERO);
Function secondBarrierLoader = k -> {
long start = System.nanoTime();
try {
secondBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
duration.set(Duration.ofNanos(System.nanoTime() - start));
return normalLoader.apply(k);
};
t1 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, secondBarrierLoader));
t2 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, normalLoader));
Thread t3 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, secondBarrierLoader));
// 7. serializing delays the startup of each thread, and then verify no any loader executes.
t1.start();
Thread.sleep(tick(25));
t2.start();
Thread.sleep(tick(25));
t3.start();
Thread.sleep(tick(25));
Assert.assertEquals(0, loadSuccess.intValue());
// 8. interrupt the second thread, and then release the barrier of blocking loader
t2.interrupt();
Thread.sleep(tick(25));
secondBarrier.countDown();
t1.join();
t2.join();
t3.join();
// 9. verify only the first and the third threads were executed, and verify PenetrationProtect didn't expire.
Assert.assertEquals(2, loadSuccess.intValue());
Assert.assertTrue(LONG_PROTECT_DURATION.compareTo(duration.get()) > 0);
try {
// 2. assemble loader into thread.
Thread t1 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 1, firstBarrierLoader));
Thread t2 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 1, normalLoader));
// 3. start the blocking loader thread, and then verify that there is a block.
t1.start();
Thread.sleep(tick(25));
Assert.assertEquals(0, loadSuccess.intValue());
// 4. start the normal loader thread, and verify penetration protector expired after the delay.
t2.start();
Thread.sleep(tick(25));
Assert.assertEquals(1, loadSuccess.intValue());
// 5. release the barrier of blocking loader, and then verify all of the loaders execute.
firstBarrier.countDown();
t1.join();
t2.join();
Assert.assertEquals(2, loadSuccess.intValue());
Assert.assertTrue(SHORT_PROTECT_DURATION.compareTo(duration.get()) < 0);

// 6. reset the time of PenetrationProtectTimeout to a longer Duration.
// reset two threads to a new loader.
cache.config().setPenetrationProtectTimeout(LONG_PROTECT_DURATION);
CountDownLatch secondBarrier = new CountDownLatch(1);
loadSuccess.set(0);
duration.set(Duration.ZERO);
Function secondBarrierLoader = k -> {
long start = System.nanoTime();
try {
secondBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
duration.set(Duration.ofNanos(System.nanoTime() - start));
return normalLoader.apply(k);
};
t1 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, secondBarrierLoader));
t2 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, normalLoader));
Thread t3 = new Thread(() -> cache.computeIfAbsent(keyPrefix + 2, secondBarrierLoader));
// 7. serializing delays the startup of each thread, and then verify no any loader executes.
t1.start();
Thread.sleep(tick(25));
t2.start();
Thread.sleep(tick(25));
t3.start();
Thread.sleep(tick(25));
Assert.assertEquals(0, loadSuccess.intValue());
// 8. interrupt the second thread, and then release the barrier of blocking loader
t2.interrupt();
Thread.sleep(tick(25));
secondBarrier.countDown();
t1.join();
t2.join();
t3.join();
// 9. verify only the first and the third threads were executed, and verify PenetrationProtect didn't expire.
Assert.assertEquals(2, loadSuccess.intValue());
Assert.assertTrue(LONG_PROTECT_DURATION.compareTo(duration.get()) > 0);

} catch (InterruptedException e) {
Assert.fail("Thread was interrupted: " + e.getMessage());
}
}
}

0 comments on commit ee60359

Please sign in to comment.