Skip to content

Commit

Permalink
[FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb l…
Browse files Browse the repository at this point in the history
…ogs when the task exited. (#23922)
  • Loading branch information
liming30 authored Mar 13, 2024
1 parent 94b55d1 commit 398bb50
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

Expand All @@ -44,7 +45,11 @@
import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -59,8 +64,11 @@
public final class RocksDBResourceContainer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBResourceContainer.class);

private static final String ROCKSDB_RELOCATE_LOG_SUFFIX = "_LOG";

// the filename length limit is 255 on most operating systems
private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - "_LOG".length();
private static final int INSTANCE_PATH_LENGTH_LIMIT =
255 - ROCKSDB_RELOCATE_LOG_SUFFIX.length();

@Nullable private final File instanceRocksDBPath;

Expand All @@ -85,6 +93,8 @@ public final class RocksDBResourceContainer implements AutoCloseable {
/** The handles to be closed when the container is closed. */
private final ArrayList<AutoCloseable> handlesToClose;

@Nullable private Path relocatedDbLogBaseDir;

@VisibleForTesting
public RocksDBResourceContainer() {
this(new Configuration(), PredefinedOptions.DEFAULT, null, null, null, false);
Expand Down Expand Up @@ -267,6 +277,7 @@ public void close() throws Exception {
if (sharedResources != null) {
sharedResources.close();
}
cleanRelocatedDbLogs();
}

/**
Expand Down Expand Up @@ -426,7 +437,9 @@ private void relocateDefaultDbLogDir(DBOptions dbOptions) {
if (logFilePath != null) {
File logFile = resolveFileLocation(logFilePath);
if (logFile != null && resolveFileLocation(logFile.getParent()) != null) {
dbOptions.setDbLogDir(logFile.getParent());
String relocatedDbLogDir = logFile.getParent();
this.relocatedDbLogBaseDir = new File(relocatedDbLogDir).toPath();
dbOptions.setDbLogDir(relocatedDbLogDir);
}
}
}
Expand All @@ -441,4 +454,44 @@ private File resolveFileLocation(String logFilePath) {
File logFile = new File(logFilePath);
return (logFile.exists() && logFile.canRead()) ? logFile : null;
}

/** Clean all relocated rocksdb logs. */
private void cleanRelocatedDbLogs() {
if (instanceRocksDBPath != null && relocatedDbLogBaseDir != null) {
LOG.info("Cleaning up relocated RocksDB logs: {}.", relocatedDbLogBaseDir);

String relocatedDbLogPrefix =
resolveRelocatedDbLogPrefix(instanceRocksDBPath.getAbsolutePath());
try {
Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
.filter(
path ->
!Files.isDirectory(path)
&& path.toFile()
.getName()
.startsWith(relocatedDbLogPrefix))
.forEach(IOUtils::deleteFileQuietly);
} catch (IOException e) {
LOG.warn(
"Could not list relocated RocksDB log directory: {}",
relocatedDbLogBaseDir);
}
}
}

/**
* Resolve the prefix of rocksdb's log file name according to rocksdb's log file name rules. See
* https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/file/filename.cc#L30.
*
* @param instanceRocksDBAbsolutePath The path where the rocksdb directory is located.
* @return Resolved rocksdb log name prefix.
*/
private String resolveRelocatedDbLogPrefix(String instanceRocksDBAbsolutePath) {
if (!instanceRocksDBAbsolutePath.isEmpty()
&& !instanceRocksDBAbsolutePath.matches("^[a-zA-Z0-9\\-._].*")) {
instanceRocksDBAbsolutePath = instanceRocksDBAbsolutePath.substring(1);
}
return instanceRocksDBAbsolutePath.replaceAll("[^a-zA-Z0-9\\-._]", "_")
+ ROCKSDB_RELOCATE_LOG_SUFFIX;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
Expand All @@ -47,24 +48,29 @@
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;

import org.apache.commons.lang3.RandomUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.Timeout;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.util.SizeUnit;

import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -367,6 +373,57 @@ public void testDbPathRelativePaths() throws Exception {
rocksDbBackend.setDbStoragePath("relative/path");
}

@Test
@Timeout(value = 60)
public void testCleanRelocatedDbLogs() throws Exception {
final File folder = tempFolder.newFolder();
final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
final File logFile = new File(relocatedDBLogDir, "taskManager.log");
Files.createFile(logFile.toPath());
System.setProperty("log.file", logFile.getAbsolutePath());

Configuration conf = new Configuration();
conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
final EmbeddedRocksDBStateBackend rocksDbBackend =
new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
final String dbStoragePath = new Path(folder.toURI().toString()).toString();
rocksDbBackend.setDbStoragePath(dbStoragePath);

final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
RocksDBKeyedStateBackend<Integer> keyedBackend =
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);

File instanceBasePath = keyedBackend.getInstanceBasePath();
File instanceRocksDBPath =
RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);

// avoid tests without relocate.
Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());

java.nio.file.Path[] relocatedDbLogs;
try {
relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
while (relocatedDbLogs.length <= 2) {
// If the default number of log files in rocksdb is not enough, add more logs.
try (FlushOptions flushOptions = new FlushOptions()) {
keyedBackend.db.put(RandomUtils.nextBytes(32), RandomUtils.nextBytes(512));
keyedBackend.db.flush(flushOptions);
}
relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
}
} finally {
IOUtils.closeQuietly(keyedBackend);
keyedBackend.dispose();
env.close();
}

relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
assertEquals(1, relocatedDbLogs.length);
assertEquals("taskManager.log", relocatedDbLogs[0].toFile().getName());
}

// ------------------------------------------------------------------------
// RocksDB local file automatic from temp directories
// ------------------------------------------------------------------------
Expand Down

0 comments on commit 398bb50

Please sign in to comment.