diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml index 9800880c274..72022edd144 100644 --- a/bookkeeper-server/pom.xml +++ b/bookkeeper-server/pom.xml @@ -59,6 +59,12 @@ 4.8.1 test + + org.mockito + mockito-all + 1.10.19 + test + org.slf4j slf4j-api diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java index e9b2bb854a0..90c095c021a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java @@ -27,6 +27,7 @@ import java.util.SortedMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.bookie.GarbageCollectorThread.CompactableLedgerStorage; import org.apache.bookkeeper.client.BKException; @@ -89,11 +90,10 @@ public void gc(GarbageCleaner garbageCleaner) { // Iterate over all the ledger on the metadata store LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges(); - if (!ledgerRangeIterator.hasNext()) { // Empty global active ledgers, need to remove all local active ledgers. for (long ledgerId : bkActiveLedgers) { - garbageCleaner.clean(ledgerId); + gcLedgerSafely(garbageCleaner, ledgerId, ledgerManager); } } @@ -127,7 +127,7 @@ public void gc(GarbageCleaner garbageCleaner) { } for (Long bkLid : subBkActiveLedgers) { if (!ledgersInMetadata.contains(bkLid)) { - garbageCleaner.clean(bkLid); + gcLedgerSafely(garbageCleaner, bkLid, ledgerManager); } } lastEnd = end; @@ -138,6 +138,35 @@ public void gc(GarbageCleaner garbageCleaner) { } } + /** + * Cleans ledger safely by verifying ledger-metadata is deleted from the zk else it skips ledger-gc if ledger-metada + * exists in zk. + * + * @param garbageCleaner + * @param ledgerId + * @param ledgerManager + * @param semaphore + * @throws InterruptedException + */ + private void gcLedgerSafely(GarbageCleaner garbageCleaner, long ledgerId, LedgerManager ledgerManager) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean ledgerDeleted = new AtomicBoolean(false); + ledgerManager.existsLedgerMetadata(ledgerId, (rc, exists) -> { + if (rc == BKException.Code.NoSuchLedgerExistsException) { + ledgerDeleted.set(true); + } else if (rc == BKException.Code.OK) { + LOG.warn("Can't delete ledger {} with metadata exists in zk ", ledgerId); + } else { + LOG.warn("Fail to check {} exists in zk {}", ledgerId, BKException.getMessage(rc)); + } + latch.countDown(); + }); + latch.await(); + if (ledgerDeleted.get()) { + garbageCleaner.clean(ledgerId); + } + } + private Set removeOverReplicatedledgers(Set bkActiveledgers, GarbageCleaner garbageCleaner) throws InterruptedException, KeeperException { Set overReplicatedLedgers = Sets.newHashSet(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java index 4fae58ca0cf..ed571d7c0bd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.NavigableSet; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; @@ -37,7 +36,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.util.BookKeeperConstants; -import org.apache.bookkeeper.util.StringUtils; import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.zookeeper.AsyncCallback; @@ -350,6 +348,23 @@ public void readLedgerMetadata(final long ledgerId, final GenericCallback callback) { + zk.exists(getLedgerPath(ledgerId), false, (int rc, String path, Object ctx, Stat stat) -> { + if (rc == KeeperException.Code.NONODE.intValue()) { + callback.operationComplete(BKException.Code.NoSuchLedgerExistsException, false); + return; + } else if (rc != KeeperException.Code.OK.intValue()) { + LOG.error("Could not check metadata exists for ledger: " + ledgerId, + KeeperException.create(KeeperException.Code.get(rc), path)); + callback.operationComplete(BKException.Code.ZKException, false); + return; + } else { + callback.operationComplete(BKException.Code.OK, true); + } + }, null); + } + protected void readLedgerMetadata(final long ledgerId, final GenericCallback readCb, Watcher watcher) { zk.getData(getLedgerPath(ledgerId), watcher, new DataCallback() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java index 961e0d163b3..bf00e6c13c7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java @@ -143,6 +143,20 @@ public void readLedgerMetadata(long ledgerId, } } + @Override + public void existsLedgerMetadata(final long ledgerId, final GenericCallback callback) { + closeLock.readLock().lock(); + try { + if (closed) { + callback.operationComplete(BKException.Code.ClientClosedException, false); + return; + } + underlying.existsLedgerMetadata(ledgerId, new CleanupGenericCallback(callback)); + } finally { + closeLock.readLock().unlock(); + } + } + @Override public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback cb) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java index fe3c2cf4d44..1a892dae257 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java @@ -89,6 +89,20 @@ public interface LedgerManager extends Closeable { */ public void readLedgerMetadata(long ledgerId, GenericCallback readCb); + /** + * Check ledger metadata exists for a specified ledger. + * + * @param ledgerId + * Ledger Id + * @param readCb + * Callback when read ledger metadata. Return code:
    + *
  • {@link BKException.Code.OK} if success
  • + *
  • {@link BKException.Code.NoSuchLedgerExistsException} if ledger not exist
  • + *
  • {@link BKException.Code.ZKException} for other issue
  • + *
+ */ + void existsLedgerMetadata(long ledgerId, GenericCallback callback); + /** * Write ledger metadata. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java index 9f7ef386da3..7a2a4a4ca52 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java @@ -420,6 +420,30 @@ public void complete(int rc, Versioned value, Object ctx) { ledgerTable.get(key, this, msCallback, ALL_FIELDS); } + @Override + public void existsLedgerMetadata(final long ledgerId, final GenericCallback callback) { + final String key = ledgerId2Key(ledgerId); + MetastoreCallback> msCallback = new MetastoreCallback>() { + @Override + public void complete(int rc, Versioned value, Object ctx) { + if (MSException.Code.NoKey.getCode() == rc) { + LOG.error("No ledger metadata found for ledger " + ledgerId + " : ", + MSException.create(MSException.Code.get(rc), "No key " + key + " found.")); + callback.operationComplete(BKException.Code.NoSuchLedgerExistsException, false); + return; + } else if (MSException.Code.OK.getCode() != rc) { + LOG.error("Could not read metadata for ledger " + ledgerId + " : ", + MSException.create(MSException.Code.get(rc), "Failed to get key " + key)); + callback.operationComplete(BKException.Code.MetaStoreException, false); + return; + } else { + callback.operationComplete(BKException.Code.OK, true); + } + } + }; + ledgerTable.get(key, this, msCallback, ALL_FIELDS); + } + @Override public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata, final GenericCallback cb) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 4d03ad33668..e3cbee98909 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -528,6 +528,10 @@ public LedgerManager.LedgerRange next() throws IOException { } }; } + @Override + public void existsLedgerMetadata(long ledgerId, GenericCallback callback) { + unsupported(); + } }; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index 422c4efc764..4f03f754a72 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -53,9 +53,12 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.versioning.Version; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Sets; + /** * Test garbage collection ledgers in ledger manager */ @@ -300,4 +303,64 @@ public void clean(long ledgerId) { assertEquals("Should have cleaned something", 1, cleaned.size()); assertEquals("Should have cleaned first ledger" + first, (long) first, (long) cleaned.get(0)); } + + /** + * It verifies that GC doesn't delete ledgers which are not deleted and its metadata present into zookeeper even + * though ledgerManager derives active ledger as deleted. + * + *
+     *  1. Create list of ledgers
+     *  2. Mock ledgerManager that derives live ledgers as deleted ledgers
+     *  3. GC performs sanity before deleting ledger and doesn't delete if ledger metadata present
+     * 
+ * + * @throws Exception + */ + @Test(timeout = 120000) + public void testGCTryToDeleteActiveLedgers() throws Exception { + final SortedSet createdLedgers = Collections.synchronizedSortedSet(new TreeSet()); + final List cleaned = new ArrayList(); + + final int numLedgers = 100; + + createLedgers(numLedgers, createdLedgers); + + LedgerManager ledgerManager = Mockito.spy(getLedgerManager()); + LedgerRange range = new LedgerRange(Sets.newHashSet(createdLedgers.first())); + AtomicInteger noOfIterations = new AtomicInteger(1); + + LedgerRangeIterator ledgerRangeIterator = new LedgerRangeIterator() { + @Override + public boolean hasNext() throws IOException { + return noOfIterations.get() > 0; + } + + @Override + public LedgerRange next() throws IOException { + noOfIterations.decrementAndGet(); + return range; + } + }; + + Mockito.doReturn(ledgerRangeIterator).when(ledgerManager).getLedgerRanges(); + + final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, + new MockLedgerStorage(), null, null, false, baseConf.getZkLedgersRootPath()); + GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() { + @Override + public void clean(long ledgerId) { + LOG.info("Cleaned {}", ledgerId); + cleaned.add(ledgerId); + } + }; + + garbageCollector.gc(cleaner); + assertTrue("Should have cleaned nothing", cleaned.isEmpty()); + + long first = createdLedgers.first(); + removeLedger(first); + garbageCollector.gc(cleaner); + assertEquals("Should have cleaned something", 1, cleaned.size()); + assertEquals("Should have cleaned first ledger" + first, (long) first, (long) cleaned.get(0)); + } }