Skip to content

Commit

Permalink
GEODE-6696: EntryEvenImpl.offHeapLock created only if off-heap in use
Browse files Browse the repository at this point in the history
  • Loading branch information
Mario Ivanac committed May 13, 2019
1 parent d0b1241 commit 9d45d63
Show file tree
Hide file tree
Showing 11 changed files with 443 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void afterCreate(EntryEvent event) {
long t1 = 0;
try {
t1 = System.currentTimeMillis();
EntryEventImpl event = new EntryEventImpl((Object) null);
EntryEventImpl event = new EntryEventImpl((Object) null, false);
try {
event.setEventId(new EventID(new byte[] {1}, 1, 1));
PutOp.execute(conn, proxy, testRegion.getFullPath(), "key1", "val1", event, null, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,15 @@ public static synchronized void publishClientStats(PoolImpl pool) {
ServerRegionProxy regionProxy =
new ServerRegionProxy(ClientHealthMonitoringRegion.ADMIN_REGION_NAME, pool);

boolean isOffHeap;
if (ds.getOffHeapStore() != null) {
isOffHeap = true;
} else {
isOffHeap = false;
}
EventID eventId = new EventID(ds);
@Released
EntryEventImpl event = new EntryEventImpl((Object) null);
EntryEventImpl event = new EntryEventImpl((Object) null, isOffHeap);
try {
event.setEventId(eventId);
regionProxy.putForMetaRegion(ds.getDistributedMember(), stats, null, event, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.ByteArrayDataInput;
Expand Down Expand Up @@ -184,6 +185,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
public static final Object SUSPECT_TOKEN = new Object();

public EntryEventImpl() {
this.offHeapLock = null;
// do nothing
}

Expand Down Expand Up @@ -235,6 +237,13 @@ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
protected EntryEventImpl(InternalRegion region, Operation op, Object key, boolean originRemote,
DistributedMember distributedMember, boolean generateCallbacks, boolean fromRILocalDestroy) {
this.region = region;
InternalDistributedSystem ds =
(InternalDistributedSystem) region.getCache().getDistributedSystem();
if (ds.getOffHeapStore() != null) {
this.offHeapLock = new Object();
} else {
this.offHeapLock = null;
}
this.op = op;
this.keyInfo = region.getKeyInfo(key);
setOriginRemote(originRemote);
Expand All @@ -253,6 +262,13 @@ protected EntryEventImpl(final InternalRegion region, Operation op, Object key,
DistributedMember distributedMember, boolean generateCallbacks, boolean initializeId) {

this.region = region;
InternalDistributedSystem ds =
(InternalDistributedSystem) region.getCache().getDistributedSystem();
if (ds.getOffHeapStore() != null) {
this.offHeapLock = new Object();
} else {
this.offHeapLock = null;
}
this.op = op;
this.keyInfo = region.getKeyInfo(key, newVal, callbackArgument);

Expand Down Expand Up @@ -301,6 +317,11 @@ public EntryEventImpl(
@Retained({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) EntryEventImpl other,
boolean setOldValue) {
setRegion(other.getRegion());
if (other.offHeapLock != null) {
this.offHeapLock = new Object();
} else {
this.offHeapLock = null;
}

this.eventID = other.eventID;
basicSetNewValue(other.basicGetNewValue(), false);
Expand Down Expand Up @@ -332,8 +353,13 @@ public EntryEventImpl(
}

@Retained
public EntryEventImpl(Object key2) {
public EntryEventImpl(Object key2, boolean isOffHeap) {
this.keyInfo = new KeyInfo(key2, null, null);
if (isOffHeap) {
this.offHeapLock = new Object();
} else {
this.offHeapLock = null;
}
}

/**
Expand Down Expand Up @@ -1086,7 +1112,7 @@ private <T, R> R callWithOffHeapLock(T value, Function<T, R> function) {
}
}

private final Object offHeapLock = new Object();
private final Object offHeapLock;

public String getNewValueStringForm() {
return StringUtils.forceToString(basicGetNewValue());
Expand Down Expand Up @@ -2123,20 +2149,29 @@ public String toString() {
buf.append(this.getKey());
if (Boolean.getBoolean("gemfire.insecure-logvalues")) {
buf.append(";oldValue=");
try {
if (mayHaveOffHeapReferences()) {
synchronized (this.offHeapLock) {
ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
try {
ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
} catch (IllegalStateException ignore) {
buf.append("OFFHEAP_VALUE_FREED");
}
}
} catch (IllegalStateException ignore) {
buf.append("OFFHEAP_VALUE_FREED");
} else {
ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
}

buf.append(";newValue=");
try {
if (mayHaveOffHeapReferences()) {
synchronized (this.offHeapLock) {
ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
try {
ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
} catch (IllegalStateException ignore) {
buf.append("OFFHEAP_VALUE_FREED");
}
}
} catch (IllegalStateException ignore) {
buf.append("OFFHEAP_VALUE_FREED");
} else {
ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
}
}
buf.append(";callbackArg=");
Expand Down Expand Up @@ -2871,11 +2906,15 @@ public void release() {
* Return true if this EntryEvent may have off-heap references.
*/
private boolean mayHaveOffHeapReferences() {
if (this.offHeapLock == null) {
return false;
}

InternalRegion lr = getRegion();
if (lr != null) {
return lr.getOffHeap();
}
// if region field is null it is possible that we have off-heap values

return true;
}

Expand All @@ -2891,9 +2930,14 @@ public void disallowOffHeapValues() {
if (isOffHeapReference(this.newValue) || isOffHeapReference(this.oldValue)) {
throw new IllegalStateException("This event already has off-heap values");
}
synchronized (this.offHeapLock) {
if (mayHaveOffHeapReferences()) {
synchronized (this.offHeapLock) {
this.offHeapOk = false;
}
} else {
this.offHeapOk = false;
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.client.internal.ServerRegionProxy;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.entries.DiskEntry.RecoveredEntry;
Expand Down Expand Up @@ -1209,6 +1210,13 @@ protected TestableAbstractRegionMap(boolean withConcurrencyChecks, boolean isDis
when(owner.getDataPolicy()).thenReturn(DataPolicy.REPLICATE);
when(owner.getScope()).thenReturn(Scope.LOCAL);
when(owner.isInitialized()).thenReturn(true);

InternalCache cache = mock(InternalCache.class);
InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
when(owner.getCache()).thenReturn(cache);
when(cache.getDistributedSystem()).thenReturn(ids);
when(ids.getOffHeapStore()).thenReturn(null);

doThrow(EntryNotFoundException.class).when(owner).checkEntryNotFound(any());
initialize(owner, new Attributes(), null, false);
if (map != null) {
Expand Down Expand Up @@ -1243,6 +1251,13 @@ protected TestableVMLRURegionMap() {

private static LocalRegion createOwner(boolean withConcurrencyChecks) {
LocalRegion owner = mock(LocalRegion.class);

InternalCache cache = mock(InternalCache.class);
InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
when(owner.getCache()).thenReturn(cache);
when(cache.getDistributedSystem()).thenReturn(ids);
when(ids.getOffHeapStore()).thenReturn(null);

CachePerfStats cachePerfStats = mock(CachePerfStats.class);
when(owner.getCachePerfStats()).thenReturn(cachePerfStats);
when(owner.getEvictionAttributes()).thenReturn(evictionAttributes);
Expand Down Expand Up @@ -1286,14 +1301,21 @@ protected TxTestableAbstractRegionMap(boolean isInitialized) {
owner = mock(DistributedRegion.class);
when(owner.isInitialized()).thenReturn(false);
}

InternalCache cache = mock(InternalCache.class);
InternalDistributedSystem ids = mock(InternalDistributedSystem.class);

KeyInfo keyInfo = mock(KeyInfo.class);
when(keyInfo.getKey()).thenReturn(KEY);
when(owner.getKeyInfo(eq(KEY), any(), any())).thenReturn(keyInfo);
when(owner.getMyId()).thenReturn(mock(InternalDistributedMember.class));
when(owner.getCache()).thenReturn(mock(InternalCache.class));
when(owner.getCache()).thenReturn(cache);
when(owner.isAllEvents()).thenReturn(true);
when(owner.shouldNotifyBridgeClients()).thenReturn(true);
when(owner.lockWhenRegionIsInitializing()).thenCallRealMethod();

when(cache.getDistributedSystem()).thenReturn(ids);
when(ids.getOffHeapStore()).thenReturn(null);
initialize(owner, new Attributes(), null, false);
}

Expand All @@ -1312,6 +1334,12 @@ public void txApplyPutOnSecondaryConstructsPendingCallbacksWhenRegionEntryExists
EventID eventId = mock(EventID.class);
Object newValue = "value";

InternalCache cache = mock(InternalCache.class);
InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
when(arm._getOwner().getCache()).thenReturn(cache);
when(cache.getDistributedSystem()).thenReturn(ids);
when(ids.getOffHeapStore()).thenReturn(null);

arm.txApplyPut(Operation.UPDATE, KEY, newValue, false, txId, txRmtEvent, eventId, null,
pendingCallbacks, null, null, null, null, 1);

Expand All @@ -1332,6 +1360,12 @@ public void txApplyPutOnPrimaryConstructsPendingCallbacksWhenPutIfAbsentReturnsE
TXEntryState txEntryState = mock(TXEntryState.class);
Object newValue = "value";

InternalCache cache = mock(InternalCache.class);
InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
when(arm._getOwner().getCache()).thenReturn(cache);
when(cache.getDistributedSystem()).thenReturn(ids);
when(ids.getOffHeapStore()).thenReturn(null);

arm.txApplyPut(Operation.UPDATE, KEY, newValue, false, txId, null, eventId, null,
pendingCallbacks, null, null, txEntryState, null, 1);

Expand Down Expand Up @@ -1387,6 +1421,12 @@ public void txApplyPutDoesNotLockWhenRegionIsInitialized() {
EventID eventId = mock(EventID.class);
TXRmtEvent txRmtEvent = mock(TXRmtEvent.class);

InternalCache cache = mock(InternalCache.class);
InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
when(arm._getOwner().getCache()).thenReturn(cache);
when(cache.getDistributedSystem()).thenReturn(ids);
when(ids.getOffHeapStore()).thenReturn(null);

arm.txApplyPut(Operation.UPDATE, KEY, "", false, txId, txRmtEvent, eventId, null,
new ArrayList<>(), null, null, null, null, 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache.query.internal.index.IndexProtocol;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXEntryState.DistTxThinEntryState;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
Expand Down Expand Up @@ -1602,10 +1603,15 @@ private void givenBucketRegion() {
}

private void setupLocalRegion() {

InternalCache cache = mock(InternalCache.class);
InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
when(owner.getCachePerfStats()).thenReturn(cachePerfStats);
when(owner.getCache()).thenReturn(mock(InternalCache.class));
when(owner.getCache()).thenReturn(cache);
when(owner.getMyId()).thenReturn(myId);
when(owner.getKeyInfo(any(), any(), any())).thenReturn(keyInfo);
when(cache.getDistributedSystem()).thenReturn(ids);
when(ids.getOffHeapStore()).thenReturn(null);
}

private void doTxApplyDestroy() {
Expand Down
Loading

0 comments on commit 9d45d63

Please sign in to comment.