[SPARK-17650] Malformed URLs throw exceptions before bricking Executors
## What changes were proposed in this pull request?

When a malformed URL is sent to executors through `sc.addJar` or `sc.addFile`, the executors become unusable: they repeatedly throw `MalformedURLException` and can never acknowledge that the file or jar is simply bad input.

This PR fixes the problem by making sure malformed URLs can never be submitted through `sc.addJar` or `sc.addFile`: the URI is validated on the driver and rejected with a `MalformedURLException` before it is added to the list of dependencies. An alternative would be to blacklist bad files and jars on the executors, e.g. fail on the first attempt and then ignore subsequent attempts while printing a warning.
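For illustration only (the `MalformedUrlSketch` object below is hypothetical and not part of this diff), a minimal Scala sketch of the failure mode the new validation catches: a string such as `http://user:pwd/path` (the same shape used in the new test) parses as a `java.net.URI`, but converting it to a `java.net.URL` throws because `pwd` is not a valid port. With this change the error surfaces once, on the driver, inside `sc.addFile`/`sc.addJar`, instead of repeatedly on every executor.

```scala
import java.net.{MalformedURLException, URI}

object MalformedUrlSketch {
  def main(args: Array[String]): Unit = {
    // Parses as a URI: the authority "user:pwd" is accepted during URI parsing.
    val uri = new URI("http://user:pwd/path")

    try {
      // Fails: "pwd" cannot be parsed as a port number when building a URL.
      uri.toURL
    } catch {
      case e: MalformedURLException =>
        // This is the exception that executors would otherwise keep hitting.
        println(s"Rejected on the driver: ${e.getMessage}")
    }
  }
}
```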

## How was this patch tested?

Unit tests in `SparkContextSuite`.

Author: Burak Yavuz <[email protected]>

Closes apache#15224 from brkyvz/SPARK-17650.
brkyvz authored and zsxwing committed Sep 26, 2016
1 parent de333d1 commit 59d87d2
Showing 3 changed files with 51 additions and 7 deletions.
core/src/main/scala/org/apache/spark/SparkContext.scala: 16 changes (9 additions, 7 deletions)
@@ -19,7 +19,7 @@ package org.apache.spark

import java.io._
import java.lang.reflect.Constructor
-import java.net.URI
+import java.net.{MalformedURLException, URI}
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -36,18 +36,15 @@ import com.google.common.collect.MapMaker
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-  TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat,
-  WholeTextFileInputFormat}
+import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
@@ -1452,6 +1449,9 @@ class SparkContext(config: SparkConf) extends Logging {
throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
"turned on.")
}
} else {
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
Utils.validateURL(uri)
}

val key = if (!isLocal && scheme == "file") {
@@ -1711,6 +1711,8 @@ class SparkContext(config: SparkConf) extends Logging {
        key = env.rpcEnv.fileServer.addJar(new File(path))
      } else {
        val uri = new URI(path)
        // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
        Utils.validateURL(uri)
        key = uri.getScheme match {
          // A JAR file which exists only on the driver node
          case null | "file" =>
core/src/main/scala/org/apache/spark/util/Utils.scala: 20 changes (20 additions, 0 deletions)
@@ -697,6 +697,26 @@ private[spark] object Utils extends Logging {
}
}

  /**
   * Validate that a given URI is actually a valid URL as well.
   * @param uri The URI to validate
   */
  @throws[MalformedURLException]("when the URI is an invalid URL")
  def validateURL(uri: URI): Unit = {
    Option(uri.getScheme).getOrElse("file") match {
      case "http" | "https" | "ftp" =>
        try {
          uri.toURL
        } catch {
          case e: MalformedURLException =>
            val ex = new MalformedURLException(s"URI (${uri.toString}) is not a valid URL.")
            ex.initCause(e)
            throw ex
        }
      case _ => // will not be turned into a URL anyway
    }
  }

  /**
   * Get the path of a temporary directory. Spark's local directories can be configured through
   * multiple settings, which are used with the following precedence:
core/src/test/scala/org/apache/spark/SparkContextSuite.scala: 22 changes (22 additions, 0 deletions)
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io.File
import java.net.MalformedURLException
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

@@ -173,6 +174,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("SPARK-17650: malformed url's throw exceptions before bricking Executors") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
Seq("http", "https", "ftp").foreach { scheme =>
val badURL = s"$scheme://user:pwd/path"
val e1 = intercept[MalformedURLException] {
sc.addFile(badURL)
}
assert(e1.getMessage.contains(badURL))
val e2 = intercept[MalformedURLException] {
sc.addJar(badURL)
}
assert(e2.getMessage.contains(badURL))
assert(sc.addedFiles.isEmpty)
assert(sc.addedJars.isEmpty)
}
} finally {
sc.stop()
}
}

test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)