Skip to content
This repository has been archived by the owner on Apr 30, 2019. It is now read-only.

Add sanity check in GC to prevent active ledger deletion #15

Open
wants to merge 3 commits into
base: yahoo-4.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bookkeeper-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.SortedMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.bookkeeper.bookie.GarbageCollectorThread.CompactableLedgerStorage;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -87,13 +88,13 @@ public void gc(GarbageCleaner garbageCleaner) {
// Get a set of all ledgers on the bookie
NavigableSet<Long> bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));

Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ZK_REQUESTS);
// Iterate over all the ledger on the metadata store
LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges();

if (!ledgerRangeIterator.hasNext()) {
// Empty global active ledgers, need to remove all local active ledgers.
for (long ledgerId : bkActiveLedgers) {
garbageCleaner.clean(ledgerId);
gcLedgerSafely(garbageCleaner, ledgerId, ledgerManager, semaphore);
}
}

Expand Down Expand Up @@ -127,7 +128,7 @@ public void gc(GarbageCleaner garbageCleaner) {
}
for (Long bkLid : subBkActiveLedgers) {
if (!ledgersInMetadata.contains(bkLid)) {
garbageCleaner.clean(bkLid);
gcLedgerSafely(garbageCleaner, bkLid, ledgerManager, semaphore);
}
}
lastEnd = end;
Expand All @@ -138,6 +139,38 @@ public void gc(GarbageCleaner garbageCleaner) {
}
}

/**
* Cleans ledger safely by verifying ledger-metadata is deleted from the zk else it skips ledger-gc if ledger-metada
* exists in zk.
*
* @param garbageCleaner
* @param ledgerId
* @param ledgerManager
* @param semaphore
* @throws InterruptedException
*/
private void gcLedgerSafely(GarbageCleaner garbageCleaner, long ledgerId, LedgerManager ledgerManager,
Semaphore semaphore) throws InterruptedException {
semaphore.acquire();
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean ledgerDeleted = new AtomicBoolean(false);
ledgerManager.existLedgerMetadata(ledgerId, (rc, exists) -> {
if (rc == BKException.Code.NoSuchLedgerExistsException) {
ledgerDeleted.set(true);
} else if (rc == BKException.Code.OK) {
LOG.warn("Can't delete ledger {} with metadata exists in zk ", ledgerId);
} else {
LOG.warn("Fail to check {} exists in zk {}", ledgerId, BKException.getMessage(rc));
}
latch.countDown();
semaphore.release();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a semaphore at all. Except during unit tests the gc method is called by scheduleAtFixedRate, which guarantees that later calls to gc will not happen until the previous ones finish.

if garbageCleaner.clean was inside the callback then the semaphore would make since, but with the CountDownLatch this is a blocking call, so it really doesn't.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i missed to remove it after adding latch. will fix it.

});
latch.await();
if (ledgerDeleted.get()) {
garbageCleaner.clean(ledgerId);
}
}

private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, GarbageCleaner garbageCleaner)
throws InterruptedException, KeeperException {
Set<Long> overReplicatedLedgers = Sets.newHashSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
Expand All @@ -37,7 +36,6 @@
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback;
Expand Down Expand Up @@ -350,6 +348,23 @@ public void readLedgerMetadata(final long ledgerId, final GenericCallback<Ledger
readLedgerMetadata(ledgerId, readCb, null);
}

@Override
public void existLedgerMetadata(final long ledgerId, final GenericCallback<Boolean> callback) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo, 'exists' instead of 'exit'?

zk.exists(getLedgerPath(ledgerId), false, (int rc, String path, Object ctx, Stat stat) -> {
if (rc == KeeperException.Code.NONODE.intValue()) {
callback.operationComplete(BKException.Code.NoSuchLedgerExistsException, false);
return;
} else if (rc != KeeperException.Code.OK.intValue()) {
LOG.error("Could not check metadata exists for ledger: " + ledgerId,
KeeperException.create(KeeperException.Code.get(rc), path));
callback.operationComplete(BKException.Code.ZKException, false);
return;
} else {
callback.operationComplete(BKException.Code.OK, true);
}
}, null);
}

