Skip to content

Commit

Permalink
Removed creation of recovery source in all cases
Browse files Browse the repository at this point in the history
Signed-off-by: Navneet Verma <navneev@amazon.com>
  • Loading branch information
navneet1v committed Apr 22, 2024
1 parent 2d23312 commit 6c5896a
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 41 deletions.
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/NewSegmentMergePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@
*/
public class NewSegmentMergePolicy extends FilterMergePolicy {
private boolean mergeCompleted;

public NewSegmentMergePolicy(MergePolicy in) {
super(in);
mergeCompleted = false;
}


/**
* Rnad doc
*/
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo, Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
if(mergeCompleted) {
public MergeSpecification findForcedMerges(
SegmentInfos segmentInfos,
int maxSegmentCount,
Map<SegmentCommitInfo, Boolean> segmentsToMerge,
MergeContext mergeContext
) throws IOException {
if (mergeCompleted) {
return null;
}
mergeCompleted = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@

package org.opensearch.action.admin.indices.forcemerge;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.opensearch.cluster.ClusterState;
Expand All @@ -46,17 +42,14 @@
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down
18 changes: 8 additions & 10 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
Expand Down Expand Up @@ -1280,16 +1279,15 @@ public abstract void forceMerge(
) throws EngineException, IOException;

public void forceMerge(
boolean isOneMerge,
boolean flush,
int maxNumSegments,
boolean onlyExpungeDeletes,
boolean upgrade,
boolean upgradeOnlyAncientSegments,
@Nullable String forceMergeUUID
boolean isOneMerge,
boolean flush,
int maxNumSegments,
boolean onlyExpungeDeletes,
boolean upgrade,
boolean upgradeOnlyAncientSegments,
@Nullable String forceMergeUUID
) throws EngineException, IOException {
this.forceMerge(flush, maxNumSegments, onlyExpungeDeletes, upgrade, upgradeOnlyAncientSegments,
forceMergeUUID);
this.forceMerge(flush, maxNumSegments, onlyExpungeDeletes, upgrade, upgradeOnlyAncientSegments, forceMergeUUID);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2117,15 +2117,14 @@ final Map<BytesRef, VersionValue> getVersionMap() {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}


public void forceMerge(
boolean isOneMerge,
boolean flush,
int maxNumSegments,
boolean onlyExpungeDeletes,
boolean upgrade,
boolean upgradeOnlyAncientSegments,
@Nullable String forceMergeUUID
boolean isOneMerge,
boolean flush,
int maxNumSegments,
boolean onlyExpungeDeletes,
boolean upgrade,
boolean upgradeOnlyAncientSegments,
@Nullable String forceMergeUUID
) throws EngineException, IOException {
/*
* We do NOT acquire the readlock here since we are waiting on the merges to finish
Expand All @@ -2139,7 +2138,7 @@ public void forceMerge(
* syncs calls to findForcedMerges.
*/
assert indexWriter.getConfig().getMergePolicy() instanceof OpenSearchMergePolicy : "MergePolicy is "
+ indexWriter.getConfig().getMergePolicy().getClass().getName();
+ indexWriter.getConfig().getMergePolicy().getClass().getName();
OpenSearchMergePolicy mp = (OpenSearchMergePolicy) indexWriter.getConfig().getMergePolicy();
optimizeLock.lock();
try {
Expand All @@ -2150,7 +2149,7 @@ public void forceMerge(
}
store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
try {
if(isOneMerge) {
if (isOneMerge) {
MergePolicy oldMergePolicy = indexWriter.getConfig().getMergePolicy();
indexWriter.getConfig().setMergePolicy(new NewSegmentMergePolicy(indexWriter.getConfig().getMergePolicy()));
indexWriter.forceMerge(1);
Expand Down Expand Up @@ -2199,8 +2198,6 @@ public void forceMerge(
}
}



@Override
public void forceMerge(
final boolean flush,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
package org.opensearch.index.mapper;

import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -211,13 +210,13 @@ public void preParse(ParseContext context) throws IOException {
final BytesRef ref = adaptedSource.toBytesRef();
context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}

if (originalSource != null && adaptedSource != originalSource) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
}
// Disbaling the recovery source here to ensure that if we disable the source nothing is stored during indexing
// if (originalSource != null && adaptedSource != originalSource) {
// // if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
// BytesRef ref = originalSource.toBytesRef();
// context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
// context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
// }
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,8 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
logger.trace("force merge with {}", forceMerge);
}
Engine engine = getEngine();
engine.forceMerge(forceMerge.isOneMerge,
engine.forceMerge(
forceMerge.isOneMerge,
forceMerge.flush(),
forceMerge.maxNumSegments(),
forceMerge.onlyExpungeDeletes(),
Expand Down

0 comments on commit 6c5896a

Please sign in to comment.