Skip to content

Commit

Permalink
narrows check of loaded files in conditional mutation (#5166)
Browse files Browse the repository at this point in the history
Bulk load fate code was reading a tablets loaded flags, checking it was
not in them, and then making a conditional mutation that required the
set of bulk flags to to be the same.  Requiring the set to be the same
caused unnecessary  collisions between bulk imports.  Modified the
conditional check to require the loaded flags for the fate operation
to be absent.
  • Loading branch information
keith-turner authored Dec 12, 2024
1 parent 08f77aa commit 802b4d6
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t
*/
ConditionalTabletMutator requireLessOrEqualsFiles(long limit);

/**
* Requires that a tablet not have these loaded flags set.
*/
ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> files);

/**
* <p>
* Ample provides the following features on top of the conditional writer to help automate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.SortedFilesIterator;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
Expand Down Expand Up @@ -353,6 +354,16 @@ public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) {
return this;
}

@Override
public ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> files) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
for (ReferencedTabletFile file : files) {
Condition c = new Condition(BulkFileColumnFamily.STR_NAME, file.insert().getMetadata());
mutation.addCondition(c);
}
return this;
}

@Override
public void submit(Ample.RejectionHandler rejectionCheck) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ void load(List<TabletMetadata> tablets, Files files) {
}

List<ColumnType> rsc = new ArrayList<>();
rsc.add(LOCATION);
if (setTime) {
rsc.add(TIME);
}
Expand Down Expand Up @@ -232,7 +231,8 @@ void load(List<TabletMetadata> tablets, Files files) {
}

var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
.requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols);
.requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet())
.requireSame(tablet, LOCATION, requireSameCols);

if (pauseLimit > 0) {
tabletMutator.requireLessOrEqualsFiles(pauseLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1874,4 +1874,65 @@ public void testFilesLimit() {
assertEquals(time4, context.getAmple().readTablet(e1).getTime());

}

@Test
public void testRequireAbsentLoaded() {
var context = cluster.getServerContext();

var stf1 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
var stf2 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
var stf3 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
var dfv = new DataFileValue(100, 100);

FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());

try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile()))
.putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
.putFile(stf1, dfv).putFile(stf2, dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(Set.of(stf1, stf2), context.getAmple().readTablet(e1).getFiles());
assertEquals(Map.of(stf1, fateId1, stf2, fateId1),
context.getAmple().readTablet(e1).getLoaded());

FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID());

try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireAbsentLoaded(Set.of(stf3.getTabletFile()))
.putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles());
assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2),
context.getAmple().readTablet(e1).getLoaded());

// should fail because the loaded markers are present
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile()))
.putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
.putFile(stf1, dfv).putFile(stf2, dfv).putFlushId(99).submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
}

// should fail because the loaded markers are present
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation()
.requireAbsentLoaded(Set.of(stf3.getTabletFile()))
.putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).putFlushId(99)
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
}

assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles());
assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2),
context.getAmple().readTablet(e1).getLoaded());
assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty());
}
}

0 comments on commit 802b4d6

Please sign in to comment.