From 802b4d689c32db84edd5370f9ea836d7b9e7c1a8 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 12 Dec 2024 13:25:31 -0500 Subject: [PATCH] narrows check of loaded files in conditional mutation (#5166) Bulk load fate code was reading a tablets loaded flags, checking it was not in them, and then making a conditional mutation that required the set of bulk flags to to be the same. Requiring the set to be the same caused unnecessary collisions between bulk imports. Modified the conditional check to require the loaded flags for the fate operation to be absent. --- .../accumulo/core/metadata/schema/Ample.java | 5 ++ .../ConditionalTabletMutatorImpl.java | 11 ++++ .../manager/tableOps/bulkVer2/LoadFiles.java | 4 +- .../functional/AmpleConditionalWriterIT.java | 61 +++++++++++++++++++ 4 files changed, 79 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 699ee319a10..b29b879d891 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -525,6 +525,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t */ ConditionalTabletMutator requireLessOrEqualsFiles(long limit); + /** + * Requires that a tablet not have these loaded flags set. + */ + ConditionalTabletMutator requireAbsentLoaded(Set files); + /** *

* Ample provides the following features on top of the conditional writer to help automate diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 8cee60d96d5..ff54603a823 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.SortedFilesIterator; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; @@ -353,6 +354,16 @@ public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) { return this; } + @Override + public ConditionalTabletMutator requireAbsentLoaded(Set files) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + for (ReferencedTabletFile file : files) { + Condition c = new Condition(BulkFileColumnFamily.STR_NAME, file.insert().getMetadata()); + mutation.addCondition(c); + } + return this; + } + @Override public void submit(Ample.RejectionHandler rejectionCheck) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 7b0f494c19a..5ff2ca3ec39 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -174,7 +174,6 @@ void load(List tablets, Files files) { } List rsc = new ArrayList<>(); - rsc.add(LOCATION); if (setTime) { rsc.add(TIME); } @@ -232,7 +231,8 @@ void load(List tablets, Files files) { } var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent()) - .requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols); + .requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet()) + .requireSame(tablet, LOCATION, requireSameCols); if (pauseLimit > 0) { tabletMutator.requireLessOrEqualsFiles(pauseLimit); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index c1bc46921aa..2cbb819a058 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -1874,4 +1874,65 @@ public void testFilesLimit() { assertEquals(time4, context.getAmple().readTablet(e1).getTime()); } + + @Test + public void testRequireAbsentLoaded() { + var context = cluster.getServerContext(); + + var stf1 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + var stf2 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); + var stf3 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); + var dfv = new DataFileValue(100, 100); + + FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile())) + .putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1) + .putFile(stf1, dfv).putFile(stf2, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1), + context.getAmple().readTablet(e1).getLoaded()); + + FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf3.getTabletFile())) + .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2), + context.getAmple().readTablet(e1).getLoaded()); + + // should fail because the loaded markers are present + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile())) + .putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1) + .putFile(stf1, dfv).putFile(stf2, dfv).putFlushId(99).submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + + // should fail because the loaded markers are present + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation() + .requireAbsentLoaded(Set.of(stf3.getTabletFile())) + .putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).putFlushId(99) + .submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2), + context.getAmple().readTablet(e1).getLoaded()); + assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty()); + } }