Skip to content

Commit

Permalink
Use one queue per worker thread to reduce contention, and adopt a str…
Browse files Browse the repository at this point in the history
…ategy used by the WorkerGroup class to keep the thread count low.
  • Loading branch information
broneill committed Oct 18, 2024
1 parent 440a7d3 commit f047dfa
Showing 1 changed file with 135 additions and 56 deletions.
191 changes: 135 additions & 56 deletions src/main/java/org/cojen/tupl/core/PendingTxnFinisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package org.cojen.tupl.core;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

import java.util.concurrent.ThreadLocalRandom;

import org.cojen.tupl.io.Utils;

import org.cojen.tupl.util.Latch;
import org.cojen.tupl.util.Runner;

Expand All @@ -26,96 +33,168 @@
* @author Brian S O'Neill
*/
/*P*/
final class PendingTxnFinisher extends Latch implements Runnable {
private final int mMaxThreads;
private final Latch.Condition mIdleCondition;

private PendingTxn mFirst, mLast;

private int mTotalThreads;
final class PendingTxnFinisher extends Latch {
private final Worker[] mWorkers;
private final long mMaxLag;
private int mLastSelected;

PendingTxnFinisher(int maxThreads) {
mMaxThreads = maxThreads;
mIdleCondition = new Latch.Condition();
mWorkers = new Worker[maxThreads];
for (int i=0; i<mWorkers.length; i++) {
mWorkers[i] = new Worker();
}
mMaxLag = 1_000_000; // TODO: configurable?
}

void enqueue(PendingTxn first, PendingTxn last) {
long lastCommitPos = last.commitPos();

acquireExclusive();
try {
if (mLast == null) {
mFirst = first;
} else {
mLast.setNextPlain(first);
}
mLast = last;
if (mIdleCondition.isEmpty() && mTotalThreads < mMaxThreads) {
Runner.start("PendingTxnFinisher", this);
mTotalThreads++;
} else {
mIdleCondition.signal(this);
// Start the search just lower than the last one selected, to drive tasks towards the
// lower workers. The higher workers can then idle and allow their threads to exit.
int slot = Math.max(0, mLastSelected - 1);

for (int i=0; i<mWorkers.length; i++) {
Worker w = mWorkers[slot];
if (lastCommitPos - w.commitPos() <= mMaxLag) {
w.enqueue(first, last);
mLastSelected = slot;
return;
}
slot++;
if (slot >= mWorkers.length) {
slot = 0;
}
}

mLastSelected = slot = ThreadLocalRandom.current().nextInt(mWorkers.length);

mWorkers[slot].enqueue(first, last);
} finally {
releaseExclusive();
}
}

/**
* Signal up all threads, to help them exit sooner.
* Signal all threads, to help them exit sooner.
*/
void interrupt() {
acquireExclusive();
try {
mIdleCondition.signalAll(this);
} finally {
releaseExclusive();
for (Worker w : mWorkers) {
w.interrupt();
}
}

@Override
public void run() {
while (true) {
int awaitResult = 1; // signaled
private static class Worker extends Latch implements Runnable {
private static final VarHandle cCommitPosHandle;

static {
try {
var lookup = MethodHandles.lookup();
cCommitPosHandle = lookup.findVarHandle(Worker.class, "mCommitPos", long.class);
} catch (Throwable e) {
throw Utils.rethrow(e);
}
}

private final Latch.Condition mIdleCondition;

PendingTxn first, last;
private PendingTxn mFirst, mLast;
private long mCommitPos;
private boolean mRunning;

private Worker() {
mIdleCondition = new Latch.Condition();
mCommitPos = Long.MAX_VALUE;
}

long commitPos() {
return (long) cCommitPosHandle.getOpaque(this);
}

void enqueue(PendingTxn first, PendingTxn last) {
acquireExclusive();
try {
while (true) {
first = mFirst;
if (first != null) {
last = mLast;
mFirst = null;
mLast = null;
break;
}
if (mLast == null) {
mFirst = first;
cCommitPosHandle.setOpaque(this, first.commitPos());
} else {
mLast.setNextPlain(first);
}

if (awaitResult <= 0) { // interrupted or timed out
mTotalThreads--;
return;
}
mLast = last;

// Use priorityAwait in order to force some threads to do less work,
// allowing them to exit when idle. The total amount of threads will more
// closely match the amount that's needed.
long nanosTimeout = 10_000_000_000L;
long nanosEnd = System.nanoTime() + nanosTimeout;
awaitResult = mIdleCondition.priorityAwait(this, nanosTimeout, nanosEnd);
if (!mRunning) {
try {
Runner.start("PendingTxnFinisher", this);
mRunning = true;
} catch (Throwable e) {
// Possibly out of memory. Try again later.
}
} else {
mIdleCondition.signal(this);
}
} finally {
releaseExclusive();
}
}

void interrupt() {
acquireExclusive();
mIdleCondition.signal(this);
releaseExclusive();
}

@Override
public void run() {
while (true) {
int awaitResult = 1; // signaled
long delta = 0;

PendingTxn first, last;

acquireExclusive();
try {
first.run();
} catch (Throwable e) {
// PendingTxn should catch and report any exceptions, but just in case
// something leaks out, ignore it and move on.
while (true) {
first = mFirst;
if (first != null) {
last = mLast;
mFirst = null;
mLast = null;
cCommitPosHandle.setOpaque(this, last.commitPos());
break;
}

// Indicate that this worker is all caught up.
cCommitPosHandle.setOpaque(this, Long.MAX_VALUE);

if (awaitResult <= 0) { // interrupted or timed out
mRunning = false;
return;
}

long nanosTimeout = 10_000_000_000L;
long nanosEnd = System.nanoTime() + nanosTimeout;
awaitResult = mIdleCondition.await(this, nanosTimeout, nanosEnd);
}
} finally {
releaseExclusive();
}
if (first == last) {
break;

delta = last.commitPos() - first.commitPos();

while (true) {
try {
first.run();
} catch (Throwable e) {
// PendingTxn should catch and report any exceptions, but just in case
// something leaks out, ignore it and move on.
}
if (first == last) {
break;
}
first = first.getNextPlain();
}
first = first.getNextPlain();
}
}
}
Expand Down

0 comments on commit f047dfa

Please sign in to comment.