protected void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata> readCb,
Watcher watcher) {
zk.getData(getLedgerPath(ledgerId), watcher, new DataCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ public void readLedgerMetadata(long ledgerId,
}
}

@Override
public void existLedgerMetadata(final long ledgerId, final GenericCallback<Boolean> callback) {
closeLock.readLock().lock();
try {
if (closed) {
callback.operationComplete(BKException.Code.ClientClosedException, false);
return;
}
underlying.existLedgerMetadata(ledgerId, new CleanupGenericCallback<Boolean>(callback));
} finally {
closeLock.readLock().unlock();
}
}

@Override
public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
GenericCallback<Void> cb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ public interface LedgerManager extends Closeable {
*/
public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);

/**
* Check ledger metadata exists for a specified ledger.
*
* @param ledgerId
* Ledger Id
* @param readCb
* Callback when read ledger metadata. Return code:<ul>
* <li>{@link BKException.Code.OK} if success</li>
* <li>{@link BKException.Code.NoSuchLedgerExistsException} if ledger not exist</li>
* <li>{@link BKException.Code.ZKException} for other issue</li>
* </ul>
*/
void existLedgerMetadata(long ledgerId, GenericCallback<Boolean> callback);

/**
* Write ledger metadata.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,30 @@ public void complete(int rc, Versioned<Value> value, Object ctx) {
ledgerTable.get(key, this, msCallback, ALL_FIELDS);
}

@Override
public void existLedgerMetadata(final long ledgerId, final GenericCallback<Boolean> callback) {
final String key = ledgerId2Key(ledgerId);
MetastoreCallback<Versioned<Value>> msCallback = new MetastoreCallback<Versioned<Value>>() {
@Override
public void complete(int rc, Versioned<Value> value, Object ctx) {
if (MSException.Code.NoKey.getCode() == rc) {
LOG.error("No ledger metadata found for ledger " + ledgerId + " : ",
MSException.create(MSException.Code.get(rc), "No key " + key + " found."));
callback.operationComplete(BKException.Code.NoSuchLedgerExistsException, false);
return;
} else if (MSException.Code.OK.getCode() != rc) {
LOG.error("Could not read metadata for ledger " + ledgerId + " : ",
MSException.create(MSException.Code.get(rc), "Failed to get key " + key));
callback.operationComplete(BKException.Code.MetaStoreException, false);
return;
} else {
callback.operationComplete(BKException.Code.OK, true);
}
}
};
ledgerTable.get(key, this, msCallback, ALL_FIELDS);
}

@Override
public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
final GenericCallback<Void> cb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,10 @@ public LedgerManager.LedgerRange next() throws IOException {
}
};
}
@Override
public void existLedgerMetadata(long ledgerId, GenericCallback<Boolean> callback) {
unsupported();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.versioning.Version;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

/**
* Test garbage collection ledgers in ledger manager
*/
Expand Down Expand Up @@ -300,4 +303,64 @@ public void clean(long ledgerId) {
assertEquals("Should have cleaned something", 1, cleaned.size());
assertEquals("Should have cleaned first ledger" + first, (long) first, (long) cleaned.get(0));
}

/**
* It verifies that GC doesn't delete ledgers which are not deleted and its metadata present into zookeeper even
* though ledgerManager derives active ledger as deleted.
*
* <pre>
* 1. Create list of ledgers
* 2. Mock ledgerManager that derives live ledgers as deleted ledgers
* 3. GC performs sanity before deleting ledger and doesn't delete if ledger metadata present
* </pre>
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testGCTryToDeleteActiveLedgers() throws Exception {
final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
final List<Long> cleaned = new ArrayList<Long>();

final int numLedgers = 100;

createLedgers(numLedgers, createdLedgers);

LedgerManager ledgerManager = Mockito.spy(getLedgerManager());
LedgerRange range = new LedgerRange(Sets.newHashSet(createdLedgers.first()));
AtomicInteger noOfIterations = new AtomicInteger(1);

LedgerRangeIterator ledgerRangeIterator = new LedgerRangeIterator() {
@Override
public boolean hasNext() throws IOException {
return noOfIterations.get() > 0;
}

@Override
public LedgerRange next() throws IOException {
noOfIterations.decrementAndGet();
return range;
}
};

Mockito.doReturn(ledgerRangeIterator).when(ledgerManager).getLedgerRanges();

final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager,
new MockLedgerStorage(), null, null, false, baseConf.getZkLedgersRootPath());
GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
@Override
public void clean(long ledgerId) {
LOG.info("Cleaned {}", ledgerId);
cleaned.add(ledgerId);
}
};

garbageCollector.gc(cleaner);
assertTrue("Should have cleaned nothing", cleaned.isEmpty());

long first = createdLedgers.first();
removeLedger(first);
garbageCollector.gc(cleaner);
assertEquals("Should have cleaned something", 1, cleaned.size());
assertEquals("Should have cleaned first ledger" + first, (long) first, (long) cleaned.get(0));
}
}