Skip to content

Commit

Permalink
Limit Zstd ByteBuffer allocations with BufferPool
Browse files Browse the repository at this point in the history
Implement a limiting BufferPool for Zstd's decompression sources.
This may mitigate OOMs which occur as a result of many concurrent
writes, but does nothing to capitalize on commonly short compressed
blobs.
Since there is no control over the buffered read size at the zstd-jni
level, lower-level wrappers must be written to take advantage of this,
and short-blob short-circuiting in CASFileCache may be necessary as
well.
  • Loading branch information
werkt committed Apr 20, 2024
1 parent b328db0 commit b55a080
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 12 deletions.
1 change: 1 addition & 0 deletions _site/docs/configuration/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ backplane:
| realInputDirectories | List of Strings, _external_ | | A list of paths that will not be subject to the effects of linkInputDirectories setting, may also be used to provide writable directories as input roots for actions which expect to be able to write to an input location and will fail if they cannot |
| gracefulShutdownSeconds | Integer, 0 | | Time in seconds to allow for operations in flight to finish when shutdown signal is received |
| createSymlinkOutputs | boolean, _false_ | | Creates SymlinkNodes for symbolic links discovered in output paths for actions. No verification of the symlink target path occurs. Buildstream, for example, requires this. |
| zstdBufferPoolSize | Integer, _2048_ | | Specifies the maximum number of zstd data buffers that may be in use concurrently by the filesystem CAS. Increase to improve compressed blob throughput, decrease to reduce memory usage. |

