Skip to content

Commit

Permalink
LUCENE-7671 - Enhance UpgradeIndexMergePolicy with additional options
Browse files Browse the repository at this point in the history
  • Loading branch information
kelaban committed Jan 31, 2017
1 parent 0187838 commit c012074
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ public void testCreateIndexWithDocValuesUpdates() throws Exception {

// TODO: on 6.0.0 release, gen the single segment indices and add here:
final static String[] oldSingleSegmentNames = {
"6.0.0.singlesegment-cfs",
"6.0.0.singlesegment-nocfs"
};

static Map<String,Directory> oldIndexDirs;
Expand Down Expand Up @@ -456,8 +458,9 @@ public static void beforeClass() throws Exception {
oldIndexDirs = new HashMap<>();
for (String name : names) {
Path dir = createTempDir(name);
InputStream resource = TestBackwardsCompatibility.class.getResourceAsStream("index." + name + ".zip");
assertNotNull("Index name " + name + " not found", resource);
String nameOnDisk = "index." + name + ".zip";
InputStream resource = TestBackwardsCompatibility.class.getResourceAsStream(nameOnDisk);
assertNotNull("Index name " + nameOnDisk + " not found", resource);
TestUtil.unzip(resource, dir);
oldIndexDirs.put(name, newFSDirectory(dir));
}
Expand Down Expand Up @@ -1281,14 +1284,18 @@ public void testNumericFields() throws Exception {
}

private int checkAllSegmentsUpgraded(Directory dir) throws IOException {
return checkAllSegmentsMatchVersion(dir, Version.LATEST);
}

private int checkAllSegmentsMatchVersion(Directory dir, Version version) throws IOException {
final SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
if (VERBOSE) {
System.out.println("checkAllSegmentsUpgraded: " + infos);
System.out.println("checkAllSegmentsMatchVersion: " + version + "-" + infos);
}
for (SegmentCommitInfo si : infos) {
assertEquals(Version.LATEST, si.info.getVersion());
assertEquals(version, si.info.getVersion());
}
assertEquals(Version.LATEST, infos.getCommitLuceneVersion());
assertEquals(version, infos.getCommitLuceneVersion());
return infos.size();
}

Expand All @@ -1306,14 +1313,59 @@ public void testUpgradeOldIndex() throws Exception {
System.out.println("testUpgradeOldIndex: index=" +name);
}
Directory dir = newDirectory(oldIndexDirs.get(name));

int numSegmentsBefore = SegmentInfos.readLatestCommit(dir).size();

newIndexUpgrader(dir).upgrade(Integer.MAX_VALUE);

newIndexUpgrader(dir).upgrade();
assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir));

dir.close();
}
}

public void testUpgradeWithExcplicitUpgrades() throws Exception {
List<String> names = new ArrayList<>(oldNames.length + oldSingleSegmentNames.length);
names.addAll(Arrays.asList(oldNames));
names.addAll(Arrays.asList(oldSingleSegmentNames));
for(String name : names) {
if (VERBOSE) {
System.out.println("testUpgradeWithExcplicitUpgrades: index=" +name);
}
Directory dir = newDirectory(oldIndexDirs.get(name));

SegmentInfos infosBefore = SegmentInfos.readLatestCommit(dir);
int numSegmentsBefore = infosBefore.size();
Version versionBefore = infosBefore.getCommitLuceneVersion();

assertFalse("Excpected these segments to be an old version", versionBefore.equals(Version.LATEST));

UpgradeIndexMergePolicy uimp = new UpgradeIndexMergePolicy(NoMergePolicy.INSTANCE);

uimp.setRequireExplicitUpgrades(true);
uimp.setIgnoreNewSegments(true);

assertEquals(numSegmentsBefore, checkAllSegmentsMatchVersion(dir, versionBefore));

try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(uimp))) {
w.forceMerge(numSegmentsBefore); // Don't optimize just upgrade
}

