Skip to content

Commit

Permalink
Merge pull request #28 from alexist/fix/#27
Browse files Browse the repository at this point in the history
feat(#27): INCR command executed on each backup
  • Loading branch information
grrolland authored Nov 22, 2023
2 parents 309429e + 70771a9 commit 5dd862b
Show file tree
Hide file tree
Showing 13 changed files with 328 additions and 408 deletions.
22 changes: 9 additions & 13 deletions dist/conf/hazelcast.xml
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.9.xsd"
<hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<group>
<name>ngx-dshm</name>
<password>dev-pass</password>
</group>
xsi:schemaLocation="http://www.hazelcast.com/schema/config
http://www.hazelcast.com/schema/config/hazelcast-config-5.0.xsd">

<cluster-name>ngx-dshm</cluster-name>

<network>
<port auto-increment="false">5701</port>
<join>
<multicast enabled="false" />
<multicast enabled="false"/>
<tcp-ip enabled="true">
<interface>127.0.0.1</interface>
<member-list>
<member>127.0.0.1</member>
</member-list>
</tcp-ip>
<aws enabled="false" />
<aws enabled="false"/>
</join>
</network>

Expand All @@ -27,11 +26,8 @@
<async-backup-count>0</async-backup-count>
<time-to-live-seconds>0</time-to-live-seconds>
<max-idle-seconds>0</max-idle-seconds>
<eviction-policy>NONE</eviction-policy>
<max-size policy="PER_NODE">0</max-size>
<eviction-percentage>25</eviction-percentage>
<min-eviction-check-millis>100</min-eviction-check-millis>
<merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
<eviction eviction-policy="NONE"/>
<merge-policy>PutIfAbsentMergePolicy</merge-policy>
<cache-deserialized-values>INDEX-ONLY</cache-deserialized-values>
</map>

Expand Down
18 changes: 13 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.github.grrolland</groupId>
Expand All @@ -23,8 +25,8 @@
<connection>scm:git:[email protected]:grrolland/ngx-distributed-shm.git</connection>
<developerConnection>scm:git:https://github.com/grrolland/ngx-distributed-shm.git</developerConnection>
<url>https://github.com/grrolland/ngx-distributed-shm</url>
<tag>1.0.9</tag>
</scm>
<tag>HEAD</tag>
</scm>

<distributionManagement>
<repository>
Expand All @@ -48,9 +50,9 @@
<slf4j.version>2.0.4</slf4j.version>
<logback.version>1.4.11</logback.version>
<junit.version>4.13.2</junit.version>
<resilience4j-retry.version>1.7.0</resilience4j-retry.version>
</properties>


<dependencies>
<dependency>
<groupId>io.vertx</groupId>
Expand Down Expand Up @@ -98,6 +100,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4j-retry.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -139,7 +147,7 @@
<Main-Class>io.github.grrolland.hcshm.Main</Main-Class>
</manifestEntries>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package io.github.grrolland.hcshm.processor;

import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;
import io.github.grrolland.hcshm.HazelcastInstanceHandler;
import io.github.grrolland.hcshm.ShmRegionLocator;
import com.hazelcast.map.ExtendedMapEntry;
import io.github.grrolland.hcshm.ShmValue;

import java.io.Serializable;
Expand All @@ -46,11 +44,6 @@ public class IncrProcessor implements EntryProcessor<String, ShmValue, Object>,
*/
private final int initialExpire;

/**
* Region locator
*/
private final ShmRegionLocator regionLocator = new ShmRegionLocator();

/**
* Constructor
*
Expand All @@ -76,9 +69,11 @@ public IncrProcessor(long value, int init, int initialExpire) {
*/
@Override
public Object process(Map.Entry<String, ShmValue> entry) {

final ShmValue r = entry.getValue();
String newval;
int expire;
ExtendedMapEntry<String, ShmValue> extendedMapEntry = (ExtendedMapEntry<String, ShmValue>) entry;
if (null != r) {
try {
newval = Long.toString(Long.parseLong(r.getValue()) + value);
Expand All @@ -90,12 +85,12 @@ public Object process(Map.Entry<String, ShmValue> entry) {
newval = Long.toString(value + init);
expire = this.initialExpire;
}
IMap<String, ShmValue> map = regionLocator.getMap(HazelcastInstanceHandler.getInstance(), entry.getKey());
if (expire >= 0) {
map.set(entry.getKey(), new ShmValue(newval, expire), expire, TimeUnit.SECONDS);
extendedMapEntry.setValue(new ShmValue(newval, expire), expire, TimeUnit.SECONDS);
} else {
map.remove(entry.getKey());
extendedMapEntry.setValue(null);
}
return newval;
}

}
126 changes: 110 additions & 16 deletions src/test/java/io/github/grrolland/hcshm/AbstractHCSHMGetTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package io.github.grrolland.hcshm;

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import io.vavr.CheckedFunction0;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -28,16 +32,23 @@
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.time.Duration;

/**
* Global Protocol Test Case
*/
public abstract class AbstractHCSHMGetTestCase {

// Wait duration in second
public static final int OPEN_SOCKET_WAIT_DURATION = 2;

// Max retry
public static final int SOCKET_RETRY_LOOP = 10;

/**
* Test Socket
*/
private final Socket sock = null;
private Socket sock = null;

/**
* Socket Reader
Expand All @@ -57,19 +68,52 @@ protected BufferedWriter getWriter() {
return writer;
}

/**
* get Retry
*
* @return the retry
*/
private static Retry getRetry() {
RetryConfig config = RetryConfig.custom()
// Max
.maxAttempts(SOCKET_RETRY_LOOP)
// Sleep
.waitDuration(Duration.ofSeconds(OPEN_SOCKET_WAIT_DURATION))
// Retry on IOException
.retryExceptions(IOException.class)
// Fail
.failAfterMaxAttempts(true)
//
.writableStackTraceEnabled(true)
// Build
.build();

// Create the RetryRegistry
RetryRegistry registry = RetryRegistry.of(config);

// Create the retry
return registry.retry("openSocket", config);
}

/**
* Wait (2000 ms)
*
* @throws InterruptedException
* exception when trying to sleep the current thread
*/
public static void pause() throws InterruptedException {
Thread.sleep(2000); // NOSONAR
}

/**
* Init Socket, Reader and Writer
*/
@Before
public void before() {
try {
Socket sock = new Socket(InetAddress.getByName("localhost"), 40321);
reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
writer = new BufferedWriter(new OutputStreamWriter(sock.getOutputStream()));
flush();
} catch (IOException e) {
e.printStackTrace();
}
public void before() throws Throwable {
sock = this.openSocketWithRetry();
reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
writer = new BufferedWriter(new OutputStreamWriter(sock.getOutputStream()));
flushAll();
}

/**
Expand All @@ -78,8 +122,8 @@ public void before() {
* @throws IOException
* I/O exception
*/
public void flush() throws IOException {
this.flush("");
public void flushAll() throws IOException {
this.flushAll("");
}

/**
Expand All @@ -90,7 +134,7 @@ public void flush() throws IOException {
* @throws IOException
* I/O exception
*/
public void flush(String region) throws IOException {
public void flushAll(String region) throws IOException {
getWriter().write(String.format("FLUSHALL %s\r\n", region));
getWriter().flush();
getReader().readLine();
Expand All @@ -108,30 +152,80 @@ public void after() {
if (writer != null) {
writer.close();
}
if (sock != null) {
if (sock != null && !sock.isClosed()) {
sock.shutdownInput();
sock.shutdownOutput();
sock.close();
}
} catch (IOException e) {
// Ignore shutdown issue
e.printStackTrace();
}
}

/**
* Assert a value is read
* Assert that response get the value <code>expect</code>
*
* @param expect
* expected value
* @throws IOException
* I/O Exception
*/
public void assertGetValue(String expect) throws IOException {
public void assertResponseGetValue(String expect) throws IOException {
String res = getReader().readLine();
Assert.assertEquals("LEN " + expect.length(), res);
res = getReader().readLine();
Assert.assertEquals(expect, res);
res = getReader().readLine();
Assert.assertEquals("DONE", res);
}

/**
* Assert response is key not found
*
* @throws IOException
* I/O Exception
*/
public void assertResponseNotFound() throws IOException {
String res = getReader().readLine();
Assert.assertEquals("ERROR not_found", res);
}

/**
* Assert response is malformed request
*
* @throws IOException
* I/O Exception
*/
public void assertResponseMalFormedRequest() throws IOException {
String res = getReader().readLine();
Assert.assertEquals("ERROR malformed_request", res);
}

/**
* Assert response is DONE
*
* @throws IOException
*/
public void assertResponseDone() throws IOException {
String res = getReader().readLine();
Assert.assertEquals("DONE", res);
}

/**
* Open socket with retry
*
* @return Socket
* @throws InterruptedException
* exception while opening socket
*/
private Socket openSocketWithRetry() throws Throwable {
Retry retry = getRetry();
CheckedFunction0<Socket> retryingOpenSocket = Retry.decorateCheckedSupplier(retry, this::openSocket);
return retryingOpenSocket.apply();
}

private Socket openSocket() throws IOException {
return new Socket(InetAddress.getByName("localhost"), 40321);
}
}
Loading

0 comments on commit 5dd862b

Please sign in to comment.