Skip to content

Commit

Permalink
[FLINK-34580][rest] Do not erase "pipeline.classpaths" config during …
Browse files Browse the repository at this point in the history
…REST job deploy
  • Loading branch information
ferenc-csaky authored and gyfora committed Mar 7, 2024
1 parent 2429c29 commit d0ce534
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.client.program;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -683,6 +684,11 @@ public Builder setSavepointRestoreSettings(
return this;
}

@VisibleForTesting
public List<URL> getUserClassPaths() {
return userClassPaths;
}

public PackagedProgram build() throws ProgramInvocationException {
if (jarFile == null && entryPointClassName == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -182,17 +183,22 @@ public PackagedProgram toPackagedProgram(Configuration configuration) {
}

try {
return PackagedProgram.newBuilder()
.setJarFile(jarFile.toFile())
.setEntryPointClassName(entryClass)
.setConfiguration(configuration)
.setArguments(programArgs.toArray(new String[0]))
.build();
return initPackagedProgramBuilder(configuration).build();
} catch (final ProgramInvocationException e) {
throw new CompletionException(e);
}
}

@VisibleForTesting
PackagedProgram.Builder initPackagedProgramBuilder(Configuration configuration) {
return PackagedProgram.newBuilder()
.setJarFile(jarFile.toFile())
.setEntryPointClassName(entryClass)
.setConfiguration(configuration)
.setUserClassPaths(getClasspaths(configuration))
.setArguments(programArgs.toArray(new String[0]));
}

@VisibleForTesting
String getEntryClass() {
return entryClass;
Expand All @@ -214,6 +220,21 @@ JobID getJobId() {
}
}

private static List<URL> getClasspaths(Configuration configuration) {
try {
return ConfigUtils.decodeListFromConfig(
configuration, PipelineOptions.CLASSPATHS, URL::new);
} catch (MalformedURLException e) {
throw new CompletionException(
new RestHandlerException(
String.format(
"Failed to extract '%s' as URLs. Provided value: %s",
PipelineOptions.CLASSPATHS.key(),
configuration.get(PipelineOptions.CLASSPATHS)),
HttpResponseStatus.BAD_REQUEST));
}
}

/** Parse program arguments in jar run or plan request. */
private static <R extends JarRequestBody, M extends MessageParameters>
List<String> getProgramArgs(HandlerRequest<R> request, Logger log)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.flink.runtime.webmonitor.handlers.utils;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.webmonitor.handlers.JarPlanRequestBody;
Expand All @@ -31,18 +34,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.net.URL;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link JarHandlerUtils}. */
class JarHandlerUtilsTest {

private static final Logger LOG = LoggerFactory.getLogger(JarHandlerUtilsTest.class);

@TempDir private Path tempDir;

@Test
void testTokenizeNonQuoted() {
final List<String> arguments = JarHandlerUtils.tokenizeArguments("--foo bar");
Expand All @@ -65,17 +77,14 @@ void testTokenizeDoubleQuoted() {
}

@Test
void testFromRequestDefaults(@TempDir Path tmp) throws RestHandlerException {
final JarRunMessageParameters parameters =
JarRunHeaders.getInstance().getUnresolvedMessageParameters();

parameters.jarIdPathParameter.resolve("someJar");
void testFromRequestDefaults() throws Exception {
final JarRunMessageParameters parameters = getDummyMessageParameters();

final HandlerRequest<JarPlanRequestBody> request =
HandlerRequest.create(new JarPlanRequestBody(), parameters);

final JarHandlerUtils.JarHandlerContext jarHandlerContext =
JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
assertThat(jarHandlerContext.getEntryClass()).isNull();
assertThat(jarHandlerContext.getProgramArgs()).isEmpty();
assertThat(jarHandlerContext.getParallelism())
Expand All @@ -84,25 +93,12 @@ void testFromRequestDefaults(@TempDir Path tmp) throws RestHandlerException {
}

@Test
void testFromRequestRequestBody(@TempDir Path tmp) throws RestHandlerException {
final JarRunMessageParameters parameters =
JarRunHeaders.getInstance().getUnresolvedMessageParameters();

parameters.jarIdPathParameter.resolve("someJar");

final JarPlanRequestBody requestBody =
new JarPlanRequestBody(
"entry-class",
null,
Arrays.asList("arg1", "arg2"),
37,
JobID.generate(),
null);
final HandlerRequest<JarPlanRequestBody> request =
HandlerRequest.create(requestBody, parameters);
void testFromRequestRequestBody() throws Exception {
final JarPlanRequestBody requestBody = getDummyJarPlanRequestBody("entry-class", 37, null);
final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);

final JarHandlerUtils.JarHandlerContext jarHandlerContext =
JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
assertThat(jarHandlerContext.getEntryClass()).isEqualTo(requestBody.getEntryClassName());
assertThat(jarHandlerContext.getProgramArgs())
.containsExactlyElementsOf(requestBody.getProgramArgumentsList());
Expand All @@ -111,28 +107,91 @@ void testFromRequestRequestBody(@TempDir Path tmp) throws RestHandlerException {
}

@Test
void testFromRequestWithParallelismConfig(@TempDir Path tmp) throws RestHandlerException {
void testFromRequestWithParallelismConfig() throws Exception {
final int parallelism = 37;
final JarRunMessageParameters parameters =
JarRunHeaders.getInstance().getUnresolvedMessageParameters();

parameters.jarIdPathParameter.resolve("someJar");

final JarPlanRequestBody requestBody =
new JarPlanRequestBody(
getDummyJarPlanRequestBody(
"entry-class",
null,
Arrays.asList("arg1", "arg2"),
null,
JobID.generate(),
Collections.singletonMap(
CoreOptions.DEFAULT_PARALLELISM.key(),
String.valueOf(parallelism)));
final HandlerRequest<JarPlanRequestBody> request =
HandlerRequest.create(requestBody, parameters);
final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);

final JarHandlerUtils.JarHandlerContext jarHandlerContext =
JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
assertThat(jarHandlerContext.getParallelism()).isEqualTo(parallelism);
}

@Test
void testClasspathsConfigNotErased() throws Exception {
final JarPlanRequestBody requestBody =
getDummyJarPlanRequestBody(
null,
null,
Collections.singletonMap(
PipelineOptions.CLASSPATHS.key(),
"file:/tmp/some.jar;file:/tmp/another.jar"));

final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);

final JarHandlerUtils.JarHandlerContext jarHandlerContext =
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);

final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration();
final PackagedProgram.Builder builder =
jarHandlerContext.initPackagedProgramBuilder(originalConfig);
final List<String> retrievedClasspaths =
builder.getUserClassPaths().stream()
.map(URL::toString)
.collect(Collectors.toList());

assertThat(retrievedClasspaths).isEqualTo(originalConfig.get(PipelineOptions.CLASSPATHS));
}

@Test
void testMalformedClasspathsConfig() throws Exception {
final JarPlanRequestBody requestBody =
getDummyJarPlanRequestBody(
null,
null,
Collections.singletonMap(
PipelineOptions.CLASSPATHS.key(), "invalid|:/jar"));
final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);

final JarHandlerUtils.JarHandlerContext jarHandlerContext =
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);

final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration();

assertThatThrownBy(() -> jarHandlerContext.initPackagedProgramBuilder(originalConfig))
.satisfies(anyCauseMatches(RestHandlerException.class, "invalid|:/jar"));
}

private HandlerRequest<JarPlanRequestBody> getDummyRequest(
@Nullable JarPlanRequestBody requestBody) {
return HandlerRequest.create(
requestBody == null ? new JarPlanRequestBody() : requestBody,
getDummyMessageParameters());
}

private JarRunMessageParameters getDummyMessageParameters() {
final JarRunMessageParameters parameters =
JarRunHeaders.getInstance().getUnresolvedMessageParameters();

parameters.jarIdPathParameter.resolve("someJar");

return parameters;
}

private JarPlanRequestBody getDummyJarPlanRequestBody(
String entryClass, Integer parallelism, Map<String, String> flinkConfiguration) {
return new JarPlanRequestBody(
entryClass,
null,
Arrays.asList("arg1", "arg2"),
parallelism,
JobID.generate(),
flinkConfiguration);
}
}

0 comments on commit d0ce534

Please sign in to comment.