diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 699ee319a10..b29b879d891 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -525,6 +525,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t */ ConditionalTabletMutator requireLessOrEqualsFiles(long limit); + /** + * Requires that a tablet not have these loaded flags set. + */ + ConditionalTabletMutator requireAbsentLoaded(Set files); + /** *

* Ample provides the following features on top of the conditional writer to help automate diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 8cee60d96d5..ff54603a823 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.SortedFilesIterator; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; @@ -353,6 +354,16 @@ public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) { return this; } + @Override + public ConditionalTabletMutator requireAbsentLoaded(Set files) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + for (ReferencedTabletFile file : files) { + Condition c = new Condition(BulkFileColumnFamily.STR_NAME, file.insert().getMetadata()); + mutation.addCondition(c); + } + return this; + } + @Override public void submit(Ample.RejectionHandler rejectionCheck) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 7b0f494c19a..5ff2ca3ec39 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -174,7 +174,6 @@ void load(List tablets, Files files) { } List rsc = new ArrayList<>(); - rsc.add(LOCATION); if (setTime) { rsc.add(TIME); } @@ -232,7 +231,8 @@ void load(List tablets, Files files) { } var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent()) - .requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols); + .requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet()) + .requireSame(tablet, LOCATION, requireSameCols); if (pauseLimit > 0) { tabletMutator.requireLessOrEqualsFiles(pauseLimit); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index c1bc46921aa..2cbb819a058 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -1874,4 +1874,65 @@ public void testFilesLimit() { assertEquals(time4, context.getAmple().readTablet(e1).getTime()); } + + @Test + public void testRequireAbsentLoaded() { + var context = cluster.getServerContext(); + + var stf1 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + var stf2 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); + var stf3 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); + var dfv = new DataFileValue(100, 100); + + FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile())) + .putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1) + .putFile(stf1, dfv).putFile(stf2, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1), + context.getAmple().readTablet(e1).getLoaded()); + + FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf3.getTabletFile())) + .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2), + context.getAmple().readTablet(e1).getLoaded()); + + // should fail because the loaded markers are present + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile())) + .putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1) + .putFile(stf1, dfv).putFile(stf2, dfv).putFlushId(99).submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + + // should fail because the loaded markers are present + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf3.getTabletFile())) + .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).putFlushId(99) + .submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2), + context.getAmple().readTablet(e1).getLoaded()); + assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty()); + } }