diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index f7f5df28d1b2e..f2bdb9a2e3e6a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -18,6 +18,7 @@ package org.apache.flink.client.program; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.client.ClientUtils; import org.apache.flink.configuration.Configuration; @@ -683,6 +684,11 @@ public Builder setSavepointRestoreSettings( return this; } + @VisibleForTesting + public List getUserClassPaths() { + return userClassPaths; + } + public PackagedProgram build() throws ProgramInvocationException { if (jarFile == null && entryPointClassName == null) { throw new IllegalArgumentException( diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java index c5e0f492c96c2..9b3ff11752ad4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java @@ -46,6 +46,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; @@ -182,17 +183,22 @@ public PackagedProgram toPackagedProgram(Configuration configuration) { } try { - return PackagedProgram.newBuilder() - .setJarFile(jarFile.toFile()) - .setEntryPointClassName(entryClass) - .setConfiguration(configuration) - .setArguments(programArgs.toArray(new String[0])) - .build(); + return initPackagedProgramBuilder(configuration).build(); } catch (final ProgramInvocationException e) { throw new CompletionException(e); } } + @VisibleForTesting + PackagedProgram.Builder initPackagedProgramBuilder(Configuration configuration) { + return PackagedProgram.newBuilder() + .setJarFile(jarFile.toFile()) + .setEntryPointClassName(entryClass) + .setConfiguration(configuration) + .setUserClassPaths(getClasspaths(configuration)) + .setArguments(programArgs.toArray(new String[0])); + } + @VisibleForTesting String getEntryClass() { return entryClass; @@ -214,6 +220,21 @@ JobID getJobId() { } } + private static List getClasspaths(Configuration configuration) { + try { + return ConfigUtils.decodeListFromConfig( + configuration, PipelineOptions.CLASSPATHS, URL::new); + } catch (MalformedURLException e) { + throw new CompletionException( + new RestHandlerException( + String.format( + "Failed to extract '%s' as URLs. Provided value: %s", + PipelineOptions.CLASSPATHS.key(), + configuration.get(PipelineOptions.CLASSPATHS)), + HttpResponseStatus.BAD_REQUEST)); + } + } + /** Parse program arguments in jar run or plan request. */ private static List getProgramArgs(HandlerRequest request, Logger log) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java index 53643cabe9552..d90ae902b0588 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java @@ -19,7 +19,10 @@ package org.apache.flink.runtime.webmonitor.handlers.utils; import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.webmonitor.handlers.JarPlanRequestBody; @@ -31,18 +34,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.net.URL; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link JarHandlerUtils}. */ class JarHandlerUtilsTest { private static final Logger LOG = LoggerFactory.getLogger(JarHandlerUtilsTest.class); + @TempDir private Path tempDir; + @Test void testTokenizeNonQuoted() { final List arguments = JarHandlerUtils.tokenizeArguments("--foo bar"); @@ -65,17 +77,14 @@ void testTokenizeDoubleQuoted() { } @Test - void testFromRequestDefaults(@TempDir Path tmp) throws RestHandlerException { - final JarRunMessageParameters parameters = - JarRunHeaders.getInstance().getUnresolvedMessageParameters(); - - parameters.jarIdPathParameter.resolve("someJar"); + void testFromRequestDefaults() throws Exception { + final JarRunMessageParameters parameters = getDummyMessageParameters(); final HandlerRequest request = HandlerRequest.create(new JarPlanRequestBody(), parameters); final JarHandlerUtils.JarHandlerContext jarHandlerContext = - JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG); + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); assertThat(jarHandlerContext.getEntryClass()).isNull(); assertThat(jarHandlerContext.getProgramArgs()).isEmpty(); assertThat(jarHandlerContext.getParallelism()) @@ -84,25 +93,12 @@ void testFromRequestDefaults(@TempDir Path tmp) throws RestHandlerException { } @Test - void testFromRequestRequestBody(@TempDir Path tmp) throws RestHandlerException { - final JarRunMessageParameters parameters = - JarRunHeaders.getInstance().getUnresolvedMessageParameters(); - - parameters.jarIdPathParameter.resolve("someJar"); - - final JarPlanRequestBody requestBody = - new JarPlanRequestBody( - "entry-class", - null, - Arrays.asList("arg1", "arg2"), - 37, - JobID.generate(), - null); - final HandlerRequest request = - HandlerRequest.create(requestBody, parameters); + void testFromRequestRequestBody() throws Exception { + final JarPlanRequestBody requestBody = getDummyJarPlanRequestBody("entry-class", 37, null); + final HandlerRequest request = getDummyRequest(requestBody); final JarHandlerUtils.JarHandlerContext jarHandlerContext = - JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG); + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); assertThat(jarHandlerContext.getEntryClass()).isEqualTo(requestBody.getEntryClassName()); assertThat(jarHandlerContext.getProgramArgs()) .containsExactlyElementsOf(requestBody.getProgramArgumentsList()); @@ -111,28 +107,91 @@ void testFromRequestRequestBody(@TempDir Path tmp) throws RestHandlerException { } @Test - void testFromRequestWithParallelismConfig(@TempDir Path tmp) throws RestHandlerException { + void testFromRequestWithParallelismConfig() throws Exception { final int parallelism = 37; - final JarRunMessageParameters parameters = - JarRunHeaders.getInstance().getUnresolvedMessageParameters(); - - parameters.jarIdPathParameter.resolve("someJar"); - final JarPlanRequestBody requestBody = - new JarPlanRequestBody( + getDummyJarPlanRequestBody( "entry-class", null, - Arrays.asList("arg1", "arg2"), - null, - JobID.generate(), Collections.singletonMap( CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism))); - final HandlerRequest request = - HandlerRequest.create(requestBody, parameters); + final HandlerRequest request = getDummyRequest(requestBody); final JarHandlerUtils.JarHandlerContext jarHandlerContext = - JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG); + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); assertThat(jarHandlerContext.getParallelism()).isEqualTo(parallelism); } + + @Test + void testClasspathsConfigNotErased() throws Exception { + final JarPlanRequestBody requestBody = + getDummyJarPlanRequestBody( + null, + null, + Collections.singletonMap( + PipelineOptions.CLASSPATHS.key(), + "file:/tmp/some.jar;file:/tmp/another.jar")); + + final HandlerRequest request = getDummyRequest(requestBody); + + final JarHandlerUtils.JarHandlerContext jarHandlerContext = + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); + + final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration(); + final PackagedProgram.Builder builder = + jarHandlerContext.initPackagedProgramBuilder(originalConfig); + final List retrievedClasspaths = + builder.getUserClassPaths().stream() + .map(URL::toString) + .collect(Collectors.toList()); + + assertThat(retrievedClasspaths).isEqualTo(originalConfig.get(PipelineOptions.CLASSPATHS)); + } + + @Test + void testMalformedClasspathsConfig() throws Exception { + final JarPlanRequestBody requestBody = + getDummyJarPlanRequestBody( + null, + null, + Collections.singletonMap( + PipelineOptions.CLASSPATHS.key(), "invalid|:/jar")); + final HandlerRequest request = getDummyRequest(requestBody); + + final JarHandlerUtils.JarHandlerContext jarHandlerContext = + JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG); + + final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration(); + + assertThatThrownBy(() -> jarHandlerContext.initPackagedProgramBuilder(originalConfig)) + .satisfies(anyCauseMatches(RestHandlerException.class, "invalid|:/jar")); + } + + private HandlerRequest getDummyRequest( + @Nullable JarPlanRequestBody requestBody) { + return HandlerRequest.create( + requestBody == null ? new JarPlanRequestBody() : requestBody, + getDummyMessageParameters()); + } + + private JarRunMessageParameters getDummyMessageParameters() { + final JarRunMessageParameters parameters = + JarRunHeaders.getInstance().getUnresolvedMessageParameters(); + + parameters.jarIdPathParameter.resolve("someJar"); + + return parameters; + } + + private JarPlanRequestBody getDummyJarPlanRequestBody( + String entryClass, Integer parallelism, Map flinkConfiguration) { + return new JarPlanRequestBody( + entryClass, + null, + Arrays.asList("arg1", "arg2"), + parallelism, + JobID.generate(), + flinkConfiguration); + } }