Skip to content

Commit

Permalink
feat(nf-tower): Add TowerFusionEnv provider to set required env vars
Browse files Browse the repository at this point in the history
Signed-off-by: Alberto Miranda <[email protected]>
  • Loading branch information
alberto-miranda committed Dec 18, 2024
1 parent 40670f7 commit e7b1082
Show file tree
Hide file tree
Showing 8 changed files with 748 additions and 0 deletions.
2 changes: 2 additions & 0 deletions plugins/nf-tower/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ dependencies {
testImplementation(testFixtures(project(":nextflow")))
testImplementation "org.apache.groovy:groovy:4.0.24"
testImplementation "org.apache.groovy:groovy-nio:4.0.24"
// wiremock required by TowerFusionEnvTest
testImplementation "org.wiremock:wiremock:3.5.4"
}
266 changes: 266 additions & 0 deletions plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package io.seqera.tower.plugin

import com.google.gson.Gson
import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.tower.plugin.exception.BadResponseException
import io.seqera.tower.plugin.exception.UnauthorizedException
import io.seqera.tower.plugin.exchange.LicenseTokenRequest
import io.seqera.tower.plugin.exchange.LicenseTokenResponse
import nextflow.Global
import nextflow.Session
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
import nextflow.fusion.FusionConfig
import nextflow.fusion.FusionEnv
import nextflow.util.Threads
import org.pf4j.Extension

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.Executors
import java.util.function.Predicate

/**
* Environment provider for Platform-specific environment variables.
*
* @author Alberto Miranda <[email protected]>
*/
@Slf4j
@Extension
@CompileStatic
class TowerFusionEnv implements FusionEnv {

// The endpoint where license-scoped JWT tokens are obtained
private static final String LICENSE_TOKEN_ENDPOINT = 'license/token/'

// Server errors that should trigger a retry
private static final List<Integer> SERVER_ERRORS = [429, 500, 502, 503, 504]

// Default connection timeout for HTTP requests
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS)

// Default retry policy settings for HTTP requests: delay, max delay, attempts, and jitter
private static final Duration DEFAULT_RETRY_POLICY_DELAY = Duration.of(450, ChronoUnit.MILLIS)
private static final Duration DEFAULT_RETRY_POLICY_MAX_DELAY = Duration.of(90, ChronoUnit.SECONDS)
private static final int DEFAULT_RETRY_POLICY_MAX_ATTEMPTS = 10
private static final double DEFAULT_RETRY_POLICY_JITTER = 0.5

// The HttpClient instance used to send requests
private final HttpClient httpClient = newDefaultHttpClient()

// The RetryPolicy instance used to retry requests
private final RetryPolicy retryPolicy = newDefaultRetryPolicy(SERVER_ERRORS)

// Nextflow session
private final Session session

// Platform endpoint to use for requests
private final String endpoint

// Platform access token to use for requests
private final String accessToken

/**
* Constructor for the class. It initializes the session, endpoint, and access token.
*/
TowerFusionEnv() {
this.session = Global.session as Session
final towerConfig = session.config.navigate('tower') as Map ?: [:]
final env = SysEnv.get()
this.endpoint = endpoint0(towerConfig, env)
this.accessToken = accessToken0(towerConfig, env)
}

/**
* Return any environment variables relevant to Fusion execution. This method is called
* by {@link nextflow.fusion.FusionEnvProvider#getEnvironment} to determine which
* environment variables are needed for the current run.
*
* @param scheme The scheme for which the environment variables are needed (currently unused)
* @param config The Fusion configuration object
* @return A map of environment variables
*/
@Override
Map<String, String> getEnvironment(String scheme, FusionConfig config) {

// TODO(amiranda): Hardcoded for now. We need to find out how to obtain
// the concrete product SKU and version. Candidate: FusionConfig?
final product = 'fusion'
final version = '2.4'

try {
final token = getLicenseToken(product, version)
return [
'FUSION_LICENSE_TOKEN': token,
]
} catch (Exception e) {
log.warn("Error retrieving Fusion license information: ${e.message}")
return [:]
}
}

