diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index da8fbcc45cb05..5315dc479e2ff 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; /** Maps _uid value to its version information. */ -public final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { +public class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { private final KeyedLock keyedLock = new KeyedLock<>(); @@ -202,7 +202,12 @@ Maps invalidateOldMapForAssert() { */ Maps invalidateOldMap(LiveVersionMapArchive archive) { archive.afterRefresh(old); - return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess); + Maps result = new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess); + // not JMM compatible, similar to beforeRefresh + if (needsSafeAccess) { + result.needsSafeAccess = true; + } + return result; } void put(BytesRef uid, VersionValue version) { @@ -282,7 +287,15 @@ public void beforeRefresh() throws IOException { // map. While reopen is running, any lookup will first // try this new map, then fallback to old, then to the // current searcher: - maps = maps.buildTransitionMap(); + Maps original = maps; + Maps transitionMap = original.buildTransitionMap(); + maps = transitionMap; + // this is still not JMM safe, but makes the test pass. There are a few options: + // 1. Do read then modify instead of writing to it in enforceSafeAccess. The read can be non-volatile, the write volatile. + // 2. Make the field volatile (but the comment on it seems to indicate that it would be bad for perf). + if (original.needsSafeAccess) { + transitionMap.needsSafeAccess = true; + } assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null; // This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous // line and this one, but that should be minor, and the error won't accumulate over time: @@ -345,7 +358,14 @@ boolean isUnsafe() { } void enforceSafeAccess() { - maps.needsSafeAccess = true; + Maps copy = maps; + copy.needsSafeAccess = true; + Maps nextCopy; + // loop until we have the same maps after the assignment + while ((nextCopy = maps) != copy) { + nextCopy.needsSafeAccess = true; + copy = nextCopy; + } } boolean isSafeAccessRequired() { diff --git a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index b6be13b9f2513..604ec6c4d1987 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; @@ -510,4 +511,42 @@ public void testVersionMapReclaimableRamBytes() throws IOException { assertEquals(map.reclaimableRefreshRamBytes(), 0L); assertEquals(map.ramBytesUsedForRefresh(), 0L); } + + /** + * When we only do operations that enforce safe access, we expect to stay as a safe map. + */ + public void testNotUnsafeConcurrently() throws InterruptedException { + LiveVersionMap map = new LiveVersionMap(); + AtomicBoolean running = new AtomicBoolean(true); + Thread refresher = new Thread(() -> { + while (running.get()) { + try { + map.beforeRefresh(); + map.afterRefresh(true); + } catch (IOException e) { + fail(e); + throw new RuntimeException(e); + } + } + }); + + refresher.start(); + try { + // 1000 is enough to provoke original version + for (int i = 0; i < 100000; ++i) { + BytesRef uid = Uid.encodeId(randomIdentifier()); + try (Releasable releasable = map.acquireLock(uid)) { + map.enforceSafeAccess(); + map.maybePutIndexUnderLock(uid, new IndexVersionValue(null, 0, 0, 0)); + assertFalse(map.isUnsafe()); + } + } + + } finally { + running.set(false); + refresher.join(10000); + + } + assertFalse(refresher.isAlive()); + } }