Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LUCENE-7671 - Enhance UpgradeIndexMergePolicy with additional options #151

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void testCreateIndexWithDocValuesUpdates() throws Exception {
writer.commit(); // flush every 10 docs
}
}

// first segment: no updates

// second segment: update two fields, same gen
Expand Down Expand Up @@ -437,19 +437,22 @@ public void testCreateIndexWithDocValuesUpdates() throws Exception {

// Names of the single-segment old-index fixtures. testUpgradeWithExplicitUpgrades appends
// these to oldNames and looks each one up in oldIndexDirs.
// TODO: on 6.0.0 release, gen the single segment indices and add here:
// NOTE(review): the 6.0.0 entries are already listed below, so this TODO may be stale — confirm.
final static String[] oldSingleSegmentNames = {
"6.0.0.singlesegment-cfs",
"6.0.0.singlesegment-nocfs"
};

static Map<String,Directory> oldIndexDirs;

/**
* Randomizes the use of some of the constructor variations
*/
private static IndexUpgrader newIndexUpgrader(Directory dir) {
final boolean streamType = random().nextBoolean();
final int choice = TestUtil.nextInt(random(), 0, 2);

switch (choice) {
case 0: return new IndexUpgrader(dir);
case 1: return new IndexUpgrader(dir, streamType ? null : InfoStream.NO_OUTPUT, false);
case 1: return new IndexUpgrader(dir, streamType ? null : InfoStream.NO_OUTPUT, false, true);
case 2: return new IndexUpgrader(dir, newIndexWriterConfig(null), false);
default: fail("case statement didn't get updated when random bounds changed");
}
Expand All @@ -464,8 +467,9 @@ public static void beforeClass() throws Exception {
oldIndexDirs = new HashMap<>();
for (String name : names) {
Path dir = createTempDir(name);
InputStream resource = TestBackwardsCompatibility.class.getResourceAsStream("index." + name + ".zip");
assertNotNull("Index name " + name + " not found", resource);
String nameOnDisk = "index." + name + ".zip";
InputStream resource = TestBackwardsCompatibility.class.getResourceAsStream(nameOnDisk);
assertNotNull("Index name " + nameOnDisk + " not found", resource);
TestUtil.unzip(resource, dir);
oldIndexDirs.put(name, newFSDirectory(dir));
}
Expand Down Expand Up @@ -1337,16 +1341,24 @@ public void testNumericFields() throws Exception {
}
}

private int checkAllSegmentsUpgraded(Directory dir, int indexCreatedVersion) throws IOException {

private int checkAllSegmentsUpgraded(Directory dir, int indexCreatedVersionMajor) throws IOException {
return this.checkAllSegmentsUpgraded(dir, Version.LATEST, indexCreatedVersionMajor);
}

private int checkAllSegmentsUpgraded(Directory dir, Version upgradedVersion, int indexCreatedVersionMajor) throws IOException {

final SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
if (VERBOSE) {
System.out.println("checkAllSegmentsUpgraded: " + infos);
System.out.println("checkAllSegmentsUpgraded: " + indexCreatedVersionMajor + "-" + infos);
}
for (SegmentCommitInfo si : infos) {
assertEquals(Version.LATEST, si.info.getVersion());
assertEquals(upgradedVersion, si.info.getVersion());
}
assertEquals(Version.LATEST, infos.getCommitLuceneVersion());
assertEquals(indexCreatedVersion, infos.getIndexCreatedVersionMajor());

assertEquals(upgradedVersion, infos.getCommitLuceneVersion());
assertEquals(indexCreatedVersionMajor, infos.getIndexCreatedVersionMajor());

return infos.size();
}

Expand All @@ -1364,15 +1376,67 @@ public void testUpgradeOldIndex() throws Exception {
System.out.println("testUpgradeOldIndex: index=" +name);
}
Directory dir = newDirectory(oldIndexDirs.get(name));

int indexCreatedVersion = SegmentInfos.readLatestCommit(dir).getIndexCreatedVersionMajor();

int numSegmentsBefore = SegmentInfos.readLatestCommit(dir).size();

IndexUpgrader upgrader = newIndexUpgrader(dir);
upgrader.upgrade(Integer.MAX_VALUE);

int expectedNumSegmentsAfter = upgrader.getIndexWriterConfig().getMergePolicy() instanceof UpgradeIndexMergePolicy ? 1 : numSegmentsBefore;
assertEquals(expectedNumSegmentsAfter, checkAllSegmentsUpgraded(dir, indexCreatedVersion));


dir.close();
}
}

/**
 * For every old index fixture, checks that a {@code forceMerge} through
 * {@link LiveUpgradeSegmentsMergePolicy} leaves the segments at their original version until
 * {@code setEnableUpgrades(true)} is called, and that a second {@code forceMerge} afterwards
 * brings all segments to {@link Version#LATEST} without changing the segment count.
 */
public void testUpgradeWithExplicitUpgrades() throws Exception {
  List<String> names = new ArrayList<>(oldNames.length + oldSingleSegmentNames.length);
  names.addAll(Arrays.asList(oldNames));
  names.addAll(Arrays.asList(oldSingleSegmentNames));
  for (String name : names) {
    if (VERBOSE) {
      System.out.println("testUpgradeWithExplicitUpgrades: index=" + name);
    }
    Directory dir = newDirectory(oldIndexDirs.get(name));

    SegmentInfos infosBefore = SegmentInfos.readLatestCommit(dir);
    int numSegmentsBefore = infosBefore.size();
    Version versionBefore = infosBefore.getCommitLuceneVersion();
    int createdVersionBefore = infosBefore.getIndexCreatedVersionMajor();

    assertFalse("Expected these segments to be an old version", versionBefore.equals(Version.LATEST));

    LiveUpgradeSegmentsMergePolicy uimp = new LiveUpgradeSegmentsMergePolicy(NoMergePolicy.INSTANCE);

    assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, versionBefore, createdVersionBefore));

    try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(uimp))) {
      w.forceMerge(numSegmentsBefore); // Don't optimize just upgrade
    }

    // Upgrade should not have happened yet
    assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, versionBefore, createdVersionBefore));

    // The only thing that changes between the two forceMerge calls is this flag; running a
    // separate IndexUpgrader here would upgrade the index by other means and hide a broken policy.
    uimp.setEnableUpgrades(true); // Turn on upgrades

    try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(uimp))) {
      w.forceMerge(numSegmentsBefore); // Don't optimize just upgrade
    }

    // Upgrade should now have happened.
    int indexCreatedVersionAfter = SegmentInfos.readLatestCommit(dir).getIndexCreatedVersionMajor();

    assertEquals(numSegmentsBefore, checkAllSegmentsUpgraded(dir, indexCreatedVersionAfter));

    dir.close();
  }
}

