[SPARK-16356][ML] Add testImplicits for ML unit tests and promote toDF()
## What changes were proposed in this pull request?

This was suggested in apache@101663f#commitcomment-17114968.

This PR adds `testImplicits` to `MLlibTestSparkContext` so that implicits such as `toDF()` can be used across ML tests.
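The diff for `MLlibTestSparkContext` itself is collapsed in this view, but the helper follows the pattern Spark SQL's test harness (`SharedSQLContext`) already uses: a `testImplicits` object extending `SQLImplicits`, wired lazily to the suite's active session. A minimal sketch, assuming that pattern; only the `testImplicits` name is confirmed by the hunks below, the rest of the trait body is illustrative:

```scala
import org.scalatest.Suite

import org.apache.spark.sql.{SparkSession, SQLContext, SQLImplicits}

trait MLlibTestSparkContext { self: Suite =>

  // The session is created in beforeAll(); sketched here for context.
  @transient var spark: SparkSession = _

  // Assumed shape (SharedSQLContext pattern): implicit conversions such as
  // toDF()/toDS() resolve the SQLContext lazily, so they stay valid even
  // though `spark` is only assigned once the suite starts.
  protected object testImplicits extends SQLImplicits {
    protected override def _sqlContext: SQLContext = self.spark.sqlContext
  }
}
```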

This PR also changes all usages of `spark.createDataFrame( ... )` to `toDF()` where applicable in the Scala ML tests.
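For example, the pattern (taken from the `PipelineSuite` hunk below) is:

```scala
// Before: build the DataFrame explicitly through the active session.
val df = spark.createDataFrame(Seq(
  (1, Vectors.dense(0.0, 1.0, 4.0), 1.0),
  (2, Vectors.dense(1.0, 0.0, 4.0), 2.0)
)).toDF("id", "features", "label")

// After: with `import testImplicits._` in scope, the local Seq converts directly.
val df = Seq(
  (1, Vectors.dense(0.0, 1.0, 4.0), 1.0),
  (2, Vectors.dense(1.0, 0.0, 4.0), 2.0)
).toDF("id", "features", "label")
```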

## How was this patch tested?

Existing tests should work.

Author: hyukjinkwon <[email protected]>

Closes apache#14035 from HyukjinKwon/minor-ml-test.
HyukjinKwon authored and yanboliang committed Sep 26, 2016
1 parent 50b89d0 commit f234b7c
Showing 45 changed files with 462 additions and 460 deletions.
13 changes: 7 additions & 6 deletions mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
@@ -36,6 +36,8 @@ import org.apache.spark.sql.types.StructType
 
 class PipelineSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   abstract class MyModel extends Model[MyModel]
 
   test("pipeline") {
@@ -183,12 +185,11 @@ class PipelineSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
   }
 
   test("pipeline validateParams") {
-    val df = spark.createDataFrame(
-      Seq(
-        (1, Vectors.dense(0.0, 1.0, 4.0), 1.0),
-        (2, Vectors.dense(1.0, 0.0, 4.0), 2.0),
-        (3, Vectors.dense(1.0, 0.0, 5.0), 3.0),
-        (4, Vectors.dense(0.0, 0.0, 5.0), 4.0))
+    val df = Seq(
+      (1, Vectors.dense(0.0, 1.0, 4.0), 1.0),
+      (2, Vectors.dense(1.0, 0.0, 4.0), 2.0),
+      (3, Vectors.dense(1.0, 0.0, 5.0), 3.0),
+      (4, Vectors.dense(0.0, 0.0, 5.0), 4.0)
     ).toDF("id", "features", "label")
 
     intercept[IllegalArgumentException] {
mllib/src/test/scala/org/apache/spark/ml/classification/ClassifierSuite.scala
@@ -29,12 +29,13 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 
 class ClassifierSuite extends SparkFunSuite with MLlibTestSparkContext {
 
-  test("extractLabeledPoints") {
-    def getTestData(labels: Seq[Double]): DataFrame = {
-      val data = labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }
-      spark.createDataFrame(data)
-    }
-
+  import testImplicits._
+
+  private def getTestData(labels: Seq[Double]): DataFrame = {
+    labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }.toDF()
+  }
+
+  test("extractLabeledPoints") {
     val c = new MockClassifier
     // Valid dataset
     val df0 = getTestData(Seq(0.0, 2.0, 1.0, 5.0))
@@ -70,11 +71,6 @@ class ClassifierSuite extends SparkFunSuite with MLlibTestSparkContext {
   }
 
   test("getNumClasses") {
-    def getTestData(labels: Seq[Double]): DataFrame = {
-      val data = labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }
-      spark.createDataFrame(data)
-    }
-
     val c = new MockClassifier
     // Valid dataset
     val df0 = getTestData(Seq(0.0, 2.0, 1.0, 5.0))
mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
@@ -34,6 +34,7 @@ class DecisionTreeClassifierSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
   import DecisionTreeClassifierSuite.compareAPIs
+  import testImplicits._
 
   private var categoricalDataPointsRDD: RDD[LabeledPoint] = _
   private var orderedLabeledPointsWithLabel0RDD: RDD[LabeledPoint] = _
@@ -345,7 +346,7 @@ class DecisionTreeClassifierSuite
   }
 
   test("Fitting without numClasses in metadata") {
-    val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc))
+    val df: DataFrame = TreeTests.featureImportanceData(sc).toDF()
     val dt = new DecisionTreeClassifier().setMaxDepth(1)
     dt.fit(df)
   }
mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.util.Utils
 class GBTClassifierSuite extends SparkFunSuite with MLlibTestSparkContext
   with DefaultReadWriteTest {
 
+  import testImplicits._
   import GBTClassifierSuite.compareAPIs
 
   // Combinations for estimators, learning rates and subsamplingRate
@@ -134,15 +135,14 @@ class GBTClassifierSuite extends SparkFunSuite with MLlibTestSparkContext
    */
 
   test("Fitting without numClasses in metadata") {
-    val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc))
+    val df: DataFrame = TreeTests.featureImportanceData(sc).toDF()
     val gbt = new GBTClassifier().setMaxDepth(1).setMaxIter(1)
     gbt.fit(df)
   }
 
   test("extractLabeledPoints with bad data") {
     def getTestData(labels: Seq[Double]): DataFrame = {
-      val data = labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }
-      spark.createDataFrame(data)
+      labels.map { label: Double => LabeledPoint(label, Vectors.dense(0.0)) }.toDF()
     }
 
     val gbt = new GBTClassifier().setMaxDepth(1).setMaxIter(1)
mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -37,6 +37,8 @@ import org.apache.spark.sql.functions.lit
 class LogisticRegressionSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var smallBinaryDataset: Dataset[_] = _
   @transient var smallMultinomialDataset: Dataset[_] = _
   @transient var binaryDataset: Dataset[_] = _
@@ -46,8 +48,7 @@ class LogisticRegressionSuite
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    smallBinaryDataset =
-      spark.createDataFrame(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42))
+    smallBinaryDataset = generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42).toDF()
 
     smallMultinomialDataset = {
       val nPoints = 100
@@ -61,7 +62,7 @@ class LogisticRegressionSuite
       val testData = generateMultinomialLogisticInput(
         coefficients, xMean, xVariance, addIntercept = true, nPoints, 42)
 
-      val df = spark.createDataFrame(sc.parallelize(testData, 4))
+      val df = sc.parallelize(testData, 4).toDF()
       df.cache()
       df
     }
@@ -76,7 +77,7 @@ class LogisticRegressionSuite
       generateMultinomialLogisticInput(coefficients, xMean, xVariance,
         addIntercept = true, nPoints, 42)
 
-      spark.createDataFrame(sc.parallelize(testData, 4))
+      sc.parallelize(testData, 4).toDF()
     }
 
     multinomialDataset = {
@@ -91,7 +92,7 @@ class LogisticRegressionSuite
       val testData = generateMultinomialLogisticInput(
         coefficients, xMean, xVariance, addIntercept = true, nPoints, 42)
 
-      val df = spark.createDataFrame(sc.parallelize(testData, 4))
+      val df = sc.parallelize(testData, 4).toDF()
       df.cache()
       df
     }
@@ -430,10 +431,10 @@ class LogisticRegressionSuite
     val model = new LogisticRegressionModel("mLogReg",
       Matrices.dense(3, 2, Array(0.0, 0.0, 0.0, 1.0, 2.0, 3.0)),
       Vectors.dense(0.0, 0.0, 0.0), 3, true)
-    val overFlowData = spark.createDataFrame(Seq(
+    val overFlowData = Seq(
      LabeledPoint(1.0, Vectors.dense(0.0, 1000.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, -1.0))
-    ))
+    ).toDF()
     val results = model.transform(overFlowData).select("rawPrediction", "probability").collect()
 
     // probabilities are correct when margins have to be adjusted
@@ -1795,9 +1796,9 @@ class LogisticRegressionSuite
     val numPoints = 40
     val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
       numClasses, numPoints)
-    val testData = spark.createDataFrame(Array.tabulate[LabeledPoint](numClasses) { i =>
+    val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
       LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
-    })
+    }.toSeq.toDF()
     val lr = new LogisticRegression().setFamily("binomial").setWeightCol("weight")
     val model = lr.fit(outlierData)
     val results = model.transform(testData).select("label", "prediction").collect()
@@ -1819,9 +1820,9 @@ class LogisticRegressionSuite
     val numPoints = 40
     val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
       numClasses, numPoints)
-    val testData = spark.createDataFrame(Array.tabulate[LabeledPoint](numClasses) { i =>
+    val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
      LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
-    })
+    }.toSeq.toDF()
     val mlr = new LogisticRegression().setFamily("multinomial").setWeightCol("weight")
     val model = mlr.fit(outlierData)
     val results = model.transform(testData).select("label", "prediction").collect()
@@ -1945,11 +1946,10 @@ class LogisticRegressionSuite
   }
 
   test("multiclass logistic regression with all labels the same") {
-    val constantData = spark.createDataFrame(Seq(
+    val constantData = Seq(
       LabeledPoint(4.0, Vectors.dense(0.0)),
       LabeledPoint(4.0, Vectors.dense(1.0)),
-      LabeledPoint(4.0, Vectors.dense(2.0)))
-    )
+      LabeledPoint(4.0, Vectors.dense(2.0))).toDF()
     val mlr = new LogisticRegression().setFamily("multinomial")
     val model = mlr.fit(constantData)
     val results = model.transform(constantData)
@@ -1961,11 +1961,10 @@ class LogisticRegressionSuite
     }
 
     // force the model to be trained with only one class
-    val constantZeroData = spark.createDataFrame(Seq(
+    val constantZeroData = Seq(
       LabeledPoint(0.0, Vectors.dense(0.0)),
       LabeledPoint(0.0, Vectors.dense(1.0)),
-      LabeledPoint(0.0, Vectors.dense(2.0)))
-    )
+      LabeledPoint(0.0, Vectors.dense(2.0))).toDF()
     val modelZeroLabel = mlr.setFitIntercept(false).fit(constantZeroData)
     val resultsZero = modelZeroLabel.transform(constantZeroData)
     resultsZero.select("rawPrediction", "probability", "prediction").collect().foreach {
@@ -1990,20 +1989,18 @@ class LogisticRegressionSuite
   }
 
   test("compressed storage") {
-    val moreClassesThanFeatures = spark.createDataFrame(Seq(
+    val moreClassesThanFeatures = Seq(
       LabeledPoint(4.0, Vectors.dense(0.0, 0.0, 0.0)),
       LabeledPoint(4.0, Vectors.dense(1.0, 1.0, 1.0)),
-      LabeledPoint(4.0, Vectors.dense(2.0, 2.0, 2.0)))
-    )
+      LabeledPoint(4.0, Vectors.dense(2.0, 2.0, 2.0))).toDF()
     val mlr = new LogisticRegression().setFamily("multinomial")
     val model = mlr.fit(moreClassesThanFeatures)
     assert(model.coefficientMatrix.isInstanceOf[SparseMatrix])
     assert(model.coefficientMatrix.asInstanceOf[SparseMatrix].colPtrs.length === 4)
-    val moreFeaturesThanClasses = spark.createDataFrame(Seq(
+    val moreFeaturesThanClasses = Seq(
       LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 0.0)),
       LabeledPoint(1.0, Vectors.dense(1.0, 1.0, 1.0)),
-      LabeledPoint(1.0, Vectors.dense(2.0, 2.0, 2.0)))
-    )
+      LabeledPoint(1.0, Vectors.dense(2.0, 2.0, 2.0))).toDF()
     val model2 = mlr.fit(moreFeaturesThanClasses)
     assert(model2.coefficientMatrix.isInstanceOf[SparseMatrix])
     assert(model2.coefficientMatrix.asInstanceOf[SparseMatrix].colPtrs.length === 3)
mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
@@ -33,16 +33,18 @@ import org.apache.spark.sql.{Dataset, Row}
 class MultilayerPerceptronClassifierSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var dataset: Dataset[_] = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    dataset = spark.createDataFrame(Seq(
-      (Vectors.dense(0.0, 0.0), 0.0),
-      (Vectors.dense(0.0, 1.0), 1.0),
-      (Vectors.dense(1.0, 0.0), 1.0),
-      (Vectors.dense(1.0, 1.0), 0.0))
+    dataset = Seq(
+      (Vectors.dense(0.0, 0.0), 0.0),
+      (Vectors.dense(0.0, 1.0), 1.0),
+      (Vectors.dense(1.0, 0.0), 1.0),
+      (Vectors.dense(1.0, 1.0), 0.0)
     ).toDF("features", "label")
   }
 
@@ -80,11 +82,11 @@ class MultilayerPerceptronClassifierSuite
   }
 
   test("Test setWeights by training restart") {
-    val dataFrame = spark.createDataFrame(Seq(
+    val dataFrame = Seq(
       (Vectors.dense(0.0, 0.0), 0.0),
       (Vectors.dense(0.0, 1.0), 1.0),
       (Vectors.dense(1.0, 0.0), 1.0),
-      (Vectors.dense(1.0, 1.0), 0.0))
+      (Vectors.dense(1.0, 1.0), 0.0)
     ).toDF("features", "label")
     val layers = Array[Int](2, 5, 2)
     val trainer = new MultilayerPerceptronClassifier()
@@ -114,9 +116,9 @@ class MultilayerPerceptronClassifierSuite
     val xMean = Array(5.843, 3.057, 3.758, 1.199)
     val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
     // the input seed is somewhat magic, to make this test pass
-    val rdd = sc.parallelize(generateMultinomialLogisticInput(
-      coefficients, xMean, xVariance, true, nPoints, 1), 2)
-    val dataFrame = spark.createDataFrame(rdd).toDF("label", "features")
+    val data = generateMultinomialLogisticInput(
+      coefficients, xMean, xVariance, true, nPoints, 1).toDS()
+    val dataFrame = data.toDF("label", "features")
     val numClasses = 3
     val numIterations = 100
     val layers = Array[Int](4, 5, 4, numClasses)
@@ -137,9 +139,9 @@ class MultilayerPerceptronClassifierSuite
       .setNumClasses(numClasses)
     lr.optimizer.setRegParam(0.0)
       .setNumIterations(numIterations)
-    val lrModel = lr.run(rdd.map(OldLabeledPoint.fromML))
+    val lrModel = lr.run(data.rdd.map(OldLabeledPoint.fromML))
     val lrPredictionAndLabels =
-      lrModel.predict(rdd.map(p => OldVectors.fromML(p.features))).zip(rdd.map(_.label))
+      lrModel.predict(data.rdd.map(p => OldVectors.fromML(p.features))).zip(data.rdd.map(_.label))
     // MLP's predictions should not differ a lot from LR's.
     val lrMetrics = new MulticlassMetrics(lrPredictionAndLabels)
     val mlpMetrics = new MulticlassMetrics(mlpPredictionAndLabels)
mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
@@ -35,6 +35,8 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
 class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var dataset: Dataset[_] = _
 
   override def beforeAll(): Unit = {
@@ -47,7 +49,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
       Array(0.10, 0.10, 0.70, 0.10) // label 2
     ).map(_.map(math.log))
 
-    dataset = spark.createDataFrame(generateNaiveBayesInput(pi, theta, 100, 42))
+    dataset = generateNaiveBayesInput(pi, theta, 100, 42).toDF()
   }
 
   def validatePrediction(predictionAndLabels: DataFrame): Unit = {
@@ -131,16 +133,16 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
     val pi = Vectors.dense(piArray)
     val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
 
-    val testDataset = spark.createDataFrame(generateNaiveBayesInput(
-      piArray, thetaArray, nPoints, 42, "multinomial"))
+    val testDataset =
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()
     val nb = new NaiveBayes().setSmoothing(1.0).setModelType("multinomial")
     val model = nb.fit(testDataset)
 
     validateModelFit(pi, theta, model)
     assert(model.hasParent)
 
-    val validationDataset = spark.createDataFrame(generateNaiveBayesInput(
-      piArray, thetaArray, nPoints, 17, "multinomial"))
+    val validationDataset =
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 17, "multinomial").toDF()
 
     val predictionAndLabels = model.transform(validationDataset).select("prediction", "label")
     validatePrediction(predictionAndLabels)
@@ -161,16 +163,16 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
     val pi = Vectors.dense(piArray)
     val theta = new DenseMatrix(3, 12, thetaArray.flatten, true)
 
-    val testDataset = spark.createDataFrame(generateNaiveBayesInput(
-      piArray, thetaArray, nPoints, 45, "bernoulli"))
+    val testDataset =
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 45, "bernoulli").toDF()
     val nb = new NaiveBayes().setSmoothing(1.0).setModelType("bernoulli")
     val model = nb.fit(testDataset)
 
     validateModelFit(pi, theta, model)
     assert(model.hasParent)
 
-    val validationDataset = spark.createDataFrame(generateNaiveBayesInput(
-      piArray, thetaArray, nPoints, 20, "bernoulli"))
+    val validationDataset =
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 20, "bernoulli").toDF()
 
     val predictionAndLabels = model.transform(validationDataset).select("prediction", "label")
     validatePrediction(predictionAndLabels)
mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -37,6 +37,8 @@ import org.apache.spark.sql.types.Metadata
 
 class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var dataset: Dataset[_] = _
   @transient var rdd: RDD[LabeledPoint] = _
 
@@ -55,7 +57,7 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
     val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
     rdd = sc.parallelize(generateMultinomialLogisticInput(
       coefficients, xMean, xVariance, true, nPoints, 42), 2)
-    dataset = spark.createDataFrame(rdd)
+    dataset = rdd.toDF()
   }
 
   test("params") {
mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
@@ -39,6 +39,7 @@ class RandomForestClassifierSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
   import RandomForestClassifierSuite.compareAPIs
+  import testImplicits._
 
   private var orderedLabeledPoints50_1000: RDD[LabeledPoint] = _
   private var orderedLabeledPoints5_20: RDD[LabeledPoint] = _
@@ -158,7 +159,7 @@ class RandomForestClassifierSuite
   }
 
   test("Fitting without numClasses in metadata") {
-    val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc))
+    val df: DataFrame = TreeTests.featureImportanceData(sc).toDF()
     val rf = new RandomForestClassifier().setMaxDepth(1).setNumTrees(1)
     rf.fit(df)
   }