diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index c945d082c9a35..1d163f65313ca 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -1518,8 +1518,9 @@ public String getLowercase() { private final VersionType versionType; private final Origin origin; private final long startTime; + private final InternalEngine.IndexingStrategy indexingStrategy; - public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) { + public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime, InternalEngine.IndexingStrategy indexingStrategy) { this.uid = uid; this.seqNo = seqNo; this.primaryTerm = primaryTerm; @@ -1527,6 +1528,7 @@ public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionTy this.versionType = versionType; this.origin = origin; this.startTime = startTime; + this.indexingStrategy = indexingStrategy; } /** @@ -1587,6 +1589,10 @@ public long startTime() { abstract String id(); public abstract TYPE operationType(); + + public InternalEngine.IndexingStrategy indexingStrategy() { + return indexingStrategy; + }; } /** @@ -1617,7 +1623,25 @@ public Index( long ifSeqNo, long ifPrimaryTerm ) { - super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + this(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry, ifSeqNo, ifPrimaryTerm, null); + } + + public Index( + Term uid, + ParsedDocument doc, + long seqNo, + long primaryTerm, + long version, + VersionType versionType, + Origin origin, + long startTime, + long autoGeneratedIdTimestamp, + boolean isRetry, + long ifSeqNo, + long ifPrimaryTerm, + InternalEngine.IndexingStrategy indexingStrategy + ) { + super(uid, seqNo, primaryTerm, version, versionType, origin, startTime, indexingStrategy); assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative"; assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset"; @@ -1630,6 +1654,7 @@ public Index( this.ifPrimaryTerm = ifPrimaryTerm; } + public Index(Term uid, long primaryTerm, ParsedDocument doc) { this(uid, primaryTerm, doc, Versions.MATCH_ANY); } // TEST ONLY @@ -1706,6 +1731,8 @@ public long getIfSeqNo() { public long getIfPrimaryTerm() { return ifPrimaryTerm; } + + } /** @@ -1732,7 +1759,7 @@ public Delete( long ifSeqNo, long ifPrimaryTerm ) { - super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + super(uid, seqNo, primaryTerm, version, versionType, origin, startTime, null); assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative"; assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset"; @@ -1812,7 +1839,7 @@ public String reason() { } public NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) { - super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime); + super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime, null); this.reason = reason; } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index c6fc07629ae4e..51c0974342341 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -684,7 +684,7 @@ enum OpVsLuceneDocStatus { /** the op is more recent than the one that last modified the doc found in lucene*/ OP_NEWER, /** the op is older or the same as the one that last modified the doc found in lucene*/ - OP_STALE_OR_EQUAL, + OP_STALE_OR_EQUAL,// 一样,或者更旧的 /** no doc was found in lucene */ LUCENE_DOC_NOT_FOUND } @@ -692,8 +692,8 @@ enum OpVsLuceneDocStatus { private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { Objects.requireNonNull(versionValue); if (seqNo > versionValue.seqNo) { - return OpVsLuceneDocStatus.OP_NEWER; - } else if (seqNo == versionValue.seqNo) { + return OpVsLuceneDocStatus.OP_NEWER;// 新的 + } else if (seqNo == versionValue.seqNo) {// 一样 assert versionValue.term == primaryTerm : "primary term not matched; id=" + id + " seq_no=" @@ -711,12 +711,13 @@ private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; - VersionValue versionValue = getVersionFromMap(op.uid().bytes()); + + VersionValue versionValue = getVersionFromMap(op.uid().bytes());// 从maps中找下,看可以找到吗 assert incrementVersionLookup(); boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); - if (versionValue != null) { + if (versionValue != null) {// maps中找到了 status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue); - } else { + } else {// 没找到 // load from index assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { @@ -906,10 +907,11 @@ public IndexResult index(Index index) throws IOException { index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), - index.getIfPrimaryTerm() + index.getIfPrimaryTerm(), + plan ); - final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; + final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;// 不带主键的写入 if (toAppend == false) { advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo()); } @@ -987,7 +989,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO assert assertNonPrimaryOrigin(index); // needs to maintain the auto_id timestamp in case this replica becomes primary if (canOptimizeAddDocument(index)) {// 如果不带主键的写入,肯定可以优化 - mayHaveBeenIndexedBefore(index); + mayHaveBeenIndexedBefore(index);// 可能更新maxUnsafeAutoIdTimestamp和必须更新maxSeenAutoIdTimestamp } final IndexingStrategy plan; // unlike the primary, replicas don't really care to about creation status of documents @@ -1011,7 +1013,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); versionMap.enforceSafeAccess(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {// seqNo小于等于当前已经写入的 if (segRepEnabled) { // For segrep based indices, we can't completely rely on localCheckpointTracker // as the preserved checkpoint may not have all the operations present in lucene @@ -1020,7 +1022,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO } else { plan = IndexingStrategy.processAsStaleOp(index.version()); } - } else { + } else {// 更新的,或者没找到 plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0); } }