From 237894a796a55fbf99317e777e3e5cc8dcf5635b Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Fri, 13 Dec 2024 14:40:04 +0100 Subject: [PATCH] Add rate limiter to wave requests Signed-off-by: Paolo Di Tommaso --- .../groovy/nextflow/cli/CmdInspect.groovy | 14 ++++++ .../groovy/nextflow/cli/CmdInspectTest.groovy | 45 +++++++++++++++++++ .../io/seqera/wave/plugin/WaveClient.groovy | 17 +++++++ .../seqera/wave/plugin/config/HttpOpts.groovy | 8 ++++ .../wave/plugin/config/HttpOptsTest.groovy | 5 ++- 5 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 modules/nextflow/src/test/groovy/nextflow/cli/CmdInspectTest.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/cli/CmdInspect.groovy b/modules/nextflow/src/main/groovy/nextflow/cli/CmdInspect.groovy index 3a61e69cb9..2ea71b2f26 100644 --- a/modules/nextflow/src/main/groovy/nextflow/cli/CmdInspect.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/cli/CmdInspect.groovy @@ -20,6 +20,7 @@ package nextflow.cli import com.beust.jcommander.DynamicParameter import com.beust.jcommander.Parameter import com.beust.jcommander.Parameters +import groovy.transform.CompileDynamic import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.Session @@ -91,6 +92,10 @@ class CmdInspect extends CmdBase { } protected void applyInspect(Session session) { + // slow down max rate when concretize is specified + if( concretize ) { + configureMaxRate(session.config) + } // run the inspector new ContainersInspector(concretize) .withFormat(format) @@ -98,4 +103,13 @@ class CmdInspect extends CmdBase { .printContainers() } + @CompileDynamic + protected void configureMaxRate(Map config) { + if( config.wave == null ) + config.wave = new HashMap() + if( config.wave.httpClient == null ) + config.wave.httpClient = new HashMap() + config.wave.httpClient.maxRate = '5/30sec' + } + } diff --git a/modules/nextflow/src/test/groovy/nextflow/cli/CmdInspectTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cli/CmdInspectTest.groovy new file mode 100644 index 0000000000..688cc0ee8e --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/cli/CmdInspectTest.groovy @@ -0,0 +1,45 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.cli + +import spock.lang.Specification + +/** + * + * @author Paolo Di Tommaso + */ +class CmdInspectTest extends Specification { + + def 'should configure max rate' () { + given: + def cmd = new CmdInspect() + + when: + def cfg1 = [:] + cmd.configureMaxRate(cfg1) + then: + cfg1 == [wave:[httpClient:[maxRate:'5/30sec']]] + + when: + def cfg2 = [wave:[enabled:true, httpClient: [something:true, maxRate: '1/s']]] + cmd.configureMaxRate(cfg2) + then: + cfg2 == [wave:[enabled:true, httpClient: [something:true, maxRate: '5/30sec']]] + } + +} diff --git a/plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy b/plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy index c3640ce42d..80e43b64e8 100644 --- a/plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy +++ b/plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy @@ -34,6 +34,7 @@ import java.util.function.Predicate import com.google.common.cache.Cache import com.google.common.cache.CacheBuilder +import com.google.common.util.concurrent.RateLimiter import com.google.common.util.concurrent.UncheckedExecutionException import com.google.gson.Gson import com.google.gson.reflect.TypeToken @@ -125,6 +126,8 @@ class WaveClient { final private URL s5cmdConfigUrl + final private RateLimiter limiter + WaveClient(Session session) { this.session = session this.config = new WaveConfig(session.config.wave as Map ?: Collections.emptyMap(), SysEnv.get()) @@ -137,6 +140,7 @@ class WaveClient { log.debug "Wave config: $config" this.packer = new Packer().withPreserveTimestamp(config.preserveFileTimestamp()) this.waveRegistry = new URI(endpoint).getAuthority() + this.limiter = RateLimiter.create( config.httpOpts().maxRate().rate ) // create cache this.cache = CacheBuilder .newBuilder() @@ -257,7 +261,20 @@ class WaveClient { return sendRequest(request) } + private void checkLimiter() { + final ts = System.currentTimeMillis() + try { + limiter.acquire() + } finally { + final delta = System.currentTimeMillis()-ts + if( delta>0 ) + log.debug "Request limiter blocked ${Duration.ofMillis(delta)}" + } + } + + SubmitContainerTokenResponse sendRequest(SubmitContainerTokenRequest request) { + checkLimiter() return sendRequest0(request, 1) } diff --git a/plugins/nf-wave/src/main/io/seqera/wave/plugin/config/HttpOpts.groovy b/plugins/nf-wave/src/main/io/seqera/wave/plugin/config/HttpOpts.groovy index 6113a41272..819e2de9c4 100644 --- a/plugins/nf-wave/src/main/io/seqera/wave/plugin/config/HttpOpts.groovy +++ b/plugins/nf-wave/src/main/io/seqera/wave/plugin/config/HttpOpts.groovy @@ -20,6 +20,8 @@ package io.seqera.wave.plugin.config import groovy.transform.CompileStatic import groovy.transform.ToString import nextflow.util.Duration +import nextflow.util.RateUnit + /** * Model the HTTP client settings to connect the Wave service * @@ -31,12 +33,18 @@ class HttpOpts { final private Duration connectTimeout + final private RateUnit maxRate + HttpOpts(Map opts) { connectTimeout = opts.connectTimeout as Duration ?: Duration.of('30s') + maxRate = opts.maxRate as RateUnit ?: RateUnit.of('20 /min') } java.time.Duration connectTimeout() { return java.time.Duration.ofMillis(connectTimeout.toMillis()) } + RateUnit maxRate() { + return maxRate + } } diff --git a/plugins/nf-wave/src/test/io/seqera/wave/plugin/config/HttpOptsTest.groovy b/plugins/nf-wave/src/test/io/seqera/wave/plugin/config/HttpOptsTest.groovy index 0f6dc27ba6..36a6dbbdad 100644 --- a/plugins/nf-wave/src/test/io/seqera/wave/plugin/config/HttpOptsTest.groovy +++ b/plugins/nf-wave/src/test/io/seqera/wave/plugin/config/HttpOptsTest.groovy @@ -19,6 +19,7 @@ package io.seqera.wave.plugin.config import java.time.Duration +import nextflow.util.RateUnit import spock.lang.Specification /** * @@ -31,11 +32,13 @@ class HttpOptsTest extends Specification { def opts = new HttpOpts([:]) then: opts.connectTimeout() == Duration.ofSeconds(30) + opts.maxRate() == RateUnit.of('20 /min') when: - opts = new HttpOpts([connectTimeout:'50s']) + opts = new HttpOpts([connectTimeout:'50s', maxRate: '10/s']) then: opts.connectTimeout() == Duration.ofSeconds(50) + opts.maxRate() == RateUnit.of('10/s') } }