[SPARK-17625][SQL] set expectedOutputAttributes when converting SimpleCatalogRelation to LogicalRelation

## What changes were proposed in this pull request?

We should set expectedOutputAttributes when converting a SimpleCatalogRelation to a LogicalRelation; otherwise the output attributes of the LogicalRelation differ from those of the SimpleCatalogRelation because they carry different exprIds.
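
As background, here is a minimal sketch (not part of this patch) of why differing exprIds matter: Catalyst identifies attributes by exprId rather than by name, so two attributes that print identically can still be distinct, and a plan built against one set of attributes will not bind to the other. The variable names below are illustrative, assuming Spark 2.x's catalyst module on the classpath.

```scala
// Sketch only, assuming spark-catalyst 2.x; not code from this PR.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// Each construction gets a fresh exprId, just as building a new LogicalRelation
// without expectedOutputAttributes generates fresh output attributes.
val fromCatalogRelation = AttributeReference("i", LongType)()
val fromLogicalRelation = AttributeReference("i", LongType)()

// Same name and type, but different exprIds, so Catalyst treats them as
// different attributes; a plan referencing one cannot resolve against the other.
assert(fromCatalogRelation.name == fromLogicalRelation.name)
assert(fromCatalogRelation.exprId != fromLogicalRelation.exprId)
assert(fromCatalogRelation != fromLogicalRelation)
```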

## How was this patch tested?

add a test case

Author: Zhenhua Wang <[email protected]>

Closes apache#15182 from wzhfy/expectedAttributes.
wzhfy authored and cloud-fan committed Sep 22, 2016
1 parent 3a80f92 commit de7df7d
Showing 2 changed files with 20 additions and 4 deletions.
@@ -197,7 +197,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
  * source information.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
+  private def readDataSourceTable(
+      sparkSession: SparkSession,
+      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
+    val table = simpleCatalogRelation.catalogTable
     val dataSource =
       DataSource(
         sparkSession,
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
     LogicalRelation(
       dataSource.resolveRelation(),
+      expectedOutputAttributes = Some(simpleCatalogRelation.output),
       catalogTable = Some(table))
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s.metadata))
+      i.copy(table = readDataSourceTable(sparkSession, s))
 
     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s.metadata)
+      readDataSourceTable(sparkSession, s)
   }
 }

@@ -26,7 +26,8 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, Union}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
@@ -1585,4 +1586,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val d = sampleDf.withColumn("c", monotonically_increasing_id).select($"c").collect
     assert(d.size == d.distinct.size)
   }
+
+  test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
+    val tableName = "tbl"
+    withTable(tableName) {
+      spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
+      val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
+      val expr = relation.resolve("i")
+      val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
+      qe.assertAnalyzed()
+    }
+  }
 }