```yaml
worker:
Expand Down
1 change: 1 addition & 0 deletions examples/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ worker:
selectForBlockNetwork: false
selectForTmpFs: false
createSymlinkOutputs: false
zstdBufferPoolSize: 2048
executionPolicies:
- name: test
executionWrapper:
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/build/buildfarm/cas/cfc/CASFileCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import build.buildfarm.common.Write.CompleteWrite;
import build.buildfarm.common.ZstdCompressingInputStream;
import build.buildfarm.common.ZstdDecompressingOutputStream;
import build.buildfarm.common.ZstdDecompressingOutputStream.FixedBufferPool;
import build.buildfarm.common.grpc.Retrier;
import build.buildfarm.common.grpc.Retrier.Backoff;
import build.buildfarm.common.io.CountingOutputStream;
Expand Down Expand Up @@ -181,11 +182,11 @@ public abstract class CASFileCache implements ContentAddressableStorage {
private final Consumer<Iterable<Digest>> onExpire;
private final Executor accessRecorder;
private final ExecutorService expireService;
private Thread prometheusMetricsThread;

private final Map<Digest, DirectoryEntry> directoryStorage = Maps.newConcurrentMap();
private final DirectoriesIndex directoriesIndex;
private final String directoriesIndexDbName;
private final FixedBufferPool zstdBufferPool;
private final LockMap locks = new LockMap();
@Nullable private final ContentAddressableStorage delegate;
private final boolean delegateSkipLoad;
Expand Down Expand Up @@ -249,6 +250,8 @@ public SettableFuture<Long> load(Digest digest) {
@GuardedBy("this")
private int removedEntryCount = 0;

private Thread prometheusMetricsThread;

public synchronized long size() {
return sizeInBytes;
}
Expand Down Expand Up @@ -318,6 +321,7 @@ public CASFileCache(
Executor accessRecorder,
ConcurrentMap<String, Entry> storage,
String directoriesIndexDbName,
FixedBufferPool zstdBufferPool,
Consumer<Digest> onPut,
Consumer<Iterable<Digest>> onExpire,
@Nullable ContentAddressableStorage delegate,
Expand All @@ -335,6 +339,7 @@ public CASFileCache(
this.delegate = delegate;
this.delegateSkipLoad = delegateSkipLoad;
this.directoriesIndexDbName = directoriesIndexDbName;
this.zstdBufferPool = zstdBufferPool;

entryPathStrategy = new HexBucketEntryPathStrategy(root, hexBucketLevels);

Expand Down Expand Up @@ -2876,7 +2881,7 @@ private CancellableOutputStream putOrReferenceGuarded(
direct = true;
break;
case ZSTD:
out = new ZstdDecompressingOutputStream(countingOut);
out = new ZstdDecompressingOutputStream(countingOut, zstdBufferPool);
direct = false;
break;
default:
Expand Down
1 change: 1 addition & 0 deletions src/main/java/build/buildfarm/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ java_library(
"@maven//:io_grpc_grpc_protobuf",
"@maven//:io_prometheus_simpleclient",
"@maven//:org_apache_commons_commons_compress",
"@maven//:org_apache_commons_commons_pool2",
"@maven//:org_projectlombok_lombok",
"@maven//:org_threeten_threetenbp",
"@maven//:redis_clients_jedis",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,90 @@
// This file was copied from the bazel project.
package build.buildfarm.common;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;

import build.buildfarm.common.io.FeedbackOutputStream;
import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/** An {@link OutputStream} that use zstd to decompress the content. */
public final class ZstdDecompressingOutputStream extends FeedbackOutputStream {
private final OutputStream out;
private ByteArrayInputStream inner;
private final ZstdInputStreamNoFinalizer zis;

public ZstdDecompressingOutputStream(OutputStream out) throws IOException {
private static class ZstdDInBufferFactory extends BasePooledObjectFactory<ByteBuffer> {
static int getBufferSize() {
return (int) ZstdInputStreamNoFinalizer.recommendedDInSize();
}

@Override
public ByteBuffer create() {
return ByteBuffer.allocate(getBufferSize());
}

@Override
public PooledObject<ByteBuffer> wrap(ByteBuffer buffer) {
return new DefaultPooledObject<>(buffer);
}
}

public static final class FixedBufferPool extends GenericObjectPool<ByteBuffer> {
private static GenericObjectPoolConfig<ByteBuffer> createPoolConfig(int capacity) {
GenericObjectPoolConfig<ByteBuffer> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(capacity);
return poolConfig;
}

public FixedBufferPool(int capacity) {
super(new ZstdDInBufferFactory(), createPoolConfig(capacity));
}
}

public static class ZstdFixedBufferPool implements BufferPool {
private final ObjectPool<ByteBuffer> pool;

public ZstdFixedBufferPool(ObjectPool<ByteBuffer> pool) {
this.pool = pool;
}

@Override
public ByteBuffer get(int bufferSize) {
// guaranteed through final
checkState(bufferSize > 0 && bufferSize <= ZstdDInBufferFactory.getBufferSize());
try {
return pool.borrowObject();
} catch (Exception e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
}
}

@Override
public void release(ByteBuffer buffer) {
try {
pool.returnObject(buffer);
} catch (Exception e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
}

public ZstdDecompressingOutputStream(OutputStream out, FixedBufferPool pool) throws IOException {
this.out = out;
zis =
new ZstdInputStreamNoFinalizer(
Expand All @@ -43,7 +112,8 @@ public int read() {
public int read(byte[] b, int off, int len) {
return inner.read(b, off, len);
}
})
},
new ZstdFixedBufferPool(pool))
.setContinuous(true);
}

Expand All @@ -66,15 +136,8 @@ public void write(byte[] b, int off, int len) throws IOException {

@Override
public void close() throws IOException {
closeShallow();
out.close();
}

/**
* Free resources related to decompression without closing the underlying {@link OutputStream}.
*/
public void closeShallow() throws IOException {
zis.close();
out.close();
}

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/build/buildfarm/common/config/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class Worker {
private ExecutionPolicy[] executionPolicies = {};
private SandboxSettings sandboxSettings = new SandboxSettings();
private boolean createSymlinkOutputs = false;
private int zstdBufferPoolSize = 2048; /* * ZSTD_DStreamInSize (current is 128k) == 256MiB */

// These limited resources are only for the individual worker.
// An example would be hardware resources such as GPUs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import build.buildfarm.cas.cfc.CASFileCache;
import build.buildfarm.common.DigestUtil;
import build.buildfarm.common.InputStreamFactory;
import build.buildfarm.common.ZstdDecompressingOutputStream.FixedBufferPool;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -42,6 +43,7 @@ class ShardCASFileCache extends CASFileCache {
DigestUtil digestUtil,
ExecutorService expireService,
Executor accessRecorder,
FixedBufferPool zstdBufferPool,
Consumer<Digest> onPut,
Consumer<Iterable<Digest>> onExpire,
ContentAddressableStorage delegate,
Expand All @@ -58,6 +60,7 @@ class ShardCASFileCache extends CASFileCache {
accessRecorder,
/* storage= */ Maps.newConcurrentMap(),
DEFAULT_DIRECTORIES_INDEX_NAME,
zstdBufferPool,
onPut,
onExpire,
delegate,
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/build/buildfarm/worker/shard/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import build.buildfarm.common.DigestUtil;
import build.buildfarm.common.InputStreamFactory;
import build.buildfarm.common.LoggingMain;
import build.buildfarm.common.ZstdDecompressingOutputStream.FixedBufferPool;
import build.buildfarm.common.config.BuildfarmConfigs;
import build.buildfarm.common.config.Cas;
import build.buildfarm.common.config.GrpcMetrics;
Expand Down Expand Up @@ -315,6 +316,7 @@ private ContentAddressableStorage createStorages(
InputStreamFactory remoteInputStreamFactory,
ExecutorService removeDirectoryService,
Executor accessRecorder,
FixedBufferPool zstdBufferPool,
List<Cas> storages)
throws ConfigurationException {
ContentAddressableStorage storage = null;
Expand All @@ -326,6 +328,7 @@ private ContentAddressableStorage createStorages(
remoteInputStreamFactory,
removeDirectoryService,
accessRecorder,
zstdBufferPool,
cas,
delegate,
delegateSkipLoad);
Expand All @@ -339,6 +342,7 @@ private ContentAddressableStorage createStorage(
InputStreamFactory remoteInputStreamFactory,
ExecutorService removeDirectoryService,
Executor accessRecorder,
FixedBufferPool zstdBufferPool,
Cas cas,
ContentAddressableStorage delegate,
boolean delegateSkipLoad)
Expand All @@ -365,6 +369,7 @@ private ContentAddressableStorage createStorage(
digestUtil,
removeDirectoryService,
accessRecorder,
zstdBufferPool,
this::onStoragePut,
delegate == null ? this::onStorageExpire : (digests) -> {},
delegate,
Expand Down Expand Up @@ -559,6 +564,20 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep

ExecutorService removeDirectoryService = BuildfarmExecutors.getRemoveDirectoryPool();
ExecutorService accessRecorder = newSingleThreadExecutor();
FixedBufferPool zstdBufferPool =
new FixedBufferPool(configs.getWorker().getZstdBufferPoolSize());
Gauge.build()
.name("zstd_buffer_pool_used")
.help("Current number of Zstd decompression buffers active")
.create()
.setChild(
new Gauge.Child() {
@Override
public double get() {
return zstdBufferPool.getNumActive();
}
})
.register();

InputStreamFactory remoteInputStreamFactory =
new RemoteInputStreamFactory(
Expand All @@ -572,6 +591,7 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep
remoteInputStreamFactory,
removeDirectoryService,
accessRecorder,
zstdBufferPool,
configs.getWorker().getStorages());
execFileSystem =
createExecFileSystem(
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void setUp() throws IOException, InterruptedException {
/* accessRecorder= */ directExecutor(),
storage,
/* directoriesIndexDbName= */ ":memory:",
/* zstdBufferPool= */ null,
onPut,
onExpire,
delegate,
Expand Down Expand Up @@ -1122,6 +1123,7 @@ public void copyExternalInputRetries() throws Exception {
/* accessRecorder= */ directExecutor(),
storage,
/* directoriesIndexDbName= */ ":memory:",
/* zstdBufferPool= */ null,
/* onPut= */ digest -> {},
/* onExpire= */ digests -> {},
/* delegate= */ null,
Expand Down Expand Up @@ -1185,6 +1187,7 @@ public void newInputThrowsNoSuchFileExceptionWithoutDelegate() throws Exception
/* accessRecorder= */ directExecutor(),
storage,
/* directoriesIndexDbName= */ ":memory:",
/* zstdBufferPool= */ null,
/* onPut= */ digest -> {},
/* onExpire= */ digests -> {},
/* delegate= */ null,
Expand Down

0 comments on commit b55a080

Please sign in to comment.