Skip to content

Commit

Permalink
Narrow the files checked by compaction commit (#5153)
Browse files Browse the repository at this point in the history
Previously the conditional mutation to commit a compaction would require
all files in the tablet be the same as read earlier and on a busy tablet
this could fail and retry often. The check has now been narrowed to only
verify that the files involved with the compaction still exist. A new
method was added to Ample called requireFiles(Set<StoredTabletFile> files)
which creates a condition for each file column to verify each one
exists.

This closes #5117
  • Loading branch information
cshannon authored Dec 9, 2024
1 parent a907748 commit 88b108e
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -514,6 +515,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t

ConditionalTabletMutator requireAbsentLogs();

/**
* Require that a tablet contain all the files in the set
*/
ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files);

/**
* <p>
* Ample provides the following features on top of the conditional writer to help automate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.SortedFilesIterator;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -330,6 +332,18 @@ public Ample.ConditionalTabletMutator requireAbsentLogs() {
return this;
}

@Override
public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, PresentIterator.class);
for (StoredTabletFile file : files) {
Condition c = new Condition(DataFileColumnFamily.STR_NAME, file.getMetadata())
.setValue(PresentIterator.VALUE).setIterators(is);
mutation.addCondition(c);
}
return this;
}

@Override
public void submit(Ample.RejectionHandler rejectionCheck) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,16 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
while (canCommitCompaction(ecid, tablet)) {
CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);

// the compacted files should not exists in the tablet already
// the compacted files should not exist in the tablet already
var tablet2 = tablet;
newDatafile.ifPresent(
newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
"File already exists in tablet %s %s", newFile, tablet2.getFiles()));

try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
.requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
.requireCompaction(ecid).requireSame(tablet, LOCATION)
.requireFiles(commitData.getJobFiles());

if (ecm.getKind() == CompactionKind.USER) {
tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.accumulo.manager.compaction.coordinator.commit;

import java.io.Serializable;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -56,7 +55,7 @@ public TableId getTableId() {
return KeyExtent.fromThrift(textent).tableId();
}

public Collection<StoredTabletFile> getJobFiles() {
return inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toList());
public Set<StoredTabletFile> getJobFiles() {
return inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1746,4 +1746,55 @@ public void testErrors() {
}
}
}

@Test
public void testRequiresFiles() {
var context = cluster.getServerContext();

var stf1 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
var stf2 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
var stf3 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
var stf4 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"));
var dfv = new DataFileValue(100, 100);

// Add 3 of the files, skip the 4th file
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, dfv).putFile(stf2, dfv)
.putFile(stf3, dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles());

// Test mutation is accepted when given all files
var time1 = MetadataTime.parse("L50");
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf2, stf3))
.putTime(time1).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(time1, context.getAmple().readTablet(e1).getTime());

// Test mutation is accepted when a subset of files is given
var time2 = MetadataTime.parse("L60");
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf3)).putTime(time2)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(time2, context.getAmple().readTablet(e1).getTime());

// Test mutation is rejected when a file is given that the tablet does not have
var time3 = MetadataTime.parse("L60");
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf4)).putTime(time3)
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
}
// Should be previous time still as the mutation was rejected
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
}
}

0 comments on commit 88b108e

Please sign in to comment.