// TODO: write a test that checks that calls are delegated to the underlying merge policy.

public void testCommandLineArgs() throws Exception {

Expand Down Expand Up @@ -1405,15 +1469,20 @@ public void testCommandLineArgs() throws Exception {
args.add("-dir-impl");
args.add(dirImpl.getName());
}

if(random().nextBoolean()) {
args.add("-num-segments");
args.add(String.valueOf(Integer.MAX_VALUE));
args.add("-include-new-segments");
}

args.add(path);

IndexUpgrader upgrader = null;
try {
upgrader = IndexUpgrader.parseArgs(args.toArray(new String[0]));
IndexUpgrader.doUpgrade(args.toArray(new String[0]));
} catch (Exception e) {
throw new AssertionError("unable to parse args: " + args, e);
}
upgrader.upgrade();

Directory upgradedDir = newFSDirectory(dir);
try {
Expand Down Expand Up @@ -1462,7 +1531,10 @@ public void testUpgradeOldSingleSegmentIndexWithAdditions() throws Exception {
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setMergePolicy(mp);
IndexWriter w = new IndexWriter(dir, iwc);
w.addIndexes(ramDir);
DirectoryReader ramDirReader = DirectoryReader.open(ramDir);
CodecReader[] readers = new CodecReader[ramDirReader.leaves().size()];
for(int i = 0; i < readers.length; ++i) { readers[i] = SlowCodecReaderWrapper.wrap(ramDirReader.leaves().get(i).reader()); }
w.addIndexes(readers);
try {
w.commit();
} finally {
Expand All @@ -1474,7 +1546,7 @@ public void testUpgradeOldSingleSegmentIndexWithAdditions() throws Exception {

// ensure there is only one commit
assertEquals(1, DirectoryReader.listCommits(dir).size());
newIndexUpgrader(dir).upgrade();
newIndexUpgrader(dir).upgrade(Integer.MAX_VALUE);

final int segCount = checkAllSegmentsUpgraded(dir, indexCreatedVersion);
assertEquals("Index must still contain the same number of segments, as only one segment was upgraded and nothing else merged",
Expand Down
Binary file not shown.
Binary file not shown.
74 changes: 62 additions & 12 deletions lucene/core/src/java/org/apache/lucene/index/IndexUpgrader.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* refuses to run by default. Specify {@code -delete-prior-commits}
* to override this, allowing the tool to delete all but the last commit.
* From Java code this can be enabled by passing {@code true} as the {@code deletePriorCommits}
* argument of {@link #IndexUpgrader(Directory,InfoStream,boolean,boolean)}.
* <p><b>Warning:</b> This tool may reorder documents if the index was partially
* upgraded before execution (e.g., documents were added). If your application relies
* on &quot;monotonicity&quot; of doc IDs (which means that the order in which the documents
Expand All @@ -59,14 +59,21 @@ public final class IndexUpgrader {
/**
 * Prints the command-line usage of this tool to {@code System.err} and then terminates the JVM
 * with exit status 1. This method never returns normally.
 */
private static void printUsage() {
  System.err.println("Upgrades an index so all segments created with a previous Lucene version are rewritten.");
  System.err.println("Usage:");
  // Single synopsis line; every option is listed exactly once.
  System.err.println(" java " + IndexUpgrader.class.getName() + " [-include-new-segments] [-num-segments N] [-delete-prior-commits] [-verbose] [-dir-impl X] indexDir");
  System.err.println("This tool keeps only the last commit in an index; for this");
  System.err.println("reason, if the incoming index has more than one commit, the tool");
  System.err.println("refuses to run by default. Specify -delete-prior-commits to override");
  System.err.println("this, allowing the tool to delete all but the last commit.");
  System.err.println("Specify a " + FSDirectory.class.getSimpleName() +
      " implementation through the -dir-impl option to force its use. If no package is specified the "
      + FSDirectory.class.getPackage().getName() + " package will be used.");
  System.err.println("If -include-new-segments is enabled, segments which have already been upgraded will be considered as merge candidates");
  System.err.println("If -num-segments is not specified and -include-new-segments is enabled this will merge all segments into a single segment");
  System.err.println("otherwise if -include-new-segments is not enabled only segments needing to be upgraded will be merged together.");
  System.err.println("If the desired outcome is for all segments to be rewritten as the latest version and for no actual merges to occur");
  System.err.println("enable -include-new-segments and set -num-segments >= current number of segments (this case will not affect segments written with the current version).");
  System.err.println("e.g. -include-new-segments -num-segments " + Integer.MAX_VALUE);
  System.err.println("Note: -num-segments is only supported when -include-new-segments is enabled");
  System.err.println("WARNING: This tool may reorder document IDs!");
  System.exit(1);
}
Expand All @@ -75,13 +82,15 @@ private static void printUsage() {
* command-line. */
@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException {
  // Argument parsing and the upgrade itself are both handled by doUpgrade,
  // so it must be the only call made here.
  doUpgrade(args);
}

@SuppressForbidden(reason = "System.out required: command line tool")
static IndexUpgrader parseArgs(String[] args) throws IOException {
static void doUpgrade(String[] args) throws IOException {
String path = null;
boolean deletePriorCommits = false;
boolean includeNewSegments = false;
int numSegments = 1;
InfoStream out = null;
String dirImpl = null;
int i = 0;
Expand All @@ -98,9 +107,19 @@ static IndexUpgrader parseArgs(String[] args) throws IOException {
}
i++;
dirImpl = args[i];
} else if("-include-new-segments".equals(arg)) {
includeNewSegments = true;
} else if("-num-segments".equals(arg)) {
if (i == args.length - 1 ) {
System.err.println("ERROR: missing value for -num-segments option");
System.exit(1);
}
i++;
numSegments = Integer.valueOf(args[i]);
} else if (path == null) {
path = arg;
} else {
}
else {
printUsage();
}
i++;
Expand All @@ -116,12 +135,20 @@ static IndexUpgrader parseArgs(String[] args) throws IOException {
} else {
dir = CommandLineUtil.newFSDirectory(dirImpl, p);
}
return new IndexUpgrader(dir, out, deletePriorCommits);

if(numSegments > 1 && !includeNewSegments) {
System.err.println("-num-segments > 1 and -include-new-segments is not set");
printUsage();
System.exit(1);
}

new IndexUpgrader(dir, out, deletePriorCommits, includeNewSegments).upgrade(numSegments);
}

// Directory holding the index to upgrade.
private final Directory dir;
// Writer configuration used for the upgrade; upgrade(int) replaces its merge policy and
// its index deletion policy before opening the IndexWriter.
private final IndexWriterConfig iwc;
// Per the constructor docs: allows upgrading an index with multiple commit points by
// removing all older ones.
private final boolean deletePriorCommits;
// When true, upgrade(int) wraps the configured merge policy in a LiveUpgradeSegmentsMergePolicy
// with upgrades enabled; otherwise it wraps it in an UpgradeIndexMergePolicy.
private final boolean includeNewSegments;

/** Creates index upgrader on the given directory, using an {@link IndexWriter} using the given
* {@code matchVersion}. The tool refuses to upgrade indexes with multiple commit points. */
Expand All @@ -132,24 +159,37 @@ public IndexUpgrader(Directory dir) {
/** Creates index upgrader on the given directory, using an {@link IndexWriter} with a fresh
 * {@link IndexWriterConfig}. Equivalent to calling
 * {@link #IndexUpgrader(Directory,InfoStream,boolean,boolean)} with {@code includeNewSegments}
 * set to {@code false}; kept so that existing three-argument callers continue to work.
 *
 * @param dir directory holding the index to upgrade
 * @param infoStream if not {@code null}, all logging output will be sent to this stream
 * @param deletePriorCommits whether an index with multiple commit points may be upgraded by
 *        removing all older ones
 */
public IndexUpgrader(Directory dir, InfoStream infoStream, boolean deletePriorCommits) {
  this(dir, infoStream, deletePriorCommits, false);
}

/** Creates index upgrader on the given directory, using an {@link IndexWriter} with a fresh
 * {@link IndexWriterConfig}. You have the possibility to upgrade indexes with multiple commit
 * points by removing all older ones. If {@code infoStream} is not {@code null}, all logging
 * output will be sent to this stream.
 *
 * @param dir directory holding the index to upgrade
 * @param infoStream if not {@code null}, installed on the writer config as its info stream
 * @param deletePriorCommits whether an index with multiple commit points may be upgraded by
 *        removing all older ones
 * @param includeNewSegments if {@code true}, the upgrade uses a
 *        {@code LiveUpgradeSegmentsMergePolicy} (with upgrades enabled) instead of an
 *        {@link UpgradeIndexMergePolicy}
 */
public IndexUpgrader(Directory dir, InfoStream infoStream, boolean deletePriorCommits, boolean includeNewSegments) {
  this(dir, new IndexWriterConfig(null), deletePriorCommits, includeNewSegments);
  if (null != infoStream) {
    this.iwc.setInfoStream(infoStream);
  }
}

/** Creates index upgrader on the given directory, using an {@link IndexWriter} using the given
 * config. Equivalent to calling
 * {@link #IndexUpgrader(Directory,IndexWriterConfig,boolean,boolean)} with
 * {@code includeNewSegments} set to {@code false}. */
public IndexUpgrader(Directory dir, IndexWriterConfig iwc, boolean deletePriorCommits) {
this(dir, iwc, deletePriorCommits, false);
}

/** Creates index upgrader on the given directory, using an {@link IndexWriter} using the given
 * config. You have the possibility to upgrade indexes with multiple commit points by removing
 * all older ones.
 *
 * @param dir directory holding the index to upgrade
 * @param iwc writer configuration to use; the upgrade replaces its merge policy and its index
 *        deletion policy
 * @param deletePriorCommits whether an index with multiple commit points may be upgraded by
 *        removing all older ones
 * @param includeNewSegments if {@code true}, the upgrade uses a
 *        {@code LiveUpgradeSegmentsMergePolicy} (with upgrades enabled) instead of an
 *        {@link UpgradeIndexMergePolicy}
 */
public IndexUpgrader(Directory dir, IndexWriterConfig iwc, boolean deletePriorCommits, boolean includeNewSegments) {
  this.dir = dir;
  this.iwc = iwc;
  this.deletePriorCommits = deletePriorCommits;
  this.includeNewSegments = includeNewSegments;
}

/** Returns the {@link IndexWriterConfig} held by this upgrader. This is the same instance whose
 * merge policy and index deletion policy are replaced when the upgrade runs. */
public IndexWriterConfig getIndexWriterConfig() {
  return this.iwc;
}

/** Performs the upgrade with a segment limit of one; shorthand for {@code upgrade(1)}.
 *
 * @throws IOException if the underlying upgrade fails
 */
public void upgrade() throws IOException {
  final int singleSegment = 1;
  upgrade(singleSegment);
}

/** Perform the upgrade. */
public void upgrade() throws IOException {
public void upgrade(int maxSegements) throws IOException {
if (!DirectoryReader.indexExists(dir)) {
throw new IndexNotFoundException(dir.toString());
}
Expand All @@ -161,15 +201,25 @@ public void upgrade() throws IOException {
}
}

iwc.setMergePolicy(new UpgradeIndexMergePolicy(iwc.getMergePolicy()));
MergePolicy mp;
if(includeNewSegments) {
LiveUpgradeSegmentsMergePolicy uimp = new LiveUpgradeSegmentsMergePolicy(iwc.getMergePolicy());
uimp.setEnableUpgrades(true);
mp = uimp;
} else {
mp = new UpgradeIndexMergePolicy(iwc.getMergePolicy());
}


iwc.setMergePolicy(mp);
iwc.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());

try (final IndexWriter w = new IndexWriter(dir, iwc)) {
InfoStream infoStream = iwc.getInfoStream();
if (infoStream.isEnabled(LOG_PREFIX)) {
infoStream.message(LOG_PREFIX, "Upgrading all pre-" + Version.LATEST + " segments of index directory '" + dir + "' to version " + Version.LATEST + "...");
}
w.forceMerge(1);
w.forceMerge(maxSegements);
if (infoStream.isEnabled(LOG_PREFIX)) {
infoStream.message(LOG_PREFIX, "All segments upgraded to version " + Version.LATEST);
infoStream.message(LOG_PREFIX, "Enforcing commit to rewrite all index metadata...");
Expand Down
Loading