Skip to content

Commit

Permalink
avoids another contentious check in compaction reservation (#5168)
Browse files Browse the repository at this point in the history
* avoids another contentious check in compaction reservation

When reserving a set of files for compaction the conditional mutation
would require the current set of compactions it read in a tablet to be
the same when adding a compaction.  On a busy tablet that has lots of
compactions starting and stopping this set is rapidly changing and this
would cause the conditional mutation to fail and retry.

To avoid this the conditional mutation was improved to check that the
set of files being added for compaction to the tablet do not exist in
any current compactions.  This check will no longer fail for changes to
compactions on unrelated files.

* fix build error
  • Loading branch information
keith-turner authored Dec 13, 2024
1 parent 69f226e commit 5cd9c07
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t
*/
ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> files);

/**
* Requires the given set of files are not currently involved in any running compactions.
*/
ConditionalTabletMutator requireNotCompacting(Set<StoredTabletFile> files);

/**
* <p>
* Ample provides the following features on top of the conditional writer to help automate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,4 @@ public void close() {
bufferingMutator.close();
executor.shutdownNow();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.metadata.iterators.ColumnFamilySizeLimitIterator;
import org.apache.accumulo.server.metadata.iterators.DisjointCompactionIterator;
import org.apache.accumulo.server.metadata.iterators.PresentIterator;
import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator;
import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator;
Expand Down Expand Up @@ -364,6 +365,14 @@ public ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> fi
return this;
}

@Override
public ConditionalTabletMutator requireNotCompacting(Set<StoredTabletFile> files) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Condition condition = DisjointCompactionIterator.createCondition(files);
mutation.addCondition(condition);
return this;
}

@Override
public void submit(Ample.RejectionHandler rejectionCheck) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,15 @@
*/
package org.apache.accumulo.server.metadata.iterators;

import static org.apache.accumulo.server.metadata.iterators.SetEncodingIterator.getTabletRow;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
import org.apache.hadoop.io.Text;

Expand All @@ -45,16 +36,12 @@
* Iterator that checks if a column family size is less than or equal a limit as part of a
* conditional mutation.
*/
public class ColumnFamilySizeLimitIterator extends WrappingIterator {
public class ColumnFamilySizeLimitIterator extends ColumnFamilyTransformationIterator {

private static final String LIMIT_OPT = "limit";
private static final Text EMPTY = new Text();

private Long limit;

private Key startKey = null;
private Value topValue = null;

@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
IteratorEnvironment env) throws IOException {
Expand All @@ -64,72 +51,18 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
Text tabletRow = getTabletRow(range);
Text family = range.getStartKey().getColumnFamily();

Preconditions.checkArgument(
family.getLength() > 0 && range.getStartKey().getColumnQualifier().getLength() == 0);

startKey = new Key(tabletRow, family);
Key endKey = startKey.followingKey(PartialKey.ROW_COLFAM);

Range r = new Range(startKey, true, endKey, false);

var source = getSource();
source.seek(r, Set.of(startKey.getColumnFamilyData()), true);

protected Value transform(SortedKeyValueIterator<Key,Value> source) throws IOException {
long count = 0;
while (source.hasTop()) {
source.next();
count++;
}

if (count <= limit) {
topValue = new Value("1");
return new Value("1");
} else {
topValue = null;
}
}

@Override
public boolean hasTop() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
return topValue != null;
}

@Override
public void next() throws IOException {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
topValue = null;
}

@Override
public Key getTopKey() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
if (topValue == null) {
throw new NoSuchElementException();
}

return startKey;
}

@Override
public Value getTopValue() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
if (topValue == null) {
throw new NoSuchElementException();
return null;
}
return topValue;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.metadata.iterators;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;

/**
* Provides the ability to seek to and transform an entire column family in a row into a single
* value. This class is intended to be used with conditional mutation iterators that want to check
* an entire column family.
*/
public abstract class ColumnFamilyTransformationIterator
implements SortedKeyValueIterator<Key,Value> {

protected static final Text EMPTY = new Text();
private SortedKeyValueIterator<Key,Value> source;

private Key startKey = null;
private Value topValue = null;

static Text getTabletRow(Range range) {
var row = range.getStartKey().getRow();
// expecting this range to cover a single metadata row, so validate the range meets expectations
MetadataSchema.TabletsSection.validateRow(row);
Preconditions.checkArgument(row.equals(range.getEndKey().getRow()));
return row;
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {

Text tabletRow = getTabletRow(range);
Text family = range.getStartKey().getColumnFamily();

Preconditions.checkArgument(
family.getLength() > 0 && range.getStartKey().getColumnQualifier().getLength() == 0);

Key startKey = new Key(tabletRow, family);
Key endKey = startKey.followingKey(PartialKey.ROW_COLFAM);

Range r = new Range(startKey, true, endKey, false);

source.seek(r, Set.of(startKey.getColumnFamilyData()), true);

topValue = transform(source);

this.startKey = startKey;
}

/**
* An iterator that is limited to a single column family is passed in and should be transformed to
* a single Value. Can return null if this iterator should not have a top.
*/
protected abstract Value transform(SortedKeyValueIterator<Key,Value> familyIterator)
throws IOException;

@Override
public Key getTopKey() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
if (topValue == null) {
throw new NoSuchElementException();
}

return startKey;
}

@Override
public Value getTopValue() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
if (topValue == null) {
throw new NoSuchElementException();
}
return topValue;
}

@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}

@Override
public boolean isRunningLowOnMemory() {
return source.isRunningLowOnMemory();
}

@Override
public boolean hasTop() {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
return topValue != null;
}

@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
IteratorEnvironment env) throws IOException {
this.source = source;
}

@Override
public void next() throws IOException {
if (startKey == null) {
throw new IllegalStateException("never been seeked");
}
topValue = null;
}

}
Loading

0 comments on commit 5cd9c07

Please sign in to comment.