Skip to content

Commit

Permalink
ISPN-2861 Integrate the CloudTM extended statistics in Infinispan
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and Mircea Markus committed Jul 15, 2013
1 parent 76f5be4 commit 51a4629
Show file tree
Hide file tree
Showing 70 changed files with 7,926 additions and 14 deletions.
4 changes: 4 additions & 0 deletions README-i18n.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ Infinispan logs fully support internationalization. The following is a list of m
17001 - 18000 cdi integration
18001 - 19000 hbase
21001 - 22000 mongodb cachestore
22001 - 23000 jpa cache store
23001 - 24000 leveldb cache store
24001 - 25000 couchbase cache store
25001 - 26000 extended statistics
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class ClusteredGetCommand extends BaseRpcCommand implements FlagAffectedC
private InternalEntryFactory entryFactory;
private int topologyId;
private Equivalence keyEquivalence;
//only used by extended statistics. this boolean is local.
private boolean isWrite;

private ClusteredGetCommand() {
super(null); // For command id uniqueness test
Expand All @@ -72,6 +74,7 @@ public ClusteredGetCommand(Object key, String cacheName, Set<Flag> flags,
this.acquireRemoteLock = acquireRemoteLock;
this.gtx = gtx;
this.keyEquivalence = keyEquivalence;
this.isWrite = false;
if (acquireRemoteLock && (gtx == null))
throw new IllegalArgumentException("Cannot have null tx if we need to acquire locks");
}
Expand Down Expand Up @@ -120,6 +123,10 @@ public InternalCacheValue perform(InvocationContext context) throws Throwable {
}
}

public GlobalTransaction getGlobalTransaction() {
return gtx;
}

private void acquireLocksIfNeeded() throws Throwable {
if (acquireRemoteLock) {
LockControlCommand lockControlCommand = commandsFactory.buildLockControlCommand(key, flags, gtx);
Expand All @@ -135,7 +142,7 @@ public byte getCommandId() {

@Override
public Object[] getParameters() {
return new Object[]{key, flags, acquireRemoteLock, gtx};
return new Object[]{key, flags, acquireRemoteLock, acquireRemoteLock ? gtx : null};
}

@Override
Expand Down Expand Up @@ -178,6 +185,14 @@ public String toString() {
.toString();
}

public boolean isWrite() {
return isWrite;
}

public void setWrite(boolean write) {
isWrite = write;
}

public Object getKey() {
return key;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public Object perform(InvocationContext ctx) throws Throwable {
return null;
}

public GlobalTransaction getGlobalTransaction() {
return gtx;
}

/**
* This only happens during state transfer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ public void performLocalWriteSkewCheck(DataContainer container, boolean alreadyC
// the implicit "versioning" we have in R_R creates a new wrapper "value" instance for every update.
if (actualValue != null && actualValue != valueToCompare) {
log.unableToCopyEntryForUpdate(getKey());
throw new WriteSkewException("Detected write skew.");
throw new WriteSkewException("Detected write skew.", key);
}

if (valueToCompare != null && ice == null && !isCreated()) {
// We still have a write-skew here. When this wrapper was created there was an entry in the data container
// (hence isCreated() == false) but 'ice' is now null.
log.unableToCopyEntryForUpdate(getKey());
throw new WriteSkewException("Detected write skew - concurrent removal of entry!");
throw new WriteSkewException("Detected write skew - concurrent removal of entry!", key);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ protected boolean ignorePreviousValueOnBackup(WriteCommand command, InvocationCo
* rehash in progress, involving nodes that the key maps to.
*
* @param key key to look up
* @param isWrite {@code true} if this is triggered by a write operation
* @return an internal cache entry, or null if it cannot be located
*/
protected abstract InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception;
protected abstract InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command, boolean isWrite) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ public void injectDependencies(DistributionManager distributionManager, Clusteri
}

@Override
protected final InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception {
GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
protected final InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command, boolean isWrite) throws Exception {
GlobalTransaction gtx = ctx.isInTxScope() ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx);
get.setWrite(isWrite);

List<Address> targets = new ArrayList<Address>(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key));
// if any of the recipients has left the cluster since the command was issued, just don't wait for its response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private Object remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command,
if (trace) log.tracef("Doing a remote get for key %s", key);

// attempt a remote lookup
InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, false, command);
InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, false, command, true);
if (ice != null) {
if (!ctx.replaceValue(key, ice)) {
entryFactory.wrapEntryForPut(ctx, key, ice, false, command, true);
Expand Down Expand Up @@ -181,7 +181,7 @@ private InternalCacheEntry localGetCacheEntry(InvocationContext ctx, Object key,

private InternalCacheEntry remoteGetCacheEntry(InvocationContext ctx, Object key, GetKeyValueCommand command) throws Throwable {
if (trace) log.tracef("Doing a remote get for key %s", key);
InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, false, command);
InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, false, command, false);
command.setRemotelyFetchedValue(ice);
if (ice != null)
return ice;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key, boolean
acquireRemoteLock = isWrite && isPessimisticCache && !txContext.getAffectedKeys().contains(key);
}
// attempt a remote lookup
InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, acquireRemoteLock, command);
InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, acquireRemoteLock, command, isWrite);

if (acquireRemoteLock) {
((TxInvocationContext) ctx).addAffectedKey(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,29 @@
* @since 5.1
*/
public class WriteSkewException extends CacheException {

private final Object key;

public WriteSkewException() {
this.key = null;
}

public WriteSkewException(Throwable cause) {
public WriteSkewException(Throwable cause, Object key) {
super(cause);
this.key = key;
}

public WriteSkewException(String msg) {
public WriteSkewException(String msg, Object key) {
super(msg);
this.key = key;
}

public WriteSkewException(String msg, Throwable cause) {
public WriteSkewException(String msg, Throwable cause, Object key) {
super(msg, cause);
this.key = key;
}

public final Object getKey() {
return key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static EntryVersionsMap performWriteSkewCheckAndReturnNewVersions(Version
uv.put(k, newVersion);
} else {
// Write skew check detected!
throw new WriteSkewException("Write skew detected on key " + k + " for transaction " + context.getTransaction());
throw new WriteSkewException("Write skew detected on key " + k + " for transaction " + context.getTransaction(), k);
}
}
}
Expand All @@ -70,7 +70,7 @@ public static EntryVersionsMap performTotalOrderWriteSkewCheckAndReturnNewVersio
uv.put(k, null);
} else {
// Write skew check detected!
throw new WriteSkewException("Write skew detected on key " + k + " for transaction " + context.getTransaction());
throw new WriteSkewException("Write skew detected on key " + k + " for transaction " + context.getTransaction(), k);
}
}
}
Expand Down
37 changes: 37 additions & 0 deletions extended-statistics/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-parent</artifactId>
<version>6.0.0-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
</parent>

<artifactId>infinispan-extended-statistics</artifactId>
<packaging>jar</packaging>
<name>Infinispan Extended Statistics</name>
<description>Infinispan Extended Statistics module</description>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>infinispan-core</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>infinispan-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 51a4629

Please sign in to comment.