/**
* Send a request to Platform to obtain a license-scoped JWT for Fusion. The request is authenticated using the
* Platform access token provided in the configuration of the current session.
*
* @throws AbortOperationException if a Platform access token cannot be found
*
* @return The signed JWT token
*/
protected String getLicenseToken(product, version) {
// FIXME(amiranda): Find out how to obtain the product and version
// Candidate: FusionConfig?

if (accessToken == null) {
throw new AbortOperationException("Missing personal access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
}

final req = HttpRequest.newBuilder()
.uri(URI.create("${endpoint}/${LICENSE_TOKEN_ENDPOINT}").normalize())
.header('Content-Type', 'application/json')
.header('Authorization', "Bearer ${accessToken}")
.POST(
HttpRequest.BodyPublishers.ofString(
new Gson().toJson(
new LicenseTokenRequest(
product: product,
version: version
),
LicenseTokenRequest.class
),
)
)
.build()

try {
final resp = safeHttpSend(req, retryPolicy)

if (resp.statusCode() == 200) {
return new Gson().fromJson(resp.body(), LicenseTokenResponse.class).signedToken
}

if (resp.statusCode() == 401) {
throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a valid access token")
}

throw new BadResponseException("Invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}")

} catch (IOException e) {
throw new IllegalStateException("Unable to send request to '${req.uri()}' : ${e.message}")
}
}

/**************************************************************************
* Helper methods
*************************************************************************/

/**
* Get the configured Platform API endpoint: if the endpoint is not provided in the configuration, we fallback to the
* environment variable `TOWER_API_ENDPOINT`. If neither is provided, we fallback to the default endpoint.
*
* @param opts the configuration options for Platform
* @param env the applicable environment variables
* @return the Platform API endpoint
*/
protected static String endpoint0(Map opts, Map<String, String> env) {
def result = opts.endpoint as String
if (!result || result == '-') {
result = env.get('TOWER_API_ENDPOINT') ?: TowerClient.DEF_ENDPOINT_URL
}
return result.stripEnd('/')
}

/**
* Get the configured Platform access token: if `TOWER_WORKFLOW_ID` is provided in the environment, we are running
* in a Platform-made run and we should ONLY retrieve the token from the environment. Otherwise, check
* the configuration file or fallback to the environment. If no token is found, returns null.
*
* @param opts the configuration options for Platform
* @param env the applicable environment variables
* @return the Platform access token
*/
protected static String accessToken0(Map opts, Map<String, String> env) {
def token = env.get('TOWER_WORKFLOW_ID')
? env.get('TOWER_ACCESS_TOKEN')
: opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN')
return token
}

/**
* Create a new HttpClient instance with default settings
* @return The new HttpClient instance
*/
private static HttpClient newDefaultHttpClient() {
final builder = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NEVER)
.cookieHandler(new CookieManager())
.connectTimeout(DEFAULT_CONNECTION_TIMEOUT)
// use virtual threads executor if enabled
if ( Threads.useVirtual() ) {
builder.executor(Executors.newVirtualThreadPerTaskExecutor())
}
// build and return the new client
return builder.build()
}

/**
* Create a new RetryPolicy instance with default settings and the given list of retryable errors. With this policy,
* a request is retried on IOExceptions and any server errors defined in errorsToRetry. The number of retries, delay,
* max delay, and jitter are controlled by the corresponding values defined at class level.
*
* @return The new RetryPolicy instance
*/
private static <T> RetryPolicy<HttpResponse<T>> newDefaultRetryPolicy(List<Integer> errorsToRetry) {

final retryOnException = (e -> e instanceof IOException) as Predicate<? extends Throwable>
final retryOnStatusCode = ((HttpResponse<T> resp) -> resp.statusCode() in errorsToRetry) as Predicate<HttpResponse<T>>

final listener = new EventListener<ExecutionAttemptedEvent<HttpResponse<T>>>() {
@Override
void accept(ExecutionAttemptedEvent event) throws Throwable {
def msg = "connection failure - attempt: ${event.attemptCount}"
if (event.lastResult != null)
msg += "; response: ${event.lastResult}"
if (event.lastFailure != null)
msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}"
log.debug(msg)
}
}
return RetryPolicy.<HttpResponse<T>> builder()
.handleIf(retryOnException)
.handleResultIf(retryOnStatusCode)
.withBackoff(DEFAULT_RETRY_POLICY_DELAY.toMillis(), DEFAULT_RETRY_POLICY_MAX_DELAY.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(DEFAULT_RETRY_POLICY_MAX_ATTEMPTS)
.withJitter(DEFAULT_RETRY_POLICY_JITTER)
.onRetry(listener)
.build()
}

/**
* Send an HTTP request and return the response. This method automatically retries the request according to the
* given RetryPolicy.
*
* @param req The HttpRequest to send
* @return The HttpResponse received
*/
private <T> HttpResponse<String> safeHttpSend(HttpRequest req, RetryPolicy<T> policy) {
return Failsafe.with(policy).get(
() -> {
log.debug "Request: method:=${req.method()}; uri:=${req.uri()}; request:=${req}"
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
log.debug "Response: statusCode:=${resp.statusCode()}; body:=${resp.body()}"
return resp
} as CheckedSupplier
) as HttpResponse<String>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.seqera.tower.plugin.exception

import groovy.transform.InheritConstructors

@InheritConstructors
class BadResponseException extends RuntimeException{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.seqera.tower.plugin.exception

import groovy.transform.InheritConstructors

@InheritConstructors
class UnauthorizedException extends RuntimeException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.seqera.tower.plugin.exchange

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString

/**
* Models a REST request to obtain a license-scoped JWT token from Platform
*
* @author Alberto Miranda <[email protected]>
*/
@EqualsAndHashCode
@ToString(includeNames = true, includePackage = false)
@CompileStatic
class LicenseTokenRequest {

/** The product code */
String product

/** The product version */
String version
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.seqera.tower.plugin.exchange

import groovy.transform.CompileStatic
import groovy.transform.ToString

/**
* Models a REST response containing a license-scoped JWT token from Platform
*
* @author Alberto Miranda <[email protected]>
*/
@CompileStatic
@ToString(includeNames = true, includePackage = false)
class LicenseTokenResponse {
/**
* The signed JWT token
*/
String signedToken

/**
* The expiration date of the token
*/
Date expirationDate
}
1 change: 1 addition & 0 deletions plugins/nf-tower/src/resources/META-INF/extensions.idx
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
#

io.seqera.tower.plugin.TowerFactory
io.seqera.tower.plugin.TowerFusionEnv
Loading

0 comments on commit e7b1082

Please sign in to comment.