diff --git a/lucene/backward-codecs/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java b/lucene/backward-codecs/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java index f180b47a3eda..0e3ed61f4e70 100644 --- a/lucene/backward-codecs/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java +++ b/lucene/backward-codecs/src/test/org/apache/lucene/index/TestBackwardsCompatibility.java @@ -260,7 +260,7 @@ public void testCreateIndexWithDocValuesUpdates() throws Exception { writer.commit(); // flush every 10 docs } } - + // first segment: no updates // second segment: update two fields, same gen @@ -437,19 +437,22 @@ public void testCreateIndexWithDocValuesUpdates() throws Exception { // TODO: on 6.0.0 release, gen the single segment indices and add here: final static String[] oldSingleSegmentNames = { + "6.0.0.singlesegment-cfs", + "6.0.0.singlesegment-nocfs" }; static Map<String,Directory> oldIndexDirs; /** - * Randomizes the use of some of hte constructor variations + * Randomizes the use of some of the constructor variations */ private static IndexUpgrader newIndexUpgrader(Directory dir) { final boolean streamType = random().nextBoolean(); final int choice = TestUtil.nextInt(random(), 0, 2); + switch (choice) { case 0: return new IndexUpgrader(dir); - case 1: return new IndexUpgrader(dir, streamType ? null : InfoStream.NO_OUTPUT, false); + case 1: return new IndexUpgrader(dir, streamType ? null : InfoStream.NO_OUTPUT, false, true); case 2: return new IndexUpgrader(dir, newIndexWriterConfig(null), false); default: fail("case statement didn't get updated when random bounds changed"); } @@ -464,8 +467,9 @@ public static void beforeClass() throws Exception { oldIndexDirs = new HashMap<>(); for (String name : names) { Path dir = createTempDir(name); - InputStream resource = TestBackwardsCompatibility.class.getResourceAsStream("index." + name + ".zip"); - assertNotNull("Index name " + name + " not found", resource); + String nameOnDisk = "index."
+ name + ".zip"; + InputStream resource = TestBackwardsCompatibility.class.getResourceAsStream(nameOnDisk); + assertNotNull("Index name " + nameOnDisk + " not found", resource); TestUtil.unzip(resource, dir); oldIndexDirs.put(name, newFSDirectory(dir)); } @@ -1337,16 +1341,24 @@ public void testNumericFields() throws Exception { } } - private int checkAllSegmentsUpgraded(Directory dir, int indexCreatedVersion) throws IOException { + + private int checkAllSegmentsUpgraded(Directory dir, int indexCreatedVersionMajor) throws IOException { + return this.checkAllSegmentsUpgraded(dir, Version.LATEST, indexCreatedVersionMajor); + } + + private int checkAllSegmentsUpgraded(Directory dir, Version upgradedVersion, int indexCreatedVersionMajor) throws IOException { + final SegmentInfos infos = SegmentInfos.readLatestCommit(dir); if (VERBOSE) { - System.out.println("checkAllSegmentsUpgraded: " + infos); + System.out.println("checkAllSegmentsUpgraded: " + indexCreatedVersionMajor + "-" + infos); } for (SegmentCommitInfo si : infos) { - assertEquals(Version.LATEST, si.info.getVersion()); + assertEquals(upgradedVersion, si.info.getVersion()); } - assertEquals(Version.LATEST, infos.getCommitLuceneVersion()); - assertEquals(indexCreatedVersion, infos.getIndexCreatedVersionMajor()); + + assertEquals(upgradedVersion, infos.getCommitLuceneVersion()); + assertEquals(indexCreatedVersionMajor, infos.getIndexCreatedVersionMajor()); + return infos.size(); } @@ -1364,15 +1376,67 @@ public void testUpgradeOldIndex() throws Exception { System.out.println("testUpgradeOldIndex: index=" +name); } Directory dir = newDirectory(oldIndexDirs.get(name)); + int indexCreatedVersion = SegmentInfos.readLatestCommit(dir).getIndexCreatedVersionMajor(); + + int numSegmentsBefore = SegmentInfos.readLatestCommit(dir).size(); + + IndexUpgrader upgrader = newIndexUpgrader(dir); + upgrader.upgrade(Integer.MAX_VALUE); + + int expectedNumSegmentsAfter = upgrader.getIndexWriterConfig().getMergePolicy() instanceof UpgradeIndexMergePolicy ? 
1 : numSegmentsBefore; + assertEquals(expectedNumSegmentsAfter, checkAllSegmentsUpgraded(dir, indexCreatedVersion)); + + + dir.close(); + } + } + + public void testUpgradeWithExplicitUpgrades() throws Exception { + List<String> names = new ArrayList<>(oldNames.length + oldSingleSegmentNames.length); + names.addAll(Arrays.asList(oldNames)); + names.addAll(Arrays.asList(oldSingleSegmentNames)); + for(String name : names) { + if (VERBOSE) { + System.out.println("testUpgradeWithExplicitUpgrades: index=" +name); + } + Directory dir = newDirectory(oldIndexDirs.get(name)); + + SegmentInfos infosBefore = SegmentInfos.readLatestCommit(dir); + int numSegmentsBefore = infosBefore.size(); + Version versionBefore = infosBefore.getCommitLuceneVersion(); + int createdVersionBefore = infosBefore.getIndexCreatedVersionMajor(); + + assertFalse("Expected these segments to be an old version", versionBefore.equals(Version.LATEST)); + + LiveUpgradeSegmentsMergePolicy uimp = new LiveUpgradeSegmentsMergePolicy(NoMergePolicy.INSTANCE); + + assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, versionBefore, createdVersionBefore)); + + + try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(uimp))) { + w.forceMerge(numSegmentsBefore); // Don't optimize, just upgrade + } + + // Upgrade should not have happened yet + assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, versionBefore, createdVersionBefore)); - newIndexUpgrader(dir).upgrade(); + uimp.setEnableUpgrades(true); // Turn on upgrades + + try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(uimp))) { + w.forceMerge(numSegmentsBefore); // Don't optimize, just upgrade + } - checkAllSegmentsUpgraded(dir, indexCreatedVersion); + // Upgrade should now have happened. + int indexCreatedVersionAfter = SegmentInfos.readLatestCommit(dir).getIndexCreatedVersionMajor(); + assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, indexCreatedVersionAfter)); + dir.close(); } } + + // TODO: add a test that checks that other merges are delegated to the wrapped policy
public void testCommandLineArgs() throws Exception { @@ -1405,15 +1469,20 @@ public void testCommandLineArgs() throws Exception { args.add("-dir-impl"); args.add(dirImpl.getName()); } + + if(random().nextBoolean()) { + args.add("-num-segments"); + args.add(String.valueOf(Integer.MAX_VALUE)); + args.add("-include-new-segments"); + } + args.add(path); - IndexUpgrader upgrader = null; try { - upgrader = IndexUpgrader.parseArgs(args.toArray(new String[0])); + IndexUpgrader.doUpgrade(args.toArray(new String[0])); } catch (Exception e) { throw new AssertionError("unable to parse args: " + args, e); } - upgrader.upgrade(); Directory upgradedDir = newFSDirectory(dir); try { @@ -1462,7 +1531,10 @@ public void testUpgradeOldSingleSegmentIndexWithAdditions() throws Exception { IndexWriterConfig iwc = new IndexWriterConfig(null) .setMergePolicy(mp); IndexWriter w = new IndexWriter(dir, iwc); - w.addIndexes(ramDir); + DirectoryReader ramDirReader = DirectoryReader.open(ramDir); + CodecReader[] readers = new CodecReader[ramDirReader.leaves().size()]; + for(int i = 0; i < readers.length; ++i) { readers[i] = SlowCodecReaderWrapper.wrap(ramDirReader.leaves().get(i).reader()); } + w.addIndexes(readers); try { w.commit(); } finally { @@ -1474,7 +1546,7 @@ public void testUpgradeOldSingleSegmentIndexWithAdditions() throws Exception { // ensure there is only one commit assertEquals(1, DirectoryReader.listCommits(dir).size()); - newIndexUpgrader(dir).upgrade(); + newIndexUpgrader(dir).upgrade(Integer.MAX_VALUE); final int segCount = checkAllSegmentsUpgraded(dir, indexCreatedVersion); assertEquals("Index must still contain the same number of segments, as only one segment was upgraded and nothing else merged", diff --git a/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-cfs.zip b/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-cfs.zip new file mode 100644 index 000000000000..e3eafae2bffa Binary files /dev/null and b/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-cfs.zip differ diff --git a/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-nocfs.zip b/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-nocfs.zip new file mode 100644 index 000000000000..8c551d2f359f Binary files /dev/null and b/lucene/backward-codecs/src/test/org/apache/lucene/index/index.6.0.0.singlesegment-nocfs.zip differ diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java b/lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java index 00084c880f23..67c07076fca5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java @@ -43,7 +43,7 @@ * refuses to run by default. Specify {@code -delete-prior-commits} * to override this, allowing the tool to delete all but the last commit. * From Java code this can be enabled by passing {@code true} to - * {@link #IndexUpgrader(Directory,InfoStream,boolean)}. + * {@link #IndexUpgrader(Directory,InfoStream,boolean,boolean)}. *
<p>
Warning: This tool may reorder documents if the index was partially * upgraded before execution (e.g., documents were added). If your application relies * on "monotonicity" of doc IDs (which means that the order in which the documents @@ -59,7 +59,7 @@ public final class IndexUpgrader { private static void printUsage() { System.err.println("Upgrades an index so all segments created with a previous Lucene version are rewritten."); System.err.println("Usage:"); - System.err.println(" java " + IndexUpgrader.class.getName() + " [-delete-prior-commits] [-verbose] [-dir-impl X] indexDir"); + System.err.println(" java " + IndexUpgrader.class.getName() + " [-include-new-segments] [-num-segments N] [-delete-prior-commits] [-verbose] [-dir-impl X] indexDir"); System.err.println("This tool keeps only the last commit in an index; for this"); System.err.println("reason, if the incoming index has more than one commit, the tool"); System.err.println("refuses to run by default. Specify -delete-prior-commits to override"); @@ -67,6 +67,13 @@ private static void printUsage() { System.err.println("Specify a " + FSDirectory.class.getSimpleName() + " implementation through the -dir-impl option to force its use. If no package is specified the " + FSDirectory.class.getPackage().getName() + " package will be used."); + System.err.println("If -include-new-segments is enabled, segments which have already been upgraded will be considered as merge candidates."); + System.err.println("If -num-segments is not specified and -include-new-segments is enabled, all segments will be merged into a single segment;"); + System.err.println("otherwise, if -include-new-segments is not enabled, only segments needing to be upgraded will be merged together."); + System.err.println("If the desired outcome is for all segments to be rewritten with the latest version and for no actual merges to occur,"); + System.err.println("enable -include-new-segments and set -num-segments >= the current number of segments (this will not affect segments already written with the current version)."); + System.err.println("e.g. -include-new-segments -num-segments " + Integer.MAX_VALUE); + System.err.println("Note: -num-segments is only supported when -include-new-segments is enabled."); System.err.println("WARNING: This tool may reorder document IDs!"); System.exit(1); } @@ -75,13 +82,15 @@ private static void printUsage() { * command-line. 
*/ @SuppressWarnings("deprecation") public static void main(String[] args) throws IOException { - parseArgs(args).upgrade(); + doUpgrade(args); } @SuppressForbidden(reason = "System.out required: command line tool") - static IndexUpgrader parseArgs(String[] args) throws IOException { + static void doUpgrade(String[] args) throws IOException { String path = null; boolean deletePriorCommits = false; + boolean includeNewSegments = false; + int numSegments = 1; InfoStream out = null; String dirImpl = null; int i = 0; @@ -98,9 +107,19 @@ static IndexUpgrader parseArgs(String[] args) throws IOException { } i++; dirImpl = args[i]; + } else if("-include-new-segments".equals(arg)) { + includeNewSegments = true; + } else if("-num-segments".equals(arg)) { + if (i == args.length - 1) { + System.err.println("ERROR: missing value for -num-segments option"); + System.exit(1); + } + i++; + numSegments = Integer.parseInt(args[i]); } else if (path == null) { path = arg; - } else { + } + else { printUsage(); } i++; @@ -116,12 +135,20 @@ static IndexUpgrader parseArgs(String[] args) throws IOException { } else { dir = CommandLineUtil.newFSDirectory(dirImpl, p); } - return new IndexUpgrader(dir, out, deletePriorCommits); + + if(numSegments > 1 && !includeNewSegments) { + System.err.println("ERROR: -num-segments > 1 requires -include-new-segments"); + printUsage(); + System.exit(1); + } + + new IndexUpgrader(dir, out, deletePriorCommits, includeNewSegments).upgrade(numSegments); } private final Directory dir; private final IndexWriterConfig iwc; private final boolean deletePriorCommits; + private final boolean includeNewSegments; /** Creates index upgrader on the given directory, using an {@link IndexWriter} using the given * {@code matchVersion}. The tool refuses to upgrade indexes with multiple commit points. */ @@ -132,24 +159,37 @@ public IndexUpgrader(Directory dir) { /** Creates index upgrader on the given directory, using an {@link IndexWriter} using the given * {@code matchVersion}. You have the possibility to upgrade indexes with multiple commit points by removing * all older ones. If {@code infoStream} is not {@code null}, all logging output will be sent to this stream. */ - public IndexUpgrader(Directory dir, InfoStream infoStream, boolean deletePriorCommits) { - this(dir, new IndexWriterConfig(null), deletePriorCommits); + public IndexUpgrader(Directory dir, InfoStream infoStream, boolean deletePriorCommits, boolean includeNewSegments) { + this(dir, new IndexWriterConfig(null), deletePriorCommits, includeNewSegments); if (null != infoStream) { this.iwc.setInfoStream(infoStream); } } + + public IndexUpgrader(Directory dir, IndexWriterConfig iwc, boolean deletePriorCommits) { + this(dir, iwc, deletePriorCommits, false); + } /** Creates index upgrader on the given directory, using an {@link IndexWriter} using the given * config. You have the possibility to upgrade indexes with multiple commit points by removing * all older ones. */ - public IndexUpgrader(Directory dir, IndexWriterConfig iwc, boolean deletePriorCommits) { + public IndexUpgrader(Directory dir, IndexWriterConfig iwc, boolean deletePriorCommits, boolean includeNewSegments) { this.dir = dir; this.iwc = iwc; this.deletePriorCommits = deletePriorCommits; + this.includeNewSegments = includeNewSegments; + } + + public IndexWriterConfig getIndexWriterConfig() { + return iwc; + } + + public void upgrade() throws IOException { + this.upgrade(1); } /** Perform the upgrade. 
*/ - public void upgrade() throws IOException { + public void upgrade(int maxSegments) throws IOException { if (!DirectoryReader.indexExists(dir)) { throw new IndexNotFoundException(dir.toString()); } @@ -161,7 +201,17 @@ public void upgrade() throws IOException { } } - iwc.setMergePolicy(new UpgradeIndexMergePolicy(iwc.getMergePolicy())); + MergePolicy mp; + if(includeNewSegments) { + LiveUpgradeSegmentsMergePolicy uimp = new LiveUpgradeSegmentsMergePolicy(iwc.getMergePolicy()); + uimp.setEnableUpgrades(true); + mp = uimp; + } else { + mp = new UpgradeIndexMergePolicy(iwc.getMergePolicy()); + } + + + iwc.setMergePolicy(mp); iwc.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); try (final IndexWriter w = new IndexWriter(dir, iwc)) { @@ -169,7 +219,7 @@ public void upgrade() throws IOException { if (infoStream.isEnabled(LOG_PREFIX)) { infoStream.message(LOG_PREFIX, "Upgrading all pre-" + Version.LATEST + " segments of index directory '" + dir + "' to version " + Version.LATEST + "..."); } - w.forceMerge(1); + w.forceMerge(maxSegments); if (infoStream.isEnabled(LOG_PREFIX)) { infoStream.message(LOG_PREFIX, "All segments upgraded to version " + Version.LATEST); infoStream.message(LOG_PREFIX, "Enforcing commit to rewrite all index metadata..."); diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveUpgradeSegmentsMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/LiveUpgradeSegmentsMergePolicy.java new file mode 100644 index 000000000000..216049f63634 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/LiveUpgradeSegmentsMergePolicy.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; + +import org.apache.lucene.util.Version; + +/** This {@link MergePolicy} is used for upgrading all existing segments of + * an index when calling {@link IndexWriter#forceMerge(int)}. + * All other methods delegate to the base {@code MergePolicy} given to the constructor. + * Upgrades will only take place after enableUpgrades has been set to true. + * When upgrades are enabled, it will ask the wrapped policy for segments to merge, and the leftover segments will be + * rewritten with the latest version (i.e. merged with themselves). + * If upgrades are not enabled, forced merges are simply delegated to the wrapped merge policy. + * + *
<p>
In general one would use {@link IndexUpgrader}, but for a fully customizable upgrade, + * you can use this like any other {@code MergePolicy} and call {@link IndexWriter#forceMerge(int)}: + *
<pre class="prettyprint">
+  *  IndexWriterConfig iwc = new IndexWriterConfig(new KeywordAnalyzer());
+  *  LiveUpgradeSegmentsMergePolicy mp = new LiveUpgradeSegmentsMergePolicy(iwc.getMergePolicy());
+  *  mp.setEnableUpgrades(true);
+  *  iwc.setMergePolicy(mp);
+  *  IndexWriter w = new IndexWriter(dir, iwc);
+  *  w.forceMerge(1);
+  *  w.close();
+  * </pre>
+ *
<p>
The above example would result in a single segment written with the latest version. However, consider this scenario: + * if there were 10 segments in the index and they all needed an upgrade, calling w.forceMerge(10) would leave 10 segments + * in the index, all written with the latest Lucene version. Calling w.forceMerge(5) would delegate to the wrapped merge policy + * to determine which segments should be merged together; the remaining segments will be upgraded (rewritten) if need be. + *
<p>
Warning: This merge policy may reorder documents if the index was partially + * upgraded before calling forceMerge (e.g., documents were added). If your application relies + * on "monotonicity" of doc IDs (which means that the order in which the documents + * were added to the index is preserved), do a forceMerge(1) instead. Please note, the + * delegate {@code MergePolicy} may also reorder documents. + * @lucene.experimental + * @see IndexUpgrader + */ +public class LiveUpgradeSegmentsMergePolicy extends MergePolicyWrapper { + + private int maxUpgradesAtATime = 5; + private volatile boolean enableUpgrades = false; + private Predicate<SegmentCommitInfo> shouldSegmentUpgrade = LiveUpgradeSegmentsMergePolicy::segmentIsNotLatestVersion; + + /** Wrap the given {@link MergePolicy} and intercept forceMerge requests to + * only upgrade segments written with previous Lucene versions. */ + public LiveUpgradeSegmentsMergePolicy(MergePolicy in) { + super(in); + } + + /** + * Set whether or not it is ok for this merge policy to do an upgrade. + * When true, this merge policy will look for segments to upgrade. + * + * When all upgrades are complete, this merge policy will set the flag back to false. + * + * @param enableUpgrades allow this policy to upgrade segments (default: false) + */ + public void setEnableUpgrades(boolean enableUpgrades) { + this.enableUpgrades = enableUpgrades; + } + + public boolean getEnableUpgrades() { + return enableUpgrades; + } + + /** + * How many segment upgrades should be committed for scheduling at a time. If more segments + * than maxUpgradesAtATime need to be upgraded, this merge policy relies on IndexWriter's cascaded + * merge requests to find segments to merge. Submitting a few segments at a time allows segments in need + * of an upgrade to remain candidates for a naturally triggered merge. + * + * @param maxUpgradesAtATime how many segment upgrades should be committed for scheduling at a time (default: 5) + */ + public void setMaxUpgradesAtATime(int maxUpgradesAtATime) { + this.maxUpgradesAtATime = maxUpgradesAtATime; + } + + public int getMaxUpgradesAtATime() { + return maxUpgradesAtATime; + } + + public void setShouldSegmentUpgrade(Predicate<SegmentCommitInfo> shouldSegmentUpgrade) { + this.shouldSegmentUpgrade = shouldSegmentUpgrade; + } + + /** + * Return whether a segment was written with an older version and therefore needs an upgrade + * + * @param si some segment commit info + * @return true if that segment is not on the latest version + */ + public static boolean segmentIsNotLatestVersion(SegmentCommitInfo si) { + return !Version.LATEST.equals(si.info.getVersion()); + } + + @Override + public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer) throws IOException { + + MergeSpecification spec = in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer); + + if(enableUpgrades) { + + // first find all old segments + final Map<SegmentCommitInfo,Boolean> segmentsNeedingUpgrade = findSegmentsNeedingUpgrade(segmentInfos, segmentsToMerge); + + if (verbose(writer)) { + message("findForcedMerges: segmentsToUpgrade=" + segmentsNeedingUpgrade, writer); + } + + if (segmentsNeedingUpgrade.isEmpty()) { + enableUpgrades = false; // Nothing to upgrade + return spec; + } + + + // Remove segments already being merged from segments needing upgrade. 
+ if (spec != null) { + for (final OneMerge om : spec.merges) { + segmentsNeedingUpgrade.keySet().removeAll(om.segments); + } + } + + // Add other segments missed by the wrapped merge policy to be upgraded + return maybeUpdateSpecAndUpgradeProgress(spec, segmentsNeedingUpgrade, segmentInfos, writer); + + } + + return spec; + } + + /** + * Updates the merge spec with old segments needing an upgrade. Also decides whether upgrades need to continue, setting enableUpgrades back to false once nothing is left to upgrade. + * + * @param spec the MergeSpecification to update + * @param segmentsNeedingUpgrade the segments needing upgrade + * @param segmentInfos all segment infos + * @param writer the index writer + * @return the possibly updated MergeSpecification + */ + private MergeSpecification maybeUpdateSpecAndUpgradeProgress(MergeSpecification spec, Map<SegmentCommitInfo,Boolean> segmentsNeedingUpgrade, SegmentInfos segmentInfos, IndexWriter writer) { + if (!segmentsNeedingUpgrade.isEmpty()) { + if (verbose(writer)) { + message("findForcedMerges: " + in.getClass().getSimpleName() + + " does not want to merge all old segments, rewrite remaining ones into upgraded segments: " + segmentsNeedingUpgrade, writer); + } + + if (spec == null) { + spec = new MergeSpecification(); + } + + final int numOriginalMerges = spec.merges.size(); + + for (SegmentCommitInfo si: segmentInfos) { + + if(!segmentsNeedingUpgrade.containsKey(si)) { + continue; + } + + // Add a merge of only the upgrading segment to the spec + // We don't want to merge, just upgrade + spec.add(new OneMerge(Collections.singletonList(si))); + + int numMergesWeAdded = spec.merges.size() - numOriginalMerges; + if(numMergesWeAdded == maxUpgradesAtATime) { + return spec; + } + + } + + // We added fewer than the maximum number of upgrade merges, but more than zero + if(spec.merges.size() > numOriginalMerges) { + return spec; + } + + } + + // Only set this once there are 0 segments needing upgrading, because when we return a + // spec, IndexWriter may (silently!) reject that merge if some of the segments we asked + // to be merged were already being (naturally) merged: + enableUpgrades = false; + + return spec; + } + + private Map<SegmentCommitInfo,Boolean> findSegmentsNeedingUpgrade(SegmentInfos segmentInfos, Map<SegmentCommitInfo,Boolean> segmentsToMerge) { + final Map<SegmentCommitInfo,Boolean> segmentsNeedingUpgrade = new HashMap<>(); + + for (final SegmentCommitInfo si : segmentInfos) { + final Boolean v = segmentsToMerge.get(si); + if (v != null && shouldSegmentUpgrade.test(si)) { + segmentsNeedingUpgrade.put(si, v); + } + } + + return segmentsNeedingUpgrade; + } + + private boolean verbose(IndexWriter writer) { + return writer != null && writer.infoStream.isEnabled("UPGMP"); + } + + private void message(String message, IndexWriter writer) { + writer.infoStream.message("UPGMP", message); + } +} diff --git a/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java index 0ab13b42defb..16fb577a4d03 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestUpgradeIndexMergePolicy.java @@ -16,7 +16,6 @@ */ package org.apache.lucene.index; - public class TestUpgradeIndexMergePolicy extends BaseMergePolicyTestCase { public MergePolicy mergePolicy() {
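
Taken together, the patch enables an in-place, segment-count-preserving upgrade. The sketch below mirrors testUpgradeWithExplicitUpgrades above and shows how an application could drive the new LiveUpgradeSegmentsMergePolicy directly; the driver class name, the directory path argument, and the choice of NoMergePolicy.INSTANCE as the wrapped policy are illustrative assumptions, not part of the patch.

    import java.nio.file.Paths;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.LiveUpgradeSegmentsMergePolicy;
    import org.apache.lucene.index.NoMergePolicy;
    import org.apache.lucene.index.SegmentInfos;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class InPlaceUpgradeExample { // hypothetical driver class, not part of the patch
      public static void main(String[] args) throws Exception {
        try (Directory dir = FSDirectory.open(Paths.get(args[0]))) {
          // Current segment count; forceMerge(numSegments) keeps that count, so no real merging happens.
          int numSegments = SegmentInfos.readLatestCommit(dir).size();

          // Wrapping NoMergePolicy means forceMerge only rewrites old segments and never
          // combines them; wrap a real policy (e.g. TieredMergePolicy) to allow both.
          LiveUpgradeSegmentsMergePolicy mp = new LiveUpgradeSegmentsMergePolicy(NoMergePolicy.INSTANCE);
          mp.setEnableUpgrades(true);

          // No analyzer is needed because this writer only rewrites existing segments.
          IndexWriterConfig iwc = new IndexWriterConfig(null).setMergePolicy(mp);
          try (IndexWriter w = new IndexWriter(dir, iwc)) {
            w.forceMerge(numSegments); // upgrade in place, don't optimize
          }
        }
      }
    }

The command-line equivalent, per the new printUsage text, would be along the lines of: java org.apache.lucene.index.IndexUpgrader -include-new-segments -num-segments 2147483647 /path/to/index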