Skip to content

Commit

Permalink
Merge branch 'main' into narrow-loaded-check
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Dec 12, 2024
2 parents dcceb25 + 08f77aa commit ef17447
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 78 deletions.
160 changes: 92 additions & 68 deletions assemble/bin/accumulo-cluster

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion assemble/bin/accumulo-service
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ function start_service() {
pid=$(cat "$pid_file")
if kill -0 "$pid" 2>/dev/null; then
echo "$HOST : ${service_name} already running (${pid})"
exit 0
continue
fi
fi
echo "Starting $service_name on $HOST"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t
*/
ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files);

/**
* Require that a tablet have less than or equals the specified number of files.
*/
ConditionalTabletMutator requireLessOrEqualsFiles(long limit);

/**
* Requires that a tablet not have these loaded flags set.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.metadata.iterators.ColumnFamilySizeLimitIterator;
import org.apache.accumulo.server.metadata.iterators.PresentIterator;
import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator;
import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator;
Expand Down Expand Up @@ -345,6 +346,14 @@ public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) {
return this;
}

@Override
public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Condition c = ColumnFamilySizeLimitIterator.createCondition(DataFileColumnFamily.NAME, limit);
mutation.addCondition(c);
return this;
}

@Override
public ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> files) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.metadata.iterators;

import static org.apache.accumulo.server.metadata.iterators.SetEncodingIterator.getTabletRow;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;

/**
* Iterator that checks if a column family size is less than or equal a limit as part of a
* conditional mutation.
*/
public class ColumnFamilySizeLimitIterator extends WrappingIterator {

private static final String LIMIT_OPT = "limit";
private static final Text EMPTY = new Text();

private Long limit;

private Key startKey = null;
private Value topValue = null;

@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
IteratorEnvironment env) throws IOException {
super.init(source, options, env);
limit = Long.parseLong(options.get(LIMIT_OPT));
Preconditions.checkState(limit >= 0);
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
Text tabletRow = getTabletRow(range);
Text family = range.getStartKey().getColumnFamily();

Preconditions.checkArgument(
family.getLength() > 0 && range.getStartKey().getColumnQualifier().getLength() == 0);

startKey = new Key(tabletRow, family);
Key endKey = startKey.followingKey(PartialKey.ROW_COLFAM);

Range r = new Range(startKey, true, endKey, false);

var source = getSource();
source.seek(r, Set.of(startKey.getColumnFamilyData()), true);

long count = 0;
while (source.hasTop()) {
source.next();
count++;
}

if (count <= limit) {
topValue = new Value("1");
} else {
topValue = null;
}
}

@Override
public boolean hasTop() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
return topValue != null;
}

@Override
public void next() throws IOException {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
topValue = null;
}

@Override
public Key getTopKey() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
if (topValue == null) {
throw new NoSuchElementException();
}

return startKey;
}

@Override
public Value getTopValue() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
if (topValue == null) {
throw new NoSuchElementException();
}
return topValue;
}

/**
* Create a condition that checks if the specified column family's size is less than or equal to
* the given limit.
*/
public static Condition createCondition(Text family, long limit) {
IteratorSetting is = new IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO,
ColumnFamilySizeLimitIterator.class);
is.addOption(LIMIT_OPT, limit + "");
return new Condition(family, EMPTY).setValue("1").setIterators(is);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
family.getLength() > 0 && range.getStartKey().getColumnQualifier().getLength() == 0);

startKey = new Key(tabletRow, family);
Key endKey = new Key(tabletRow, family).followingKey(PartialKey.ROW_COLFAM);
Key endKey = startKey.followingKey(PartialKey.ROW_COLFAM);

Range r = new Range(startKey, true, endKey, false);

source.seek(r, Set.of(), false);
source.seek(r, Set.of(startKey.getColumnFamilyData()), true);

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -555,9 +556,16 @@ protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJ
compactorAddress, externalCompactionId);

// any data that is read from the tablet to make a decision about if it can compact or not
// must be included in the requireSame call
var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
.requireSame(tabletMetadata, FILES, SELECTED, ECOMP);
// must be checked for changes in the conditional mutation.
var tabletMutator =
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireFiles(jobFiles);
if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
// For system compactions the user compaction requested column is examined when deciding
// if a compaction can start so need to check for changes to this column.
tabletMutator.requireSame(tabletMetadata, SELECTED, ECOMP, USER_COMPACTION_REQUESTED);
} else {
tabletMutator.requireSame(tabletMetadata, SELECTED, ECOMP);
}

if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
var selectedFiles = tabletMetadata.getSelectedFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ void load(List<TabletMetadata> tablets, Files files) {
if (setTime) {
rsc.add(TIME);
}
if (pauseLimit > 0) {
rsc.add(FILES);
}

ColumnType[] requireSameCols = rsc.toArray(new ColumnType[0]);

Expand Down Expand Up @@ -237,6 +234,10 @@ void load(List<TabletMetadata> tablets, Files files) {
.requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet())
.requireSame(tablet, LOCATION, requireSameCols);

if (pauseLimit > 0) {
tabletMutator.requireLessOrEqualsFiles(pauseLimit);
}

filesToLoad.forEach((f, v) -> {
tabletMutator.putBulkFile(f, fateId);
tabletMutator.putFile(f, v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1788,7 +1788,7 @@ public void testRequiresFiles() {
assertEquals(time2, context.getAmple().readTablet(e1).getTime());

// Test mutation is rejected when a file is given that the tablet does not have
var time3 = MetadataTime.parse("L60");
var time3 = MetadataTime.parse("L70");
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf4)).putTime(time3)
.submit(tm -> false);
Expand All @@ -1798,6 +1798,83 @@ public void testRequiresFiles() {
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
}

@Test
public void testFilesLimit() {
var context = cluster.getServerContext();

var stf1 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
var stf2 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
var stf3 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
var stf4 = StoredTabletFile
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"));
var dfv = new DataFileValue(100, 100);

// Add 3 of the files, skip the 4th file
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, dfv).putFile(stf2, dfv)
.putFile(stf3, dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles());

// Test mutation is accepted when # files in tablet equals limit
var time1 = MetadataTime.parse("L50");
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time1)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(time1, context.getAmple().readTablet(e1).getTime());

// Test mutation is accepted when # files in tablet is less than limit
var time2 = MetadataTime.parse("L60");
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time2)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(time2, context.getAmple().readTablet(e1).getTime());

// Test mutation is rejected when # files in tablet is greater than limit
var time3 = MetadataTime.parse("L70");
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(2).putTime(time3)
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
}
// Should be previous time still as the mutation was rejected
assertEquals(time2, context.getAmple().readTablet(e1).getTime());

// add fourth file
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf4, dfv).submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(Set.of(stf1, stf2, stf3, stf4), context.getAmple().readTablet(e1).getFiles());

// Test mutation is rejected when # files in tablet is greater than limit
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time3)
.submit(tm -> false);
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
}
// Should be previous time still as the mutation was rejected
assertEquals(time2, context.getAmple().readTablet(e1).getTime());

// Test mutation is accepted when # files in tablet equals limit
var time4 = MetadataTime.parse("L80");
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time4)
.submit(tm -> false);
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
}
assertEquals(time4, context.getAmple().readTablet(e1).getTime());

}

@Test
public void testRequireAbsentLoaded() {
var context = cluster.getServerContext();
Expand Down

0 comments on commit ef17447

Please sign in to comment.