diff --git a/src/main/java/org/cojen/dirmi/core/AutoDisposer.java b/src/main/java/org/cojen/dirmi/core/AutoDisposer.java index de314da..6e73085 100644 --- a/src/main/java/org/cojen/dirmi/core/AutoDisposer.java +++ b/src/main/java/org/cojen/dirmi/core/AutoDisposer.java @@ -115,13 +115,7 @@ public BasicRef(StubWrapper wrapper, StubInvoker invoker) { void removed() { if (mInvoker.support().trySession() instanceof CoreSession session) { - // Note: Although the pipe isn't flushed immediately, this operation might - // still block. If it does, then no dispose messages will be sent for any - // sessions until the blocked one automatically disconnects. This can be - // prevented by running a task in a separate thread, but that would end up - // creating a new temporary object. Ideally, the task option should only be - // used when the pipe's output buffer is full. - session.stubDisposeAndNotify(mInvoker, null, false); + session.autoDispose(mInvoker); } } } diff --git a/src/main/java/org/cojen/dirmi/core/AutoSkeleton.java b/src/main/java/org/cojen/dirmi/core/AutoSkeleton.java new file mode 100644 index 0000000..32b99dc --- /dev/null +++ b/src/main/java/org/cojen/dirmi/core/AutoSkeleton.java @@ -0,0 +1,111 @@ +/* + * Copyright 2025 Cojen.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cojen.dirmi.core; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +/** + * Base class for Skeletons which support AutoDispose. + * + * @author Brian S. O'Neill + */ +public abstract class AutoSkeleton extends CoreSkeleton { + private static final VarHandle cRefCountHandle; + + static { + try { + var lookup = MethodHandles.lookup(); + cRefCountHandle = lookup.findVarHandle(AutoSkeleton.class, "mRefCount", long.class); + } catch (Throwable e) { + throw CoreUtils.rethrow(e); + } + } + + private volatile long mRefCount; + + public AutoSkeleton(long id, SkeletonSupport support) { + super(id, support); + } + + public AutoSkeleton(Throwable exception, long id, SkeletonSupport support) { + super(exception, id, support); + } + + @Override + public void disposed() { + mRefCount = Long.MIN_VALUE; + } + + @Override + public Skeleton transport() { + return transport(this); + } + + private static Skeleton transport(AutoSkeleton skeleton) { + while (true) { + long count = skeleton.mRefCount; + + if (count < -1) { + // Disposed and cannot be resurrected. + return skeleton; + } + + if (count != -1) { + if (cRefCountHandle.compareAndSet(skeleton, count, count + 1)) { + return skeleton; + } + } else { + // The skeleton was auto disposed prematurely, so try to resurrect it. + var replacement = (AutoSkeleton) skeleton.support.skeletonFor(skeleton.server()); + if (replacement == skeleton && skeleton.mRefCount < 0) { + // This case isn't expected, but return the disposed skeleton instead of + // looping forever. + return skeleton; + } + skeleton = replacement; + } + } + } + + @Override + public boolean shouldDispose(long transportCount) { + while (true) { + long count = mRefCount; + + if (count < 0) { + // Should already be disposed. + return true; + } + + long newCount = count - transportCount; + + if (newCount > 0) { + if (cRefCountHandle.compareAndSet(this, count, newCount)) { + return false; + } + } else if (newCount == 0) { + if (cRefCountHandle.compareAndSet(this, count, -1)) { + return true; + } + } else { + // Transport count is too big, which isn't expected. Go ahead and dispose. + return true; + } + } + } +} diff --git a/src/main/java/org/cojen/dirmi/core/CoreSession.java b/src/main/java/org/cojen/dirmi/core/CoreSession.java index 2278b9b..d26c58e 100644 --- a/src/main/java/org/cojen/dirmi/core/CoreSession.java +++ b/src/main/java/org/cojen/dirmi/core/CoreSession.java @@ -61,7 +61,8 @@ abstract class CoreSession extends Item implements Session { static final int C_PING = 1, C_PONG = 2, C_MESSAGE = 3, C_KNOWN_TYPE = 4, C_REQUEST_CONNECTION = 5, C_REQUEST_INFO = 6, C_REQUEST_INFO_TERM = 7, C_INFO_FOUND = 8, C_INFO_NOT_FOUND = 9, - C_SKELETON_DISPOSE = 10, C_STUB_DISPOSE = 11, C_ACKNOWLEDGED_S = 12, C_ACKNOWLEDGED_L = 13; + C_SKELETON_DISPOSE = 10, C_STUB_DISPOSE = 11, C_ACKNOWLEDGED_S = 12, C_ACKNOWLEDGED_L = 13, + C_SKELETON_AUTO_DISPOSE = 14; static final int R_CLOSED = 1, R_DISCONNECTED = 2, R_PING_FAILURE = 4, R_CONTROL_FAILURE = 8; @@ -801,6 +802,15 @@ void startTasks() throws IOException { detached(skeleton, true); } break; + case C_SKELETON_AUTO_DISPOSE: + skeleton = mSkeletons.getOrNull(pipe.readLong()); + long transportCount = pipe.readLong(); + if (skeleton != null && skeleton.shouldDispose(transportCount)) { + mSkeletons.removeAuto(skeleton.id); + // Pass newTask=true to prevent blocking the control thread. + detached(skeleton, true); + } + break; case C_STUB_DISPOSE: id = pipe.readLong(); stubDispose(id, "Object is disposed by remote endpoint"); @@ -1233,18 +1243,52 @@ final boolean stubDispose(long id, String message) { /** * @param message optional */ - final boolean stubDisposeAndNotify(Stub stub, String message, boolean flush) { + final boolean stubDisposeAndNotify(Stub stub, String message) { long id = stub.id; if (stubDispose(id, message)) { // Notify the remote side to dispose the associated skeleton. If the command cannot // be sent, the skeleton will be disposed anyhow due to disconnect. - trySendCommandAndId(C_SKELETON_DISPOSE, id, flush); + trySendCommandAndId(C_SKELETON_DISPOSE, id, true); return true; } else { return false; } } + /** + * Note: Although the pipe isn't flushed immediately, this operation might still block. If + * it does, then no dispose messages will be sent for any sessions until the blocked one + * automatically disconnects. This can be prevented by running a task in a separate thread, + * but that would end up creating a new temporary object. Ideally, the task option should + * only be used when the pipe's output buffer is full. + * + * @see AutoDisposer + */ + final void autoDispose(Stub stub) { + long id = stub.id; + if (stubDispose(id, null)) { + long transportCount = stub.invoker().resetTransportCount(); + mControlLock.lock(); + try { + // Notify the remote side to dispose the associated skeleton. If the command + // cannot be sent, the skeleton will be disposed anyhow due to disconnect. + CorePipe pipe = controlPipe(); + if (transportCount != 0) { + pipe.write(C_SKELETON_AUTO_DISPOSE); + pipe.writeLong(id); + pipe.writeLong(transportCount); + } else { + pipe.write(C_SKELETON_DISPOSE); + pipe.writeLong(id); + } + } catch (IOException e) { + // Ignore. + } finally { + mControlLock.unlock(); + } + } + } + private void stubDecRefCount(long id, long amount) { try { StubInvoker invoker = mStubs.get(id).invoker(); @@ -1345,6 +1389,8 @@ final void writeBrokenSkeletonAlias(CorePipe pipe, } private void writeSkeleton(CorePipe pipe, Skeleton skeleton) throws IOException { + skeleton = skeleton.transport(); + if (mKnownTypes.tryGet(skeleton.typeId()) != null) { // Write the remote identifier and the remote type. pipe.writeSkeletonHeader((byte) TypeCodes.T_REMOTE_T, skeleton); diff --git a/src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java b/src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java index b577516..9e02cf6 100644 --- a/src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java +++ b/src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java @@ -69,4 +69,9 @@ public void dispose(Skeleton skeleton) { public void uncaught(Throwable e) { mSession.uncaught(e); } + + @Override + public Skeleton skeletonFor(R server) { + return mSession.mSkeletons.skeletonFor(server); + } } diff --git a/src/main/java/org/cojen/dirmi/core/CoreUtils.java b/src/main/java/org/cojen/dirmi/core/CoreUtils.java index b91e59e..b2d72f7 100644 --- a/src/main/java/org/cojen/dirmi/core/CoreUtils.java +++ b/src/main/java/org/cojen/dirmi/core/CoreUtils.java @@ -85,7 +85,7 @@ public static boolean dispose(Object obj) { throw new IllegalArgumentException(); } if (stub.support() instanceof CoreStubSupport css) { - return css.session().stubDisposeAndNotify(stub, null, true); + return css.session().stubDisposeAndNotify(stub, null); } return false; } diff --git a/src/main/java/org/cojen/dirmi/core/ItemMap.java b/src/main/java/org/cojen/dirmi/core/ItemMap.java index 664b20f..a3cfd9f 100644 --- a/src/main/java/org/cojen/dirmi/core/ItemMap.java +++ b/src/main/java/org/cojen/dirmi/core/ItemMap.java @@ -160,6 +160,19 @@ final I get(long id) throws NoSuchObjectException { throw new NoSuchObjectException(id); } + /** + * Like tryGet but performs a synchronized double check. + */ + final I getOrNull(long id) { + I item = tryGet(id); + if (item == null) { + synchronized (this) { + item = tryGet(id); + } + } + return item; + } + /** * Remove an item from the map by its identifier. */ diff --git a/src/main/java/org/cojen/dirmi/core/Skeleton.java b/src/main/java/org/cojen/dirmi/core/Skeleton.java index b247aee..952169c 100644 --- a/src/main/java/org/cojen/dirmi/core/Skeleton.java +++ b/src/main/java/org/cojen/dirmi/core/Skeleton.java @@ -65,6 +65,35 @@ public Throwable checkException(Throwable exception) { return exception; } + /** + * Called when the Skeleton has been explicitly disposed. + * + * @see AutoSkeleton + */ + public void disposed() { + } + + /** + * Called when the Skeleton is about to be written over a Pipe, returning the actual + * Skeleton which should be used. + * + * @see AutoSkeleton + */ + public Skeleton transport() { + return this; + } + + /** + * Called when the Skeleton supports automatic disposal, and it has been unreferenced. + * + * @param transportCount number of transport counts recorded by the remote stub + * @return true if Skeleton should be disposed + * @see AutoSkeleton + */ + public boolean shouldDispose(long transportCount) { + return true; + } + public static void batchedResultCheck(Object server, String methodName, Object result) { if (result == null) { nullBatchedResult(server, methodName); diff --git a/src/main/java/org/cojen/dirmi/core/SkeletonMaker.java b/src/main/java/org/cojen/dirmi/core/SkeletonMaker.java index dbae104..225aeaa 100644 --- a/src/main/java/org/cojen/dirmi/core/SkeletonMaker.java +++ b/src/main/java/org/cojen/dirmi/core/SkeletonMaker.java @@ -78,8 +78,10 @@ private SkeletonMaker(Class type) { .implement(SkeletonFactory.class).final_().sourceFile(sourceFile); CoreUtils.allowAccess(mFactoryMaker); + Class baseClass = mServerInfo.isAutoDispose() ? AutoSkeleton.class : CoreSkeleton.class; + mSkeletonMaker = mFactoryMaker.another(type.getName()) - .extend(CoreSkeleton.class).final_().sourceFile(sourceFile); + .extend(baseClass).final_().sourceFile(sourceFile); // Need to define the skeleton constructor (and its dependencies) early in order for // the factory to see it. diff --git a/src/main/java/org/cojen/dirmi/core/SkeletonMap.java b/src/main/java/org/cojen/dirmi/core/SkeletonMap.java index ed2e1ef..79a6a7e 100644 --- a/src/main/java/org/cojen/dirmi/core/SkeletonMap.java +++ b/src/main/java/org/cojen/dirmi/core/SkeletonMap.java @@ -61,6 +61,21 @@ synchronized void clear() { @Override synchronized Skeleton remove(long id) { + Skeleton skeleton = super.remove(id); + if (id >= 0 && skeleton != null) { // only remove server if id isn't an alias + skeleton.disposed(); + removeServer(skeleton.server()); + } + return skeleton; + } + + /** + * Same as remove except doesn't call Skeleton.disposed, allowing it to be resurrected + * later. + * + * @see AutoSkeleton + */ + synchronized Skeleton removeAuto(long id) { Skeleton skeleton = super.remove(id); if (id >= 0 && skeleton != null) { // only remove server if id isn't an alias removeServer(skeleton.server()); diff --git a/src/main/java/org/cojen/dirmi/core/SkeletonSupport.java b/src/main/java/org/cojen/dirmi/core/SkeletonSupport.java index b2739e5..d6f2c8f 100644 --- a/src/main/java/org/cojen/dirmi/core/SkeletonSupport.java +++ b/src/main/java/org/cojen/dirmi/core/SkeletonSupport.java @@ -59,4 +59,9 @@ void writeBrokenSkeletonAlias(Pipe pipe, Class type, long aliasId, Throwable void dispose(Skeleton skeleton); void uncaught(Throwable e); + + /** + * @see AutoSkeleton + */ + Skeleton skeletonFor(R server); } diff --git a/src/main/java/org/cojen/dirmi/core/StubInvoker.java b/src/main/java/org/cojen/dirmi/core/StubInvoker.java index 209bdf4..5c3d4d7 100644 --- a/src/main/java/org/cojen/dirmi/core/StubInvoker.java +++ b/src/main/java/org/cojen/dirmi/core/StubInvoker.java @@ -124,6 +124,22 @@ final boolean isRestorable() { } } + /** + * Called to track the number of times the remote side has referenced this stub invoker + * instance. It only needs to do anything for invokers which support automatic disposal. + */ + void incTransportCount() { + } + + /** + * Clears the transport count value, and returns the old value. This operation is only + * valid for invokers which support automatic disposal. If zero is returned (although not + * expected), the remote skeleton should be forcibly disposed. + */ + long resetTransportCount() { + return 0; + } + /** * Base class for invokers which support automatic disposal. This class must not declare * any new public instance methods because they can conflict with user-specified remote @@ -132,6 +148,20 @@ final boolean isRestorable() { private static abstract class WithRef extends StubInvoker { private StubWrapper.Factory wrapperFactory; + private volatile long transportCount = 1L; + + private static final VarHandle cTransportCountHandle; + + static { + try { + var lookup = MethodHandles.lookup(); + cTransportCountHandle = lookup.findVarHandle + (WithRef.class, "transportCount", long.class); + } catch (Throwable e) { + throw CoreUtils.rethrow(e); + } + } + public WithRef(long id, StubSupport support, MethodIdWriter miw, StubWrapper.Factory wrapperFactory) { @@ -144,6 +174,16 @@ final StubWrapper initWrapper() { wrapperFactory = null; // not needed anymore return wrapper; } + + @Override + void incTransportCount() { + cTransportCountHandle.getAndAdd(this, 1L); + } + + @Override + long resetTransportCount() { + return (long) cTransportCountHandle.getAndSet(this, 0L); + } } /** diff --git a/src/main/java/org/cojen/dirmi/core/StubMap.java b/src/main/java/org/cojen/dirmi/core/StubMap.java index 21cd22b..ad15b62 100644 --- a/src/main/java/org/cojen/dirmi/core/StubMap.java +++ b/src/main/java/org/cojen/dirmi/core/StubMap.java @@ -35,6 +35,7 @@ synchronized Stub putAndSelectStub(StubInvoker invoker) { if (existing.id == invoker.id) { Stub selected = ((StubInvoker) existing).select(); if (selected != null) { + selected.invoker().incTransportCount(); return selected; } break; diff --git a/src/test/java/org/cojen/dirmi/AutoDisposeTest.java b/src/test/java/org/cojen/dirmi/AutoDisposeTest.java index be3abec..d3fd756 100644 --- a/src/test/java/org/cojen/dirmi/AutoDisposeTest.java +++ b/src/test/java/org/cojen/dirmi/AutoDisposeTest.java @@ -16,6 +16,8 @@ package org.cojen.dirmi; +import java.io.IOException; + import java.net.ServerSocket; import org.junit.*; @@ -147,6 +149,67 @@ public void noReply() throws Exception { } } + @Test + public void race() throws Exception { + // Test that an auto disposed object isn't disposed before all the server side + // instances have been received. + + R2Server.mDetached = 0; + + R1 root = mSession.root(); + + R2 first = root.r2(); + + Pipe p = root.receive(null); + p.flush(); + R2 second = (R2) p.readObject(); + second.updateAndWaitForAck("hello"); + + // Allow the r2 objects to be locally collected, but the remote skeleton for the second + // cannot be disposed yet. + first = null; + second = null; + + // Wait for the first r2 object to be disposed and detached. + waitForDetached(1); + + assertTrue(root.doFlush()); + + // Force a few collections, which should have no effect. + for (int i=1; i<=2; i++) { + System.gc(); + Thread.sleep(1000); + } + + assertEquals(1, R2Server.mDetached); + + second = (R2) p.readObject(); + p.close(); + p = null; + + assertEquals("hello", second.getMessage()); + second = null; + + // Wait for the second r2 object to be disposed and detached. + waitForDetached(2); + } + + private static void waitForDetached(int detached) throws Exception { + for (int i=0; i<100; i++) { + System.gc(); + int actual = R2Server.mDetached; + if (actual == detached) { + return; + } + if (actual > detached) { + break; + } + Thread.sleep(100); + } + + assertEquals(detached, R2Server.mDetached); + } + private static long extractId(Object obj) { String str = obj.toString(); int ix = str.indexOf("id="); @@ -171,6 +234,10 @@ public static interface R1 extends Remote { @Restorable R2 r2() throws RemoteException; + + Pipe receive(Pipe pipe) throws IOException; + + boolean doFlush() throws IOException; } @AutoDispose @@ -178,6 +245,8 @@ public static interface R2 extends Remote { @NoReply void update(String msg) throws RemoteException; + void updateAndWaitForAck(String msg) throws RemoteException; + String getMessage() throws RemoteException; } @@ -210,6 +279,30 @@ public R2 r2() { } return mR2; } + + @Override + public synchronized Pipe receive(Pipe pipe) throws IOException { + mPipe = pipe; + mR2 = new R2Server(); + pipe.writeObject(mR2); + pipe.flush(); + pipe.writeObject(mR2); + return null; + } + + private Pipe mPipe; + + @Override + public synchronized boolean doFlush() throws IOException { + if (mPipe == null) { + return false; + } else { + mPipe.flush(); + mPipe.close(); + mPipe = null; + return true; + } + } } private static class R2Server implements R2, SessionAware { @@ -222,6 +315,11 @@ public void update(String msg) { mMessage = msg; } + @Override + public void updateAndWaitForAck(String msg) { + mMessage = msg; + } + @Override public String getMessage() { return mMessage;