assertEquals(numSegmentsBefore, checkAllSegmentsMatchVersion(dir, versionBefore));

checkAllSegmentsUpgraded(dir);
uimp.setUpgradeInProgress(true); // Turn on upgrades

try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(uimp))) {
w.forceMerge(numSegmentsBefore); // Don't optimize just upgrade
}

assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir));

dir.close();
}
}

// Write a test that checks that the underlying policy gets delegated to??

public void testCommandLineArgs() throws Exception {

Expand Down
Binary file not shown.
Binary file not shown.
12 changes: 9 additions & 3 deletions lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,13 @@ public IndexUpgrader(Directory dir, IndexWriterConfig iwc, boolean deletePriorCo
this.iwc = iwc;
this.deletePriorCommits = deletePriorCommits;
}

public void upgrade() throws IOException {
this.upgrade(1);
}

/** Perform the upgrade. */
public void upgrade() throws IOException {
public void upgrade(int maxSegements) throws IOException {
if (!DirectoryReader.indexExists(dir)) {
throw new IndexNotFoundException(dir.toString());
}
Expand All @@ -161,15 +165,17 @@ public void upgrade() throws IOException {
}
}

iwc.setMergePolicy(new UpgradeIndexMergePolicy(iwc.getMergePolicy()));
UpgradeIndexMergePolicy uimp = new UpgradeIndexMergePolicy(iwc.getMergePolicy());
uimp.setIgnoreNewSegments(true);
iwc.setMergePolicy(uimp);
iwc.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());

try (final IndexWriter w = new IndexWriter(dir, iwc)) {
InfoStream infoStream = iwc.getInfoStream();
if (infoStream.isEnabled(LOG_PREFIX)) {
infoStream.message(LOG_PREFIX, "Upgrading all pre-" + Version.LATEST + " segments of index directory '" + dir + "' to version " + Version.LATEST + "...");
}
w.forceMerge(1);
w.forceMerge(maxSegements);
if (infoStream.isEnabled(LOG_PREFIX)) {
infoStream.message(LOG_PREFIX, "All segments upgraded to version " + Version.LATEST);
infoStream.message(LOG_PREFIX, "Enforcing commit to rewrite all index metadata...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
import org.apache.lucene.util.Version;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;

/** This {@link MergePolicy} is used for upgrading all existing segments of
* an index when calling {@link IndexWriter#forceMerge(int)}.
* All other methods delegate to the base {@code MergePolicy} given to the constructor.
* This allows for an as-cheap-as possible upgrade of an older index by only upgrading segments that
* are created by previous Lucene versions. forceMerge does no longer really merge;
* it is just used to &quot;forceMerge&quot; older segment versions away.
* are created by previous Lucene versions. forceMerge in part still delegates to the wrapped {@code MergePolicy};
* It will ask the wrapped policy for segments to merge, and the left over segments will be rewritten with the latest version
* (i.e. merged with themselves).
* <p>In general one would use {@link IndexUpgrader}, but for a fully customizeable upgrade,
* you can use this like any other {@code MergePolicy} and call {@link IndexWriter#forceMerge(int)}:
* <pre class="prettyprint lang-java">
Expand All @@ -40,6 +40,10 @@
* w.forceMerge(1);
* w.close();
* </pre>
* <p> The above example would result in a single segment in the latest version. However take this scenario:
* If there were 10 segments in the index and they all need upgrade, calling w.forceMerge(10) would leave 10 segments
* in the index written with the latest lucene version. Calling w.forceMerge(5) would delegate wrapped merge policy
* to determine which segments should be merged together, the remaining segments will be upgraded (rewritten) if need be.
* <p><b>Warning:</b> This merge policy may reorder documents if the index was partially
* upgraded before calling forceMerge (e.g., documents were added). If your application relies
* on &quot;monotonicity&quot; of doc IDs (which means that the order in which the documents
Expand All @@ -49,13 +53,63 @@
* @see IndexUpgrader
*/
public class UpgradeIndexMergePolicy extends MergePolicyWrapper {

private int maxUpgradesAtATime = 5;
private volatile boolean upgradeInProgress = false;
private volatile boolean requireExplicitUpgrades = false;
private boolean ignoreNewSegments = false;

/** Wrap the given {@link MergePolicy} and intercept forceMerge requests to
* only upgrade segments written with previous Lucene versions. */
public UpgradeIndexMergePolicy(MergePolicy in) {
super(in);
}

/**
* Sets whether an explicit call to {@link #setUpgradeInProgress(boolean)} must
* be called before {@link #findForcedMerges(SegmentInfos, int, Map, IndexWriter)} in order for
* an upgrade to initiate. Otherwise every request for a force merge will trigger and upgrade investigation
*
* This option is recommended if using UpgradeIndexMergePolicy as the default merge policy and fine grained control
* over when an upgrade is initiated is required
*
* @param requireExplicitUpgrades whether or not setting upgrades in progress is required: Default false
*/
public void setRequireExplicitUpgrades(boolean requireExplicitUpgrades) {
this.requireExplicitUpgrades = requireExplicitUpgrades;
}

/**
* Set whether or not it is ok for this merge policy to do an upgrade. This
* option needs to enabled before doing a force merge for an upgrade to initiate.
*
* This option has no effect when {@code requireExplicitUpgrades} is disabled
*
* @param upgradeInProgress allow this policy to upgrade segments: Default false
*/
public void setUpgradeInProgress(boolean upgradeInProgress) {
this.upgradeInProgress = upgradeInProgress;
}

/**
* How many segment upgrades should be commited for scheduling at a time. If more segments
* than maxUpgradeSegments need to be upgraded this merge policy relies on IndexWriters cascaded
* requests to find segments to merge. Submitting a few segments at a time allows segments in need
* of an upgrade to remain candidates for a natrually triggered merge.
*
* @param maxUpgradesAtATime how many segment upgrades should be commited for scheduling at a time: Default 5
*/
public void setMaxUpgradesAtATime(int maxUpgradesAtATime) {
this.maxUpgradesAtATime = maxUpgradesAtATime;
}

/**
* @param ignoreNewSegments Whether or not this merge policy should ignore already upgraded segments when force merging: Default false
*/
public void setIgnoreNewSegments(boolean ignoreNewSegments) {
this.ignoreNewSegments = ignoreNewSegments;
}

/** Returns if the given segment should be upgraded. The default implementation
* will return {@code !Version.LATEST.equals(si.getVersion())},
* so all segments created with a different version number than this Lucene version will
Expand All @@ -64,62 +118,115 @@ public UpgradeIndexMergePolicy(MergePolicy in) {
protected boolean shouldUpgradeSegment(SegmentCommitInfo si) {
return !Version.LATEST.equals(si.info.getVersion());
}

@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
return in.findMerges(null, segmentInfos, writer);
}

@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
// first find all old segments
final Map<SegmentCommitInfo,Boolean> oldSegments = new HashMap<>();
for (final SegmentCommitInfo si : segmentInfos) {
final Boolean v = segmentsToMerge.get(si);
if (v != null && shouldUpgradeSegment(si)) {
oldSegments.put(si, v);
}
}

if (verbose(writer)) {
message("findForcedMerges: segmentsToUpgrade=" + oldSegments, writer);
}

if (oldSegments.isEmpty())
return null;

MergeSpecification spec = in.findForcedMerges(segmentInfos, maxSegmentCount, oldSegments, writer);
// Find segments to merge in directory, unless we are ignoring
// newer segments. If new segments are ignored, first old
// segments need to be discovered.
MergeSpecification spec = ignoreNewSegments ? null : in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer);

if (spec != null) {
// remove all segments that are in merge specification from oldSegments,
// the resulting set contains all segments that are left over
// and will be merged to one additional segment:
for (final OneMerge om : spec.merges) {
oldSegments.keySet().removeAll(om.segments);
if(upgradeInProgress || !requireExplicitUpgrades) {

// first find all old segments
final Map<SegmentCommitInfo,Boolean> oldSegments = findSegmentsNeedingUpgrade(segmentInfos, segmentsToMerge);

if (verbose(writer)) {
message("findForcedMerges: segmentsToUpgrade=" + oldSegments, writer);
}

if (oldSegments.isEmpty()) {
upgradeInProgress = false; // Nothing to upgrade
return spec;
}

if(ignoreNewSegments) {
// Ask the wrapped spec now to do the merge with the old segments
spec = in.findForcedMerges(segmentInfos, maxSegmentCount, oldSegments, writer);
}

if (spec != null) {
// remove all segments that are in merge specification from oldSegments,
// the resulting set contains all segments that are left over
// and will be rewritten
for (final OneMerge om : spec.merges) {
oldSegments.keySet().removeAll(om.segments);
}
}

// Add other segments missed by the wrapped merge policy to be upgraded
return maybeUpdateSpecAndUpgradeProgress(spec, oldSegments, segmentInfos, writer);

}

return spec;
}

/**
* Updates the the merge spec with old segments needing upgrade. Also sets whether or not to the upgrade needs to continue (upgradeInProgress=false)
*
* @param spec the MergeSpecification to update
* @param oldSegments the segments needing upgrade
* @param segmentInfos all segment infos
* @param writer the index writer
* @return the possibly updated MergeSpecification
*/
private MergeSpecification maybeUpdateSpecAndUpgradeProgress(MergeSpecification spec, Map<SegmentCommitInfo,Boolean> oldSegments, SegmentInfos segmentInfos, IndexWriter writer) {
if (!oldSegments.isEmpty()) {
if (verbose(writer)) {
message("findForcedMerges: " + in.getClass().getSimpleName() +
" does not want to merge all old segments, merge remaining ones into new segment: " + oldSegments, writer);
}
final List<SegmentCommitInfo> newInfos = new ArrayList<>();
for (final SegmentCommitInfo si : segmentInfos) {
if (oldSegments.containsKey(si)) {
newInfos.add(si);
}
" does not want to merge all old segments, rewrite remaining ones into upgraded segments: " + oldSegments, writer);
}
// add the final merge

if (spec == null) {
spec = new MergeSpecification();
}
spec.add(new OneMerge(newInfos));

final int numWrappedSpecMerges = spec.merges.size();

for (SegmentCommitInfo si: segmentInfos) {

if(!oldSegments.containsKey(si)) {
continue;
}

spec.add(new OneMerge(Collections.singletonList(si)));

if((spec.merges.size() - numWrappedSpecMerges) == maxUpgradesAtATime) {
return spec;
}

}

// We found we have less than the max number but greater than 0
if(spec.merges.size() > numWrappedSpecMerges) {
return spec;
}

}


// Only set this once there are 0 segments needing upgrading, because when we return a
// spec, IndexWriter may (silently!) reject that merge if some of the segments we asked
// to be merged were already being (naturally) merged:
upgradeInProgress = false;

return spec;
}

private Map<SegmentCommitInfo,Boolean> findSegmentsNeedingUpgrade(SegmentInfos segmentInfos, Map<SegmentCommitInfo,Boolean> segmentsToMerge) {
final Map<SegmentCommitInfo,Boolean> oldSegments = new HashMap<>();

for (final SegmentCommitInfo si : segmentInfos) {
final Boolean v = segmentsToMerge.get(si);
if (v != null && shouldUpgradeSegment(si)) {
oldSegments.put(si, v);
}
}

return oldSegments;
}

private boolean verbose(IndexWriter writer) {
return writer != null && writer.infoStream.isEnabled("UPGMP");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.lucene.index;


public class TestUpgradeIndexMergePolicy extends BaseMergePolicyTestCase {

public MergePolicy mergePolicy() {
Expand Down

0 comments on commit c012074

Please sign in to comment.