Skip to content

Commit

Permalink
feat: Add FusionClient to send HTTP requests to Platform
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Miranda <[email protected]>
  • Loading branch information
alberto-miranda committed Dec 16, 2024
1 parent 40670f7 commit bee1e99
Show file tree
Hide file tree
Showing 11 changed files with 665 additions and 3 deletions.
2 changes: 2 additions & 0 deletions modules/nextflow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ dependencies {
api 'org.bouncycastle:bcpkix-jdk18on:1.78.1'

testImplementation 'org.subethamail:subethasmtp:3.1.7'
// wiremock needed by FusionClient tests
testImplementation "org.wiremock:wiremock:3.5.4"

// test configuration
testFixturesApi ("org.apache.groovy:groovy-test:4.0.24") { exclude group: 'org.apache.groovy' }
Expand Down
204 changes: 204 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/fusion/FusionClient.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package nextflow.fusion

import com.google.gson.Gson
import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.json.JsonOutput
import groovy.transform.CompileStatic
import nextflow.Session
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
import nextflow.fusion.exception.BadResponseException
import nextflow.fusion.exception.UnauthorizedException
import nextflow.fusion.exchange.LicenseTokenRequest
import nextflow.fusion.exchange.LicenseTokenResponse
import nextflow.util.Threads
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.Executors
import java.util.function.Predicate

/**
* Client service for Fusion-related operations
*
* @author Alberto Miranda <[email protected]>
*/
@CompileStatic
class FusionClient {

// The endpoint where license-scoped JWT tokens are obtained
private static final String LICENSE_TOKEN_ENDPOINT = 'license/token/'

// Server errors that should trigger a retry
private static final List<Integer> SERVER_ERRORS = [429, 500, 502, 503, 504]

// Default connection timeout for HTTP requests
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS)

// Default retry policy settings for HTTP requests: delay, max delay, attempts, and jitter
private static final Duration DEFAULT_RETRY_POLICY_DELAY = Duration.of(450, ChronoUnit.MILLIS)
private static final Duration DEFAULT_RETRY_POLICY_MAX_DELAY = Duration.of(90, ChronoUnit.SECONDS)
private static final int DEFAULT_RETRY_POLICY_MAX_ATTEMPTS = 10
private static final double DEFAULT_RETRY_POLICY_JITTER = 0.5

// The HttpClient instance used to send requests
private final HttpClient httpClient = newDefaultHttpClient()

// The RetryPolicy instance used to retry requests
private final RetryPolicy retryPolicy = newDefaultRetryPolicy(SERVER_ERRORS)

// Nextflow session
private final Session session

// Configuration for Platform
private final PlatformConfig platform

// Environment variables where the client will look for configuration
private Map<String, String> env

// Logger instance for this component
private static Logger log = LoggerFactory.getLogger(FusionClient)

/**
* Create a new FusionClient instance for the given session
*
* @param session The current Nextflow session
*/
FusionClient(Session session) {
this.session = session
this.env = env
this.platform = new PlatformConfig(session.config.tower as Map ?: Collections.EMPTY_MAP, SysEnv.get())
}

/**
* Send a request to Platform to obtain a license-scoped JWT for Fusion. The request is authenticated using the
* Platform access token provided in the configuration of the current session.
*
* @throws AbortOperationException if a Platform access token cannot be found
*
* @return The signed JWT token
*/
String getLicenseToken() {
// FIXME(amiranda): Find out how to obtain the product and version
// Candidate: FusionConfig?

if (platform.accessToken == null) {
throw new AbortOperationException("Missing personal access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
}

final req = HttpRequest.newBuilder()
.uri(URI.create("${platform.endpoint}/${LICENSE_TOKEN_ENDPOINT}").normalize())
.header('Content-Type', 'application/json')
.header('Authorization', "Bearer ${platform.accessToken}")
.POST(
HttpRequest.BodyPublishers.ofString(
JsonOutput.toJson(
new LicenseTokenRequest(
product: 'fusion', // TODO: hardcoded for now
version: '2.4', // TODO: hardcoded for now
)
)
)
)
.build()

try {
final resp = safeHttpSend(req, retryPolicy)

if (resp.statusCode() == 200) {
return new Gson().fromJson(resp.body(), LicenseTokenResponse.class).signedToken
}

if (resp.statusCode() == 401) {
throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a valid access token")
}

throw new BadResponseException("Invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}")

} catch (IOException e) {
throw new IllegalStateException("Unable to send request to '${req.uri()}' : ${e.message}")
}
}

/**************************************************************************
* Private methods
*************************************************************************/

/**
* Create a new HttpClient instance with default settings
* @return The new HttpClient instance
*/
private static HttpClient newDefaultHttpClient() {
final builder = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NEVER)
.cookieHandler(new CookieManager())
.connectTimeout(DEFAULT_CONNECTION_TIMEOUT)
// use virtual threads executor if enabled
if (Threads.useVirtual()) {
builder.executor(Executors.newVirtualThreadPerTaskExecutor())
}
// build and return the new client
return builder.build()
}

/**
* Create a new RetryPolicy instance with default settings and the given list of retryable errors. With this policy,
* a request is retried on IOExceptions and any server errors defined in errorsToRetry. The number of retries, delay,
* max delay, and jitter are controlled by the corresponding values defined at class level.
*
* @return The new RetryPolicy instance
*/
private static <T> RetryPolicy<HttpResponse<T>> newDefaultRetryPolicy(List<Integer> errorsToRetry) {

final retryOnException = (e -> e instanceof IOException) as Predicate<? extends Throwable>
final retryOnStatusCode = ((HttpResponse<T> resp) -> resp.statusCode() in errorsToRetry) as Predicate<HttpResponse<T>>

final listener = new EventListener<ExecutionAttemptedEvent<HttpResponse<T>>>() {
@Override
void accept(ExecutionAttemptedEvent event) throws Throwable {
def msg = "connection failure - attempt: ${event.attemptCount}"
if (event.lastResult != null)
msg += "; response: ${event.lastResult}"
if (event.lastFailure != null)
msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}"
log.debug(msg)
}
}
return RetryPolicy.<HttpResponse<T>> builder()
.handleIf(retryOnException)
.handleResultIf(retryOnStatusCode)
.withBackoff(DEFAULT_RETRY_POLICY_DELAY.toMillis(), DEFAULT_RETRY_POLICY_MAX_DELAY.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(DEFAULT_RETRY_POLICY_MAX_ATTEMPTS)
.withJitter(DEFAULT_RETRY_POLICY_JITTER)
.onRetry(listener)
.build()
}

/**
* Send an HTTP request and return the response. This method automatically retries the request according to the
* given RetryPolicy.
*
* @param req The HttpRequest to send
* @return The HttpResponse received
*/
private <T> HttpResponse<String> safeHttpSend(HttpRequest req, RetryPolicy<T> policy) {
return Failsafe.with(policy).get(
() -> {
log.debug "Request: method:=${req.method()}; uri:=${req.uri()}; request:=${req}"
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
log.debug "Response: statusCode:=${resp.statusCode()}; body:=${resp.body()}"
return resp
} as CheckedSupplier
) as HttpResponse<String>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,24 @@
package nextflow.fusion


import nextflow.Session
import nextflow.plugin.Plugins

/**
* Provider strategy for {@link FusionEnv}
*
* @author Paolo Di Tommaso <[email protected]>
*/
class FusionEnvProvider {

Map<String,String> getEnvironment(String scheme) {
// The client used to interact with Platform for Fusion-related operations
final private FusionClient client

FusionEnvProvider(Session session) {
this.client = new FusionClient(session)
}

Map<String, String> getEnvironment(String scheme) {
final config = FusionConfig.getConfig()
final list = Plugins.getExtensions(FusionEnv)
final result = new HashMap<String,String>()
Expand All @@ -44,6 +53,8 @@ class FusionEnvProvider {
result.FUSION_LOG_LEVEL = config.logLevel()
if( config.cacheSize() )
result.FUSION_CACHE_SIZE = "${config.cacheSize().toMega()}M"
// license settings
result.FUSION_LICENSE_TOKEN = client.getLicenseToken()
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package nextflow.fusion

import nextflow.Global
import nextflow.Session

import static nextflow.fusion.FusionConfig.FUSION_PATH
import static nextflow.fusion.FusionHelper.*

Expand All @@ -28,9 +31,10 @@ import nextflow.executor.BashWrapperBuilder
import nextflow.processor.TaskBean
import nextflow.processor.TaskRun
import nextflow.util.Escape

/**
* Command script launcher implementing the support for Fusion files
*
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
Expand Down Expand Up @@ -88,7 +92,7 @@ class FusionScriptLauncher extends BashWrapperBuilder {
final result = new LinkedHashMap(10)
result.FUSION_WORK = work
// foreign env
final provider = new FusionEnvProvider()
final provider = new FusionEnvProvider(Global.session as Session)
result.putAll(provider.getEnvironment(scheme))
env = result
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package nextflow.fusion

import groovy.transform.CompileStatic

/**
* Encapsulates Platform configuration required by Fusion.
*
* The class basically adapts Wave's {@link io.seqera.wave.plugin.config.TowerConfig} class to fit our needs.
* N.B: We cannot reuse the original class directly because it belongs to a plugin which may not have been loaded.
*
* @author Alberto Miranda <[email protected]>
*/
@CompileStatic
class PlatformConfig {

// The default Platform API endpoint
static final String DEFAULT_API_ENDPOINT = 'https://api.cloud.seqera.io'

// The Platform API endpoint
final String endpoint

// The Platform access token
final String accessToken

PlatformConfig(Map opts, Map<String,String> env) {
this.endpoint = endpoint0(opts, env)
this.accessToken = accessToken0(opts, env)
}

/**
* Get the configured Platform access token: if `TOWER_WORKFLOW_ID` is provided in the environment, we are running
* in a Platform-made run and we should ONLY retrieve the token from the environment. Otherwise, check
* the configuration file or fallback to the environment. If no token is found, returns null.
*
* @param opts the configuration options for Platform
* @param env the applicable environment variables
* @return the Platform access token
*/
private static String accessToken0(Map opts, Map<String, String> env) {
def token = env.get('TOWER_WORKFLOW_ID')
? env.get('TOWER_ACCESS_TOKEN')
: opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN')
return token
}

/**
* Get the configured Platform API endpoint: if the endpoint is not provided in the configuration, we fallback to the
* environment variable `TOWER_API_ENDPOINT`. If neither is provided, we fallback to the default endpoint.
*
* @param opts the configuration options for Platform
* @param env the applicable environment variables
* @return the Platform API endpoint
*/
private static String endpoint0(Map opts, Map<String, String> env) {
def result = opts.endpoint as String
if (!result || result == '-') {
result = env.get('TOWER_API_ENDPOINT') ?: DEFAULT_API_ENDPOINT
}
return result.stripEnd('/')
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nextflow.fusion.exception

import groovy.transform.InheritConstructors

@InheritConstructors
class BadResponseException extends RuntimeException{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nextflow.fusion.exception

import groovy.transform.InheritConstructors

@InheritConstructors
class UnauthorizedException extends RuntimeException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package nextflow.fusion.exchange

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString

/**
* Models a REST request to obtain a license-scoped JWT token from Platform
*/
@EqualsAndHashCode
@ToString(includeNames = true, includePackage = false)
@CompileStatic
class LicenseTokenRequest {

/** The product code */
String product

/** The product version */
String version
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package nextflow.fusion.exchange

import groovy.transform.CompileStatic
import groovy.transform.ToString

@CompileStatic
@ToString(includeNames = true, includePackage = false)
class LicenseTokenResponse {
/**
* The signed JWT token
*/
String signedToken

/**
* The expiration date of the token
*/
Date expirationDate
}
Loading

0 comments on commit bee1e99

Please sign in to comment.