maxliu/spark_postgre
Query PostgreSQL from Spark (2.1.1)

The purpose of this blog (including source code) is to demonstrate a reliable way to connect to a PostgreSQL server from Spark 2.1.1.

Preparation to use PostgreSQL in Spark.

1) Copy the driver jar file into the "lib" folder so that the workers can use the driver.

In this case, the file is "postgresql-42.1.1.jar":
wget https://jdbc.postgresql.org/download/postgresql-42.1.1.jar

2) Add the following to the Scala code.

val driver = "org.postgresql.Driver"
Class.forName(driver)
connectionProperties.put("driver", driver)
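These lines assume a jdbc_url string and a connectionProperties object already exist. A minimal sketch of how they might be set up — the host, port, database name, and credentials below are placeholders, not values from this repo:

```scala
import java.util.Properties

// Hypothetical connection details -- replace with your own server's.
val jdbc_url = "jdbc:postgresql://localhost:5432/testdb"

val driver = "org.postgresql.Driver"
Class.forName(driver)

val connectionProperties = new Properties()
connectionProperties.put("user", "postgres")   // placeholder
connectionProperties.put("password", "secret") // placeholder
connectionProperties.put("driver", driver)
```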

Data for testing.

The following is a "popular" SQL interview question :) Given an employee table with salaries and departments, find the three highest salaries in each department.

The employee table in the PostgreSQL database.

The schema

create table employee (
    id int,
    name char(50),
    salary int,
    department char(50)
    );
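The sample rows can be inserted with plain SQL, or loaded from Spark itself. A sketch of the latter, assuming the jdbc_url and connectionProperties from the setup step:

```scala
import spark.implicits._            // needed for .toDF on a Seq of tuples
import org.apache.spark.sql.SaveMode

// The six sample rows shown below, appended to the existing employee table.
val sample = Seq(
  (1, "Joe",   70000, "IT"),
  (2, "Henry", 80000, "Sales"),
  (3, "Sam",   60000, "Sales"),
  (4, "Max",   90000, "IT"),
  (5, "Janet", 69000, "IT"),
  (6, "Randy", 85000, "IT")
).toDF("id", "name", "salary", "department")

sample.write.mode(SaveMode.Append).jdbc(jdbc_url, "employee", connectionProperties)
```

Note that the columns are declared char(50) in the schema, which is why the values appear right-padded ("Joe              ...") in the output tables below.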

The data

+---+--------------------+------+--------------------+
| id|                name|salary|          department|
+---+--------------------+------+--------------------+
|  1|Joe              ...| 70000|IT               ...|
|  2|Henry            ...| 80000|Sales            ...|
|  3|Sam              ...| 60000|Sales            ...|
|  4|Max              ...| 90000|IT               ...|
|  5|Janet            ...| 69000|IT               ...|
|  6|Randy            ...| 85000|IT               ...|
+---+--------------------+------+--------------------+

Desired result.

+--------------------+--------------------+------+
|          department|                name|salary|
+--------------------+--------------------+------+
|IT               ...|Max              ...| 90000|
|IT               ...|Randy            ...| 85000|
|IT               ...|Joe              ...| 70000|
|Sales            ...|Henry            ...| 80000|
|Sales            ...|Sam              ...| 60000|
+--------------------+--------------------+------+

Query the data with Spark.

1) Read table to a dataframe.

val employees_table = spark.read.jdbc(jdbc_url, "employee", connectionProperties).cache()
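For a larger table, the same read can be split across executors with the partitioned overload of jdbc. The partition column and bounds below are illustrative assumptions, not values from this repo:

```scala
// Partitioned read: Spark issues one JDBC query per partition,
// splitting the "id" range [1, 1000000] into 4 slices.
val employees_partitioned = spark.read.jdbc(
  jdbc_url,
  "employee",
  "id",        // numeric column to partition on (assumed)
  1L,          // lowerBound (assumed)
  1000000L,    // upperBound (assumed)
  4,           // numPartitions
  connectionProperties
).cache()
```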

2) Register it as a global temporary view.

employees_table.createGlobalTempView("employee")

3) Run the query using a window function.

spark.sql("""
    select department, name, salary
    from (
         select department, name, salary,
                dense_rank() over (partition by department order by salary desc) as salary_rank
         from global_temp.employee
         ) t
    where salary_rank <= 3
    order by department, salary desc
  """).show()
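The same result can also be obtained without SQL, through the DataFrame window API — a sketch reusing employees_table from step 1:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

// Rank salaries within each department, highest first.
val byDept = Window.partitionBy("department").orderBy(col("salary").desc)

employees_table
  .withColumn("salary_rank", dense_rank().over(byDept))
  .where(col("salary_rank") <= 3)
  .select("department", "name", "salary")
  .orderBy(col("department"), col("salary").desc)
  .show()
```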

Query on the PostgreSQL server.

1) Define the query.

val query_str = """
    (select e.department, name, e.salary
    from employee e
    where e.salary in 
          (
            select distinct salary as salary_d
            from employee 
            where department=e.department
            order by salary_d desc
            limit 3
          ) 
    order by e.department, e.salary desc) as e_q
    """

2) Send the query to the PostgreSQL server and get back a Spark dataframe.

spark.read.jdbc(jdbc_url, query_str, connectionProperties)

Please note that the query above (query_str) will not work in Spark SQL in Spark 2.1.1, because Spark SQL does not allow a subquery to access variables from the outer query.
