Skip to content

Commit

Permalink
Fix a race condition which caused AutoDisposed to be disposed of prem…
Browse files Browse the repository at this point in the history
…aturely.
  • Loading branch information
broneill committed Feb 4, 2025
1 parent bf12472 commit 853249e
Show file tree
Hide file tree
Showing 13 changed files with 371 additions and 12 deletions.
8 changes: 1 addition & 7 deletions src/main/java/org/cojen/dirmi/core/AutoDisposer.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,7 @@ public BasicRef(StubWrapper wrapper, StubInvoker invoker) {

void removed() {
if (mInvoker.support().trySession() instanceof CoreSession session) {
// Note: Although the pipe isn't flushed immediately, this operation might
// still block. If it does, then no dispose messages will be sent for any
// sessions until the blocked one automatically disconnects. This can be
// prevented by running a task in a separate thread, but that would end up
// creating a new temporary object. Ideally, the task option should only be
// used when the pipe's output buffer is full.
session.stubDisposeAndNotify(mInvoker, null, false);
session.autoDispose(mInvoker);
}
}
}
Expand Down
111 changes: 111 additions & 0 deletions src/main/java/org/cojen/dirmi/core/AutoSkeleton.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2025 Cojen.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cojen.dirmi.core;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/**
* Base class for Skeletons which support AutoDispose.
*
* @author Brian S. O'Neill
*/
public abstract class AutoSkeleton<R> extends CoreSkeleton<R> {
private static final VarHandle cRefCountHandle;

static {
try {
var lookup = MethodHandles.lookup();
cRefCountHandle = lookup.findVarHandle(AutoSkeleton.class, "mRefCount", long.class);
} catch (Throwable e) {
throw CoreUtils.rethrow(e);
}
}

private volatile long mRefCount;

public AutoSkeleton(long id, SkeletonSupport support) {
super(id, support);
}

public AutoSkeleton(Throwable exception, long id, SkeletonSupport support) {
super(exception, id, support);
}

@Override
public void disposed() {
mRefCount = Long.MIN_VALUE;
}

@Override
public Skeleton transport() {
return transport(this);
}

private static Skeleton transport(AutoSkeleton skeleton) {
while (true) {
long count = skeleton.mRefCount;

if (count < -1) {
// Disposed and cannot be resurrected.
return skeleton;
}

if (count != -1) {
if (cRefCountHandle.compareAndSet(skeleton, count, count + 1)) {
return skeleton;
}
} else {
// The skeleton was auto disposed prematurely, so try to resurrect it.
var replacement = (AutoSkeleton) skeleton.support.skeletonFor(skeleton.server());
if (replacement == skeleton && skeleton.mRefCount < 0) {
// This case isn't expected, but return the disposed skeleton instead of
// looping forever.
return skeleton;
}
skeleton = replacement;
}
}
}

@Override
public boolean shouldDispose(long transportCount) {
while (true) {
long count = mRefCount;

if (count < 0) {
// Should already be disposed.
return true;
}

long newCount = count - transportCount;

if (newCount > 0) {
if (cRefCountHandle.compareAndSet(this, count, newCount)) {
return false;
}
} else if (newCount == 0) {
if (cRefCountHandle.compareAndSet(this, count, -1)) {
return true;
}
} else {
// Transport count is too big, which isn't expected. Go ahead and dispose.
return true;
}
}
}
}
52 changes: 49 additions & 3 deletions src/main/java/org/cojen/dirmi/core/CoreSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ abstract class CoreSession<R> extends Item implements Session<R> {
static final int
C_PING = 1, C_PONG = 2, C_MESSAGE = 3, C_KNOWN_TYPE = 4, C_REQUEST_CONNECTION = 5,
C_REQUEST_INFO = 6, C_REQUEST_INFO_TERM = 7, C_INFO_FOUND = 8, C_INFO_NOT_FOUND = 9,
C_SKELETON_DISPOSE = 10, C_STUB_DISPOSE = 11, C_ACKNOWLEDGED_S = 12, C_ACKNOWLEDGED_L = 13;
C_SKELETON_DISPOSE = 10, C_STUB_DISPOSE = 11, C_ACKNOWLEDGED_S = 12, C_ACKNOWLEDGED_L = 13,
C_SKELETON_AUTO_DISPOSE = 14;

static final int R_CLOSED = 1, R_DISCONNECTED = 2, R_PING_FAILURE = 4, R_CONTROL_FAILURE = 8;

Expand Down Expand Up @@ -801,6 +802,15 @@ void startTasks() throws IOException {
detached(skeleton, true);
}
break;
case C_SKELETON_AUTO_DISPOSE:
skeleton = mSkeletons.getOrNull(pipe.readLong());
long transportCount = pipe.readLong();
if (skeleton != null && skeleton.shouldDispose(transportCount)) {
mSkeletons.removeAuto(skeleton.id);
// Pass newTask=true to prevent blocking the control thread.
detached(skeleton, true);
}
break;
case C_STUB_DISPOSE:
id = pipe.readLong();
stubDispose(id, "Object is disposed by remote endpoint");
Expand Down Expand Up @@ -1233,18 +1243,52 @@ final boolean stubDispose(long id, String message) {
/**
* @param message optional
*/
final boolean stubDisposeAndNotify(Stub stub, String message, boolean flush) {
final boolean stubDisposeAndNotify(Stub stub, String message) {
long id = stub.id;
if (stubDispose(id, message)) {
// Notify the remote side to dispose the associated skeleton. If the command cannot
// be sent, the skeleton will be disposed anyhow due to disconnect.
trySendCommandAndId(C_SKELETON_DISPOSE, id, flush);
trySendCommandAndId(C_SKELETON_DISPOSE, id, true);
return true;
} else {
return false;
}
}

/**
* Note: Although the pipe isn't flushed immediately, this operation might still block. If
* it does, then no dispose messages will be sent for any sessions until the blocked one
* automatically disconnects. This can be prevented by running a task in a separate thread,
* but that would end up creating a new temporary object. Ideally, the task option should
* only be used when the pipe's output buffer is full.
*
* @see AutoDisposer
*/
final void autoDispose(Stub stub) {
long id = stub.id;
if (stubDispose(id, null)) {
long transportCount = stub.invoker().resetTransportCount();
mControlLock.lock();
try {
// Notify the remote side to dispose the associated skeleton. If the command
// cannot be sent, the skeleton will be disposed anyhow due to disconnect.
CorePipe pipe = controlPipe();
if (transportCount != 0) {
pipe.write(C_SKELETON_AUTO_DISPOSE);
pipe.writeLong(id);
pipe.writeLong(transportCount);
} else {
pipe.write(C_SKELETON_DISPOSE);
pipe.writeLong(id);
}
} catch (IOException e) {
// Ignore.
} finally {
mControlLock.unlock();
}
}
}

private void stubDecRefCount(long id, long amount) {
try {
StubInvoker invoker = mStubs.get(id).invoker();
Expand Down Expand Up @@ -1345,6 +1389,8 @@ final void writeBrokenSkeletonAlias(CorePipe pipe,
}

private void writeSkeleton(CorePipe pipe, Skeleton skeleton) throws IOException {
skeleton = skeleton.transport();

if (mKnownTypes.tryGet(skeleton.typeId()) != null) {
// Write the remote identifier and the remote type.
pipe.writeSkeletonHeader((byte) TypeCodes.T_REMOTE_T, skeleton);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,9 @@ public void dispose(Skeleton<?> skeleton) {
public void uncaught(Throwable e) {
mSession.uncaught(e);
}

@Override
public <R> Skeleton<R> skeletonFor(R server) {
return mSession.mSkeletons.skeletonFor(server);
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/cojen/dirmi/core/CoreUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static boolean dispose(Object obj) {
throw new IllegalArgumentException();
}
if (stub.support() instanceof CoreStubSupport css) {
return css.session().stubDisposeAndNotify(stub, null, true);
return css.session().stubDisposeAndNotify(stub, null);
}
return false;
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/org/cojen/dirmi/core/ItemMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ final I get(long id) throws NoSuchObjectException {
throw new NoSuchObjectException(id);
}

/**
* Like tryGet but performs a synchronized double check.
*/
final I getOrNull(long id) {
I item = tryGet(id);
if (item == null) {
synchronized (this) {
item = tryGet(id);
}
}
return item;
}

/**
* Remove an item from the map by its identifier.
*/
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/org/cojen/dirmi/core/Skeleton.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ public Throwable checkException(Throwable exception) {
return exception;
}

/**
* Called when the Skeleton has been explicitly disposed.
*
* @see AutoSkeleton
*/
public void disposed() {
}

/**
* Called when the Skeleton is about to be written over a Pipe, returning the actual
* Skeleton which should be used.
*
* @see AutoSkeleton
*/
public Skeleton transport() {
return this;
}

/**
* Called when the Skeleton supports automatic disposal, and it has been unreferenced.
*
* @param transportCount number of transport counts recorded by the remote stub
* @return true if Skeleton should be disposed
* @see AutoSkeleton
*/
public boolean shouldDispose(long transportCount) {
return true;
}

public static void batchedResultCheck(Object server, String methodName, Object result) {
if (result == null) {
nullBatchedResult(server, methodName);
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/cojen/dirmi/core/SkeletonMaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ private SkeletonMaker(Class<R> type) {
.implement(SkeletonFactory.class).final_().sourceFile(sourceFile);
CoreUtils.allowAccess(mFactoryMaker);

Class<?> baseClass = mServerInfo.isAutoDispose() ? AutoSkeleton.class : CoreSkeleton.class;

mSkeletonMaker = mFactoryMaker.another(type.getName())
.extend(CoreSkeleton.class).final_().sourceFile(sourceFile);
.extend(baseClass).final_().sourceFile(sourceFile);

// Need to define the skeleton constructor (and its dependencies) early in order for
// the factory to see it.
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/cojen/dirmi/core/SkeletonMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ synchronized void clear() {

@Override
synchronized Skeleton remove(long id) {
Skeleton skeleton = super.remove(id);
if (id >= 0 && skeleton != null) { // only remove server if id isn't an alias
skeleton.disposed();
removeServer(skeleton.server());
}
return skeleton;
}

/**
* Same as remove except doesn't call Skeleton.disposed, allowing it to be resurrected
* later.
*
* @see AutoSkeleton
*/
synchronized Skeleton removeAuto(long id) {
Skeleton skeleton = super.remove(id);
if (id >= 0 && skeleton != null) { // only remove server if id isn't an alias
removeServer(skeleton.server());
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/cojen/dirmi/core/SkeletonSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ void writeBrokenSkeletonAlias(Pipe pipe, Class<?> type, long aliasId, Throwable
void dispose(Skeleton<?> skeleton);

void uncaught(Throwable e);

/**
* @see AutoSkeleton
*/
<R> Skeleton<R> skeletonFor(R server);
}
40 changes: 40 additions & 0 deletions src/main/java/org/cojen/dirmi/core/StubInvoker.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,22 @@ final boolean isRestorable() {
}
}

/**
* Called to track the number of times the remote side has referenced this stub invoker
* instance. It only needs to do anything for invokers which support automatic disposal.
*/
void incTransportCount() {
}

/**
* Clears the transport count value, and returns the old value. This operation is only
* valid for invokers which support automatic disposal. If zero is returned (although not
* expected), the remote skeleton should be forcibly disposed.
*/
long resetTransportCount() {
return 0;
}

/**
* Base class for invokers which support automatic disposal. This class must not declare
* any new public instance methods because they can conflict with user-specified remote
Expand All @@ -132,6 +148,20 @@ final boolean isRestorable() {
private static abstract class WithRef extends StubInvoker {
private StubWrapper.Factory wrapperFactory;

private volatile long transportCount = 1L;

private static final VarHandle cTransportCountHandle;

static {
try {
var lookup = MethodHandles.lookup();
cTransportCountHandle = lookup.findVarHandle
(WithRef.class, "transportCount", long.class);
} catch (Throwable e) {
throw CoreUtils.rethrow(e);
}
}

public WithRef(long id, StubSupport support, MethodIdWriter miw,
StubWrapper.Factory wrapperFactory)
{
Expand All @@ -144,6 +174,16 @@ final StubWrapper initWrapper() {
wrapperFactory = null; // not needed anymore
return wrapper;
}

@Override
void incTransportCount() {
cTransportCountHandle.getAndAdd(this, 1L);
}

@Override
long resetTransportCount() {
return (long) cTransportCountHandle.getAndSet(this, 0L);
}
}

/**
Expand Down
Loading

0 comments on commit 853249e

Please sign in to comment.