diff --git a/lucene/backward-codecs/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java b/lucene/backward-codecs/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java index 1dda6b6b7582..c69b14a52225 100644 --- a/lucene/backward-codecs/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java +++ b/lucene/backward-codecs/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java @@ -260,7 +260,7 @@ public void testCreateIndexWithDocValuesUpdates() throws Exception { writer.commit(); // flush every 10 docs } } - + // first segment: no updates // second segment: update two fields, same gen @@ -435,6 +435,8 @@ public void testCreateIndexWithDocValuesUpdates() throws Exception { // TODO: on 6.0.0 release, gen the single segment indices and add here: final static String[] oldSingleSegmentNames = { + "6.0.0.singlesegment-cfs", + "6.0.0.singlesegment-nocfs" }; static Map oldIndexDirs; @@ -462,8 +464,9 @@ public static void beforeClass() throws Exception { oldIndexDirs = new HashMap<>(); for (String name : names) { Path dir = createTempDir(name); - InputStream resource = TestBackwardsCompatibility.class.getResourceAsStream("index." + name + ".zip"); - assertNotNull("Index name " + name + " not found", resource); + String nameOnDisk = "index." 
+ name + ".zip"; + InputStream resource = TestBackwardsCompatibility.class.getResourceAsStream(nameOnDisk); + assertNotNull("Index name " + nameOnDisk + " not found", resource); TestUtil.unzip(resource, dir); oldIndexDirs.put(name, newFSDirectory(dir)); } @@ -1315,14 +1318,19 @@ public void testNumericFields() throws Exception { } private int checkAllSegmentsUpgraded(Directory dir, Version indexCreatedVersion) throws IOException { + return this.checkAllSegmentsUpgraded(dir, Version.LATEST, indexCreatedVersion); + } + + private int checkAllSegmentsUpgraded(Directory dir, Version upgradedVersion, Version indexCreatedVersion) throws IOException { final SegmentInfos infos = SegmentInfos.readLatestCommit(dir); if (VERBOSE) { - System.out.println("checkAllSegmentsUpgraded: " + infos); + System.out.println("checkAllSegmentsUpgraded: " + indexCreatedVersion + "-" + infos); } for (SegmentCommitInfo si : infos) { - assertEquals(Version.LATEST, si.info.getVersion()); + assertEquals(upgradedVersion, si.info.getVersion()); } - assertEquals(Version.LATEST, infos.getCommitLuceneVersion()); + + assertEquals(upgradedVersion, infos.getCommitLuceneVersion()); assertEquals(indexCreatedVersion, infos.getIndexCreatedVersion()); return infos.size(); } @@ -1342,14 +1350,64 @@ public void testUpgradeOldIndex() throws Exception { } Directory dir = newDirectory(oldIndexDirs.get(name)); Version indexCreatedVersion = SegmentInfos.readLatestCommit(dir).getIndexCreatedVersion(); + + int numSegmentsBefore = SegmentInfos.readLatestCommit(dir).size(); + + newIndexUpgrader(dir).upgrade(Integer.MAX_VALUE); - newIndexUpgrader(dir).upgrade(); + assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, indexCreatedVersion)); + + dir.close(); + } + } + + public void testUpgradeWithExcplicitUpgrades() throws Exception { + List names = new ArrayList<>(oldNames.length + oldSingleSegmentNames.length); + names.addAll(Arrays.asList(oldNames)); + names.addAll(Arrays.asList(oldSingleSegmentNames)); + 
for(String name : names) { + if (VERBOSE) { + System.out.println("testUpgradeWithExcplicitUpgrades: index=" +name); + } + Directory dir = newDirectory(oldIndexDirs.get(name)); + + SegmentInfos infosBefore = SegmentInfos.readLatestCommit(dir); + int numSegmentsBefore = infosBefore.size(); + Version versionBefore = infosBefore.getCommitLuceneVersion(); + Version createdVersionBefore = infosBefore.getIndexCreatedVersion(); + + assertFalse("Excpected these segments to be an old version", versionBefore.equals(Version.LATEST)); + + UpgradeIndexMergePolicy uimp = new UpgradeIndexMergePolicy(NoMergePolicy.INSTANCE); + + uimp.setRequireExplicitUpgrades(true); + uimp.setIgnoreNewSegments(true); + + assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, versionBefore, createdVersionBefore)); + + try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(uimp))) { + w.forceMerge(numSegmentsBefore); // Don't optimize just upgrade + } + + // Upgrade should not have happened yet + assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, versionBefore, createdVersionBefore)); + + uimp.setUpgradeInProgress(true); // Turn on upgrades + + try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(uimp))) { + w.forceMerge(numSegmentsBefore); // Don't optimize just upgrade + } - checkAllSegmentsUpgraded(dir, indexCreatedVersion); + // Upgrade should now have happened. + Version indexCreatedVersionAfter = SegmentInfos.readLatestCommit(dir).getIndexCreatedVersion(); + assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, indexCreatedVersionAfter)); + dir.close(); } } + + // Write a test that checks that the underlying policy gets delegated to?? 
public void testCommandLineArgs() throws Exception { diff --git a/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-cfs.zip b/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-cfs.zip new file mode 100644 index 000000000000..e3eafae2bffa Binary files /dev/null and b/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-cfs.zip differ diff --git a/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-nocfs.zip b/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-nocfs.zip new file mode 100644 index 000000000000..8c551d2f359f Binary files /dev/null and b/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-nocfs.zip differ diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java b/lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java index 00084c880f23..ce2ebc0f7b98 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java @@ -147,9 +147,13 @@ public IndexUpgrader(Directory dir, IndexWriterConfig iwc, boolean deletePriorCo this.iwc = iwc; this.deletePriorCommits = deletePriorCommits; } + + public void upgrade() throws IOException { + this.upgrade(1); + } /** Perform the upgrade. 
*/ - public void upgrade() throws IOException { + public void upgrade(int maxSegements) throws IOException { if (!DirectoryReader.indexExists(dir)) { throw new IndexNotFoundException(dir.toString()); } @@ -161,7 +165,9 @@ public void upgrade() throws IOException { } } - iwc.setMergePolicy(new UpgradeIndexMergePolicy(iwc.getMergePolicy())); + UpgradeIndexMergePolicy uimp = new UpgradeIndexMergePolicy(iwc.getMergePolicy()); + uimp.setIgnoreNewSegments(true); + iwc.setMergePolicy(uimp); iwc.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); try (final IndexWriter w = new IndexWriter(dir, iwc)) { @@ -169,7 +175,7 @@ public void upgrade() throws IOException { if (infoStream.isEnabled(LOG_PREFIX)) { infoStream.message(LOG_PREFIX, "Upgrading all pre-" + Version.LATEST + " segments of index directory '" + dir + "' to version " + Version.LATEST + "..."); } - w.forceMerge(1); + w.forceMerge(maxSegements); if (infoStream.isEnabled(LOG_PREFIX)) { infoStream.message(LOG_PREFIX, "All segments upgraded to version " + Version.LATEST); infoStream.message(LOG_PREFIX, "Enforcing commit to rewrite all index metadata..."); diff --git a/lucene/core/src/java/org/apache/lucene/index/UpgradeIndexMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/UpgradeIndexMergePolicy.java index 74cbc905695a..840a572f4c61 100644 --- a/lucene/core/src/java/org/apache/lucene/index/UpgradeIndexMergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/UpgradeIndexMergePolicy.java @@ -20,8 +20,7 @@ import org.apache.lucene.util.Version; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.Collections; import java.util.Map; import java.util.HashMap; @@ -29,8 +28,9 @@ * an index when calling {@link IndexWriter#forceMerge(int)}. * All other methods delegate to the base {@code MergePolicy} given to the constructor. 
* This allows for an as-cheap-as possible upgrade of an older index by only upgrading segments that - * are created by previous Lucene versions. forceMerge does no longer really merge; - * it is just used to "forceMerge" older segment versions away. + * are created by previous Lucene versions. forceMerge in part still delegates to the wrapped {@code MergePolicy}: + * it asks the wrapped policy for segments to merge, and the leftover segments are rewritten with the latest version + * (i.e. merged with themselves). *

In general one would use {@link IndexUpgrader}, but for a fully customizeable upgrade, * you can use this like any other {@code MergePolicy} and call {@link IndexWriter#forceMerge(int)}: *

@@ -40,6 +40,10 @@
   *  w.forceMerge(1);
   *  w.close();
   * 
+ *

The above example would result in a single segment in the latest version. However, consider this scenario: + * If there were 10 segments in the index and they all needed an upgrade, calling w.forceMerge(10) would leave 10 segments + * in the index written with the latest Lucene version. Calling w.forceMerge(5) would delegate to the wrapped merge policy + * to determine which segments should be merged together; the remaining segments would be upgraded (rewritten) if need be. *

Warning: This merge policy may reorder documents if the index was partially * upgraded before calling forceMerge (e.g., documents were added). If your application relies * on "monotonicity" of doc IDs (which means that the order in which the documents @@ -49,6 +53,11 @@ * @see IndexUpgrader */ public class UpgradeIndexMergePolicy extends MergePolicyWrapper { + + private int maxUpgradesAtATime = 5; + private volatile boolean upgradeInProgress = false; + private volatile boolean requireExplicitUpgrades = false; + private boolean ignoreNewSegments = false; /** Wrap the given {@link MergePolicy} and intercept forceMerge requests to * only upgrade segments written with previous Lucene versions. */ @@ -56,6 +65,51 @@ public UpgradeIndexMergePolicy(MergePolicy in) { super(in); } + /** + * Sets whether an explicit call to {@link #setUpgradeInProgress(boolean)} must + * be made before {@link #findForcedMerges(SegmentInfos, int, Map, IndexWriter)} in order for + * an upgrade to be initiated. Otherwise, every request for a force merge will trigger an upgrade investigation. + * + * This option is recommended if using UpgradeIndexMergePolicy as the default merge policy and fine-grained control + * over when an upgrade is initiated is required. + * + * @param requireExplicitUpgrades whether or not setting upgrades in progress is required: Default false + */ + public void setRequireExplicitUpgrades(boolean requireExplicitUpgrades) { + this.requireExplicitUpgrades = requireExplicitUpgrades; + } + + /** + * Set whether or not it is ok for this merge policy to do an upgrade. This + * option needs to be enabled before doing a force merge for an upgrade to initiate. 
+ * + * This option has no effect when {@code requireExplicitUpgrades} is disabled. + * + * @param upgradeInProgress allow this policy to upgrade segments: Default false + */ + public void setUpgradeInProgress(boolean upgradeInProgress) { + this.upgradeInProgress = upgradeInProgress; + } + + /** + * How many segment upgrades should be committed for scheduling at a time. If more segments + * than {@code maxUpgradesAtATime} need to be upgraded, this merge policy relies on IndexWriter's cascaded + * requests to find segments to merge. Submitting a few segments at a time allows segments in need + * of an upgrade to remain candidates for a naturally triggered merge. + * + * @param maxUpgradesAtATime how many segment upgrades should be committed for scheduling at a time: Default 5 + */ + public void setMaxUpgradesAtATime(int maxUpgradesAtATime) { + this.maxUpgradesAtATime = maxUpgradesAtATime; + } + + /** + * @param ignoreNewSegments Whether or not this merge policy should ignore already upgraded segments when force merging: Default false + */ + public void setIgnoreNewSegments(boolean ignoreNewSegments) { + this.ignoreNewSegments = ignoreNewSegments; + } + /** Returns if the given segment should be upgraded. 
The default implementation * will return {@code !Version.LATEST.equals(si.getVersion())}, * so all segments created with a different version number than this Lucene version will @@ -64,62 +118,115 @@ public UpgradeIndexMergePolicy(MergePolicy in) { protected boolean shouldUpgradeSegment(SegmentCommitInfo si) { return !Version.LATEST.equals(si.info.getVersion()); } - - @Override - public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, IndexWriter writer) throws IOException { - return in.findMerges(null, segmentInfos, writer); - } @Override public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, Map segmentsToMerge, IndexWriter writer) throws IOException { - // first find all old segments - final Map oldSegments = new HashMap<>(); - for (final SegmentCommitInfo si : segmentInfos) { - final Boolean v = segmentsToMerge.get(si); - if (v != null && shouldUpgradeSegment(si)) { - oldSegments.put(si, v); - } - } - if (verbose(writer)) { - message("findForcedMerges: segmentsToUpgrade=" + oldSegments, writer); - } - - if (oldSegments.isEmpty()) - return null; - - MergeSpecification spec = in.findForcedMerges(segmentInfos, maxSegmentCount, oldSegments, writer); + // Find segments to merge in directory, unless we are ignoring + // newer segments. If new segments are ignored, first old + // segments need to be discovered. + MergeSpecification spec = ignoreNewSegments ? 
null : in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer); - if (spec != null) { - // remove all segments that are in merge specification from oldSegments, - // the resulting set contains all segments that are left over - // and will be merged to one additional segment: - for (final OneMerge om : spec.merges) { - oldSegments.keySet().removeAll(om.segments); + if(upgradeInProgress || !requireExplicitUpgrades) { + + // first find all old segments + final Map oldSegments = findSegmentsNeedingUpgrade(segmentInfos, segmentsToMerge); + + if (verbose(writer)) { + message("findForcedMerges: segmentsToUpgrade=" + oldSegments, writer); + } + + if (oldSegments.isEmpty()) { + upgradeInProgress = false; // Nothing to upgrade + return spec; + } + + if(ignoreNewSegments) { + // Ask the wrapped policy now to do the merge with the old segments + spec = in.findForcedMerges(segmentInfos, maxSegmentCount, oldSegments, writer); + } + + if (spec != null) { + // remove all segments that are in merge specification from oldSegments, + // the resulting set contains all segments that are left over + // and will be rewritten + for (final OneMerge om : spec.merges) { + oldSegments.keySet().removeAll(om.segments); + } } + + // Add other segments missed by the wrapped merge policy to be upgraded + return maybeUpdateSpecAndUpgradeProgress(spec, oldSegments, segmentInfos, writer); + } + return spec; + } + + /** + * Updates the merge spec with old segments needing upgrade. 
Also records whether the upgrade needs to continue: {@code upgradeInProgress} is reset to {@code false} only when no upgrade merges were added to the spec. + * + * @param spec the MergeSpecification to update + * @param oldSegments the segments needing upgrade + * @param segmentInfos all segment infos + * @param writer the index writer + * @return the possibly updated MergeSpecification + */ + private MergeSpecification maybeUpdateSpecAndUpgradeProgress(MergeSpecification spec, Map oldSegments, SegmentInfos segmentInfos, IndexWriter writer) { if (!oldSegments.isEmpty()) { if (verbose(writer)) { message("findForcedMerges: " + in.getClass().getSimpleName() + - " does not want to merge all old segments, merge remaining ones into new segment: " + oldSegments, writer); - } - final List newInfos = new ArrayList<>(); - for (final SegmentCommitInfo si : segmentInfos) { - if (oldSegments.containsKey(si)) { - newInfos.add(si); - } + " does not want to merge all old segments, rewrite remaining ones into upgraded segments: " + oldSegments, writer); } - // add the final merge + if (spec == null) { spec = new MergeSpecification(); } - spec.add(new OneMerge(newInfos)); + + final int numWrappedSpecMerges = spec.merges.size(); + + for (SegmentCommitInfo si: segmentInfos) { + + if(!oldSegments.containsKey(si)) { + continue; + } + + spec.add(new OneMerge(Collections.singletonList(si))); + + if((spec.merges.size() - numWrappedSpecMerges) == maxUpgradesAtATime) { + return spec; + } + + } + + // We found we have fewer than the max number but greater than 0 + if(spec.merges.size() > numWrappedSpecMerges) { + return spec; + } + } - + + // Only set this once there are 0 segments needing upgrading, because when we return a + // spec, IndexWriter may (silently!) 
reject that merge if some of the segments we asked + // to be merged were already being (naturally) merged: + upgradeInProgress = false; + return spec; } + private Map findSegmentsNeedingUpgrade(SegmentInfos segmentInfos, Map segmentsToMerge) { + final Map oldSegments = new HashMap<>(); + + for (final SegmentCommitInfo si : segmentInfos) { + final Boolean v = segmentsToMerge.get(si); + if (v != null && shouldUpgradeSegment(si)) { + oldSegments.put(si, v); + } + } + + return oldSegments; + } + private boolean verbose(IndexWriter writer) { return writer != null && writer.infoStream.isEnabled("UPGMP"); } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java index 0ab13b42defb..16fb577a4d03 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java @@ -16,7 +16,6 @@ */ package org.apache.lucene.index; - public class TestUpgradeIndexMergePolicy extends BaseMergePolicyTestCase { public MergePolicy mergePolicy() {