Improve BulkSplitOptimizationIT
DomGarguilo committed Dec 13, 2024
1 parent 32d3ca8 commit 7d44617
Showing 1 changed file with 42 additions and 25 deletions.
@@ -18,49 +18,63 @@
*/
package org.apache.accumulo.test.functional;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This test verifies that when many files are bulk imported into a table with a single tablet and
* that tablet subsequently splits, not all data files are assigned to the children tablets.
*/
public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
private static final Logger log = LoggerFactory.getLogger(BulkSplitOptimizationIT.class);

Path testDir;

@Override
protected Duration defaultTimeout() {
return Duration.ofMinutes(2);
return Duration.ofMinutes(5);
}

@BeforeEach
public void alterConfig() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
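// restarting the tablet servers here presumably gives each test a clean tablet server state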
getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
getClusterControl().startAllServers(ServerType.TABLET_SERVER);

FileSystem fs = cluster.getFileSystem();
testDir = new Path(cluster.getTemporaryPath(), "testmf");
fs.deleteOnExit(testDir);
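// generate the RFiles that the test will bulk import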
FunctionalTestUtils.createRFiles(client, fs, testDir.toString(), ROWS, SPLITS, 8);
FileStatus[] stats = fs.listStatus(testDir);

log.info("Number of generated files: {}", stats.length);
}
}

@AfterEach
public void resetConfig() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
getClusterControl().startAllServers(ServerType.TABLET_SERVER);
}
getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
getClusterControl().startAllServers(ServerType.TABLET_SERVER);
}

private static final int ROWS = 100000;
@@ -70,33 +84,36 @@ public void resetConfig() throws Exception {
public void testBulkSplitOptimization() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
final String tableName = getUniqueNames(1)[0];
c.tableOperations().create(tableName);
c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000");
c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
FileSystem fs = cluster.getFileSystem();
Path testDir = new Path(cluster.getTemporaryPath(), "testmf");
fs.deleteOnExit(testDir);
FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
FileStatus[] stats = fs.listStatus(testDir);

System.out.println("Number of generated files: " + stats.length);
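// table properties chosen to avoid compactions and premature splits while the files are imported;
// HOSTED availability keeps the tablet assigned to a tablet server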
Map<String,String> tableProps = new HashMap<>();
tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "1000");
tableProps.put(Property.TABLE_FILE_MAX.getKey(), "1000");
tableProps.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(tableProps)
.withInitialTabletAvailability(TabletAvailability.HOSTED));

log.info("Starting bulk import");
c.tableOperations().importDirectory(testDir.toString()).to(tableName).load();

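// the bulk import alone should not trigger splits; all files should land in the single original tablet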
FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);

// initiate splits
log.info("Lowering split threshold to 100K to initiate splits");
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");

Thread.sleep(SECONDS.toMillis(2));

// wait until over split threshold -- should be 78 splits
while (c.tableOperations().listSplits(tableName).size() < 50) {
Thread.sleep(500);
}
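// wait for the tablet to split; checkSplits can transiently fail with "splits points out of range"
// while splits are still being created, so that case is retried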
Wait.waitFor(() -> {
try {
FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
} catch (Exception e) {
if (e.getMessage().contains("splits points out of range")) {
return false;
} else {
throw e;
}
}
return true;
});

FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
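// parameters for verifying that the imported rows are still readable after the splits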
VerifyParams params = new VerifyParams(getClientProps(), tableName, ROWS);
params.timestamp = 1;
params.dataSize = 50;
