This example utilizes the public Airline On-Time Statistics and Delay Cause data set. This data set records flights by date, airline, originating and destination airports, and many other flight details.
In this example, you:
- Follow Greenplum Database tutorials to load the flight record data set into Greenplum Database.
- Use spark-shell and the VMware Greenplum Connector for Apache Spark to read a fact table from Greenplum Database into Spark.
- Perform transformations and actions on the data within Spark.
- Write transformed Spark data into a new Greenplum Database table.
Prerequisites
Before starting this exercise, ensure that you are able to:
- Access your Greenplum Database and Spark clusters
- Identify your Greenplum Database master node hostname or IP address and port
- Identify your Greenplum Database user/role name and password
- Identify the absolute path to the Connector JAR file on your system
Procedure 1: Read from Greenplum Database
Perform the following procedure to load flight record data into Greenplum Database, read this data into Spark, and use Spark to transform and view the table data.
Greenplum Database Operations
Log in to your Greenplum Database master node and set up your environment. For example:
    $ ssh gpadmin@<gpmaster>
    gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
Download and unpack the flight record example data set:
    gpadmin@gpmaster$ git clone https://github.com/greenplum-db/gpdb-sandbox-tutorials.git
    gpadmin@gpmaster$ cd gpdb-sandbox-tutorials
    gpadmin@gpmaster$ tar zxf faa.tar.gz
Perform the following exercises in the Greenplum Database tutorial in the specified order:
When you complete Step 3, you will have created:
- A user named user2.
- A Greenplum database named tutorial.
- A schema named faa.
- Several Greenplum Database tables in the faa schema and loaded them with flight data.
Identify your Greenplum Database user/role name and password:
If you are performing this exercise in the Greenplum Sandbox VM, you will use the Connector as user2 (password pivotal). If you are performing this exercise in your own Greenplum Database instance, you may have a different user name.
Assign required privileges to the Greenplum Database user to access the tutorial database, faa schema, and relevant tables for the read operation. For example, if your Greenplum Database user name is user2, the Greenplum Database administrator would run the commands:
    gpadmin@gpmaster$ psql -d tutorial
    tutorial=# GRANT USAGE, CREATE ON SCHEMA faa TO user2;
    tutorial=# GRANT SELECT ON faa.otp_c TO user2;
    tutorial=# ALTER USER user2 CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
    tutorial=# \q
Verify that the flight data loaded correctly:
Connect to the tutorial database as user user2. Recall that the password for user2 is pivotal:
    gpadmin@gpmaster$ psql -d tutorial -U user2
    Password for user user2:
List all of the tables in the tutorial database schema named faa:
    tutorial=> \dt faa.*
                           List of relations
     Schema |         Name         | Type  |  Owner  |       Storage
    --------+----------------------+-------+---------+----------------------
     faa    | d_airlines           | table | user1   | heap
     faa    | d_airports           | table | user1   | heap
     faa    | d_cancellation_codes | table | user1   | heap
     faa    | d_delay_groups       | table | user1   | heap
     faa    | d_distance_groups    | table | user1   | heap
     faa    | d_wac                | table | user1   | heap
     faa    | faa_load_errors      | table | user1   | heap
     faa    | faa_otp_load         | table | gpadmin | heap
     faa    | otp_c                | table | gpadmin | append only columnar
     faa    | otp_c_1_prt_mth_1    | table | gpadmin | append only columnar
     faa    | otp_c_1_prt_mth_10   | table | gpadmin | append only columnar
     faa    | otp_c_1_prt_mth_11   | table | gpadmin | append only columnar
     ...
    (27 rows)
The \dt output lists 27 tables when the flight data is loaded correctly.
Examine the definition of the table named otp_c:
    tutorial=> \d faa.otp_c
         Append-Only Columnar Table "faa.otp_c"
            Column        |       Type       | Modifiers
    ----------------------+------------------+-----------
     flt_year             | smallint         |
     flt_quarter          | smallint         |
     flt_month            | smallint         |
     flt_dayofmonth       | smallint         |
     flt_dayofweek        | smallint         |
     flightdate           | date             |
     uniquecarrier        | text             |
     airlineid            | integer          |
     carrier              | text             |
     flightnum            | text             |
     origin               | text             |
     origincityname       | text             |
     originstate          | text             |
     originstatename      | text             |
     dest                 | text             |
     destcityname         | text             |
     deststate            | text             |
     deststatename        | text             |
     crsdeptime           | text             |
     deptime              | integer          |
     depdelay             | double precision |
     depdelayminutes      | double precision |
     departuredelaygroups | smallint         |
     taxiout              | smallint         |
     wheelsoff            | text             |
     wheelson             | text             |
     taxiin               | smallint         |
     crsarrtime           | text             |
     arrtime              | text             |
     arrdelay             | double precision |
     arrdelayminutes      | double precision |
     arrivaldelaygroups   | smallint         |
     cancelled            | smallint         |
     cancellationcode     | text             |
     diverted             | smallint         |
     crselapsedtime       | integer          |
     actualelapsedtime    | double precision |
     airtime              | double precision |
     flights              | smallint         |
     distance             | double precision |
     distancegroup        | smallint         |
     carrierdelay         | smallint         |
     weatherdelay         | smallint         |
     nasdelay             | smallint         |
     securitydelay        | smallint         |
     lateaircraftdelay    | smallint         |
    Checksum: t
    Number of child tables: 17 (Use \d+ to list them.)
    Distributed by: (uniquecarrier, flightnum)
    Partition by: (flightdate)
The table named otp_c is a column-oriented, partitioned fact table.
Spark Operations
Open a terminal window and log in to your Spark client node:
    $ ssh user@<spark-client>
    user@spark-client$
Construct the JDBC connection string URL to access Greenplum Database. For example, this JDBC connection string accesses a database named tutorial using the Greenplum Database master host gpmaster.domain at the default connection port:
    jdbc:postgresql://gpmaster.domain/tutorial
Save this URL string; you will use it in an upcoming step.
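The URL format described above can be sketched as a small helper. This is only an illustration: the function name gpdbJdbcUrl is hypothetical, and the explicit port 5432 in the second call assumes the standard PostgreSQL default port, which may differ on your cluster.

```scala
// Hypothetical helper: builds a PostgreSQL-style JDBC URL for Greenplum Database.
// When no port is given, the port is left out so the driver's default applies.
def gpdbJdbcUrl(host: String, database: String, port: Option[Int] = None): String = {
  val hostPort = port.map(p => s"$host:$p").getOrElse(host)
  s"jdbc:postgresql://$hostPort/$database"
}

// Host and database names from this example.
val defaultPortUrl = gpdbJdbcUrl("gpmaster.domain", "tutorial")
// Assumption: 5432 is used here only to show the explicit-port form.
val explicitPortUrl = gpdbJdbcUrl("gpmaster.domain", "tutorial", Some(5432))

println(defaultPortUrl)   // jdbc:postgresql://gpmaster.domain/tutorial
println(explicitPortUrl)  // jdbc:postgresql://gpmaster.domain:5432/tutorial
```

The first form matches the URL used in the rest of this example.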
Start spark-shell. Replace <gsc-jar> with the full path to your Connector JAR file:
    user@spark-client$ spark-shell --jars <gsc-jar>
    < ... spark-shell startup output messages ... >
    scala>
You enter the spark-shell interactive scala shell.
Prepare to read the otp_c table into Spark. In a text editor, construct a Map of read options for the greenplum data source. You want to load the Greenplum Database table named otp_c in the schema named faa, specifying airlineid as the partition column. For example, if you are the user user2 with password pivotal:
    val gscOptionMap = Map(
      "url" -> "jdbc:postgresql://gpmaster.domain/tutorial",
      "user" -> "user2",
      "password" -> "pivotal",
      "dbschema" -> "faa",
      "dbtable" -> "otp_c",
      "partitionColumn" -> "airlineid"
    )
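The map above embeds the password as a literal. As one possible variation, the following sketch reads the user name and password from environment variables and falls back to the sample values. The variable names GPDB_USER and GPDB_PASSWORD and the map name gscOptionMapFromEnv are hypothetical; the remaining steps of this example continue to use gscOptionMap.

```scala
// Hypothetical environment variable names; the fallbacks are this example's sample values.
val gpdbUser = sys.env.getOrElse("GPDB_USER", "user2")
val gpdbPassword = sys.env.getOrElse("GPDB_PASSWORD", "pivotal")

// Same option keys as gscOptionMap, with the credentials supplied from the environment.
val gscOptionMapFromEnv: Map[String, String] = Map(
  "url" -> "jdbc:postgresql://gpmaster.domain/tutorial",
  "user" -> gpdbUser,
  "password" -> gpdbPassword,
  "dbschema" -> "faa",
  "dbtable" -> "otp_c",
  "partitionColumn" -> "airlineid"
)

// Print the options with the password masked, to avoid echoing it to the terminal.
gscOptionMapFromEnv.foreach { case (key, value) =>
  val shown = if (key == "password") "****" else value
  println(s"$key -> $shown")
}
```

Keeping credentials out of the pasted text means they do not appear in the map definition you type or save in the text editor.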
Copy and paste the option map definition into your scala> shell terminal window. You must first enter :paste to switch to scala> paste mode:
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    val gscOptionMap = Map(
      "url" -> "jdbc:postgresql://gpmaster.domain/tutorial",
      "user" -> "user2",
      "password" -> "pivotal",
      "dbschema" -> "faa",
      "dbtable" -> "otp_c",
      "partitionColumn" -> "airlineid"
    )
Exit scala> paste mode by entering control-D:
    control-D
    // Exiting paste mode, now interpreting.
    gscOptionMap: scala.collection.immutable.Map[String,String] = Map(url -> jdbc:postgresql://gpmaster.domain/tutorial, partitionColumn -> airlineid, dbschema -> faa, dbtable -> otp_c, user -> user2, password -> pivotal)
scala displays the option map that you just constructed.
Load the data from the Greenplum Database table otp_c into a Spark DataFrame. Enter paste mode and copy/paste the command:
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    val gpdf = spark.read.format("greenplum")
      .options(gscOptionMap)
      .load()
    control-D
    // Exiting paste mode, now interpreting.
    gpdf: org.apache.spark.sql.DataFrame = [flt_year: smallint, flt_quarter: smallint ... 44 more fields]
The Greenplum Database table is not actually loaded until you perform an action on the returned DataFrame.
Print the schema of the Greenplum Database table:
    scala> gpdf.printSchema()
    root
     |-- flt_year: short (nullable = true)
     |-- flt_quarter: short (nullable = true)
     |-- flt_month: short (nullable = true)
     |-- flt_dayofmonth: short (nullable = true)
     |-- flt_dayofweek: short (nullable = true)
     |-- flightdate: date (nullable = true)
     |-- uniquecarrier: string (nullable = true)
     |-- airlineid: integer (nullable = true)
     |-- carrier: string (nullable = true)
     |-- flightnum: string (nullable = true)
     |-- origin: string (nullable = true)
     |-- origincityname: string (nullable = true)
     |-- originstate: string (nullable = true)
     |-- originstatename: string (nullable = true)
     |-- dest: string (nullable = true)
     |-- destcityname: string (nullable = true)
     |-- deststate: string (nullable = true)
     |-- deststatename: string (nullable = true)
     |-- crsdeptime: string (nullable = true)
     |-- deptime: integer (nullable = true)
     |-- depdelay: double (nullable = true)
     |-- depdelayminutes: double (nullable = true)
     |-- departuredelaygroups: short (nullable = true)
     |-- taxiout: short (nullable = true)
     |-- wheelsoff: string (nullable = true)
     |-- wheelson: string (nullable = true)
     |-- taxiin: short (nullable = true)
     |-- crsarrtime: string (nullable = true)
     |-- arrtime: string (nullable = true)
     |-- arrdelay: double (nullable = true)
     |-- arrdelayminutes: double (nullable = true)
     |-- arrivaldelaygroups: short (nullable = true)
     |-- cancelled: short (nullable = true)
     |-- cancellationcode: string (nullable = true)
     |-- diverted: short (nullable = true)
     |-- crselapsedtime: integer (nullable = true)
     |-- actualelapsedtime: double (nullable = true)
     |-- airtime: double (nullable = true)
     |-- flights: short (nullable = true)
     |-- distance: double (nullable = true)
     |-- distancegroup: short (nullable = true)
     |-- carrierdelay: short (nullable = true)
     |-- weatherdelay: short (nullable = true)
     |-- nasdelay: short (nullable = true)
     |-- securitydelay: short (nullable = true)
     |-- lateaircraftdelay: short (nullable = true)
Compare this Spark output with that of the Greenplum Database \d faa.otp_c command that you invoked earlier. Note that the Greenplum Database data type names differ from those of Spark. For example, the distancegroup column is of Greenplum Database type smallint, while the Spark data type is short. For detailed information about how the Connector maps data types from Greenplum Database to Spark, refer to the Greenplum Database to Spark Data Type Mapping documentation.
Use the .count() method to count the number of rows loaded:
    scala> gpdf.count()
    res2: Long = 1024552
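The type name differences noted above can be collected into a small lookup table. The sketch below lists only the pairs visible when comparing the \d faa.otp_c output with the printSchema() output in this example; it is not the Connector's full mapping, and the names observedTypeMapping and sampleColumns are illustrative.

```scala
// Greenplum Database type name -> Spark type name, as observed in this example's output.
// Only the types that appear in faa.otp_c are listed.
val observedTypeMapping: Map[String, String] = Map(
  "smallint" -> "short",
  "integer" -> "integer",
  "double precision" -> "double",
  "text" -> "string",
  "date" -> "date"
)

// A few columns of faa.otp_c with their Greenplum Database types.
val sampleColumns: Seq[(String, String)] = Seq(
  "distancegroup" -> "smallint",
  "airlineid" -> "integer",
  "depdelayminutes" -> "double precision",
  "origincityname" -> "text",
  "flightdate" -> "date"
)

// Print each column with its Greenplum Database type and the Spark type it appears as.
sampleColumns.foreach { case (column, gpdbType) =>
  val sparkType = observedTypeMapping(gpdbType)
  println(s"$column: $gpdbType -> $sparkType")
}
```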
Use the .select() and .filter() methods to show the origin city, month, airline ID, and carrier of all flights cancelled in the month of December. Order the results by airline ID and origin city. Enter paste mode and copy/paste the command:
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    // Cast the filter constants to smallint to enable predicate pushdown.
    // This is required because the Greenplum table cancelled and flt_month
    // columns were created with type smallint.
    gpdf.select("origincityname", "flt_month", "airlineid", "carrier")
      .filter("cancelled = CAST(1 as SMALLINT)")
      .filter("flt_month = CAST(12 as SMALLINT)")
      .orderBy("airlineid", "origincityname")
      .show()
    control-D
    // Exiting paste mode, now interpreting.
    +---------------+---------+---------+-------+
    | origincityname|flt_month|airlineid|carrier|
    +---------------+---------+---------+-------+
    |    Detroit, MI|       12|    19386|     NW|
    |    Detroit, MI|       12|    19386|     NW|
    |  Milwaukee, WI|       12|    19386|     NW|
    |Minneapolis, MN|       12|    19386|     NW|
    |    Phoenix, AZ|       12|    19386|     NW|
    |    Houston, TX|       12|    19393|     WN|
    |    Houston, TX|       12|    19393|     WN|
    |  Las Vegas, NV|       12|    19393|     WN|
    |  Las Vegas, NV|       12|    19393|     WN|
    | Manchester, NH|       12|    19393|     WN|
    |      Omaha, NE|       12|    19393|     WN|
    |    Phoenix, AZ|       12|    19393|     WN|
    |   San Jose, CA|       12|    19393|     WN|
    |      Tampa, FL|       12|    19393|     WN|
    | Washington, DC|       12|    19393|     WN|
    |  Anchorage, AK|       12|    19704|     CO|
    |  Anchorage, AK|       12|    19704|     CO|
    |     Austin, TX|       12|    19704|     CO|
    |    Houston, TX|       12|    19704|     CO|
    |    Houston, TX|       12|    19704|     CO|
    +---------------+---------+---------+-------+
    only showing top 20 rows
Use the .groupBy(), .agg(), and avg() methods to identify the average departure delay for each day of the week, sorting by the day of the week with the .sort() method:
    scala> gpdf.groupBy("flt_dayofweek").agg(avg("depdelayminutes")).sort("flt_dayofweek").show()
    +-------------+--------------------+
    |flt_dayofweek|avg(depdelayminutes)|
    +-------------+--------------------+
    |            1|  14.738491569779914|
    |            2|  11.237272024020244|
    |            3|  11.198198256252295|
    |            4|  12.056892575385985|
    |            5|  12.455024249521957|
    |            6|   12.69586361271813|
    |            7|  14.818271192603715|
    +-------------+--------------------+
Use the startsWith() method to display the cancelled flights for the month of December for all origin cities whose name starts with the letters Mi. Enter paste mode and copy/paste the command:
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    gpdf.select("origincityname", "destcityname", "flightnum", "carrier", "airlineid", "flt_month")
      .filter("cancelled = CAST(1 as SMALLINT)")
      .filter("flt_month = CAST(12 as SMALLINT)")
      .filter($"origincityname".startsWith("Mi"))
      .orderBy("origincityname", "destcityname")
      .show()
    control-D
    // Exiting paste mode, now interpreting.
    +--------------------+--------------------+---------+-------+---------+---------+
    |      origincityname|        destcityname|flightnum|carrier|airlineid|flt_month|
    +--------------------+--------------------+---------+-------+---------+---------+
    |           Miami, FL|         Chicago, IL|      846|     AA|    19805|       12|
    |           Miami, FL|Greensboro/High P...|     4197|     MQ|    20398|       12|
    |           Miami, FL|      Washington, DC|     1068|     AA|    19805|       12|
    |       Milwaukee, WI|       Baltimore, MD|      817|     FL|    20437|       12|
    |       Milwaukee, WI|          Denver, CO|     5838|     OO|    20304|       12|
    |       Milwaukee, WI|         Memphis, TN|     3799|     9E|    20363|       12|
    |       Milwaukee, WI|     Minneapolis, MN|     7177|     NW|    19386|       12|
    |       Milwaukee, WI|          Newark, NJ|     2504|     OO|    20304|       12|
    |     Minneapolis, MN|         Atlanta, GA|     1073|     DL|    19790|       12|
    |     Minneapolis, MN|     Grand Forks, ND|     4003|     9E|    20363|       12|
    |     Minneapolis, MN|         Houston, TX|     2399|     XE|    20374|       12|
    |     Minneapolis, MN|         Lincoln, NE|     3798|     9E|    20363|       12|
    |     Minneapolis, MN|           Miami, FL|      925|     AA|    19805|       12|
    |     Minneapolis, MN|       Milwaukee, WI|     7165|     NW|    19386|       12|
    |     Minneapolis, MN|      Rapid City, SD|     3931|     9E|    20363|       12|
    |Mission/Mcallen/E...|         Memphis, TN|     4066|     9E|    20363|       12|
    |        Missoula, MT|  Salt Lake City, UT|     4461|     OO|    20304|       12|
    +--------------------+--------------------+---------+-------+---------+---------+
Notice the $"origincityname" notation used in the third filter. $"<colname>" is an implicit method call shortcut for new ColumnName("<colname>").
Exit the spark-shell:
    scala> :q
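The $"<colname>" shortcut noted above relies on Scala's support for custom string interpolators. The following toy sketch shows the general mechanism only; ToyColumnName and ToyStringToColumn are hypothetical stand-ins and not Spark's classes. Run it in a plain Scala REPL rather than in spark-shell, where Spark's own $ interpolator is already in scope and would conflict with this one.

```scala
object DollarSketch {
  // Hypothetical stand-in for Spark's ColumnName; it only records the column name.
  final case class ToyColumnName(name: String)

  // Adds a `$` method to StringContext, so that $"..." compiles to a call to it.
  implicit class ToyStringToColumn(val sc: StringContext) {
    def $(args: Any*): ToyColumnName = ToyColumnName(sc.s(args: _*))
  }
}
import DollarSketch._

// The interpolated form and the explicit constructor call produce equal values.
val viaShortcut = $"origincityname"
val viaConstructor = ToyColumnName("origincityname")
println(viaShortcut == viaConstructor)  // true
```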
Procedure 2: Write from Spark to Greenplum Database
Perform the following procedure to write Spark data that you transformed in the previous procedure into a new Greenplum Database table.
Note: This procedure assumes that you have completed Procedure 1 of this example and have retained the example runtime environment.
Greenplum Database Operations
Locate your Greenplum Database terminal window.
Assign the Greenplum privileges required to write to a Greenplum Database table. For example, if your Greenplum Database user name is user2, the Greenplum Database administrator would run the commands:
    gpadmin@gpmaster$ psql -d tutorial
    tutorial=# ALTER USER user2 CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');
You will return to the Greenplum terminal window at the end of this procedure.
Spark Operations
Identify the average departure delay for each day of the week with the statement you specified in Procedure 1, this time saving, rather than displaying, the DataFrame. Assign the data to a variable named delaydf:
    scala> val delaydf = gpdf.groupBy("flt_dayofweek").agg(avg("depdelayminutes")).sort("flt_dayofweek")
    delaydf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [flt_dayofweek: smallint, avg(depdelayminutes): double]
Construct an option map to save delaydf to a Greenplum Database table named avgdelay. Enter paste mode and copy/paste the command:
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    val gscWriteMap = Map(
      "url" -> "jdbc:postgresql://gpmaster.domain/tutorial",
      "user" -> "user2",
      "password" -> "pivotal",
      "dbschema" -> "faa",
      "dbtable" -> "avgdelay"
    )
    control-D
    // Exiting paste mode, now interpreting.
    gscWriteMap: scala.collection.immutable.Map[String,String] = Map(url -> jdbc:postgresql://gpmaster.domain/tutorial, dbschema -> faa, dbtable -> avgdelay, user -> user2, password -> pivotal)
Write delaydf to the Greenplum Database table named avgdelay. If the table does not exist, the Connector will create the table for you before loading the data. The default SaveMode is ErrorIfExists; in this mode, the Connector returns an error if the table already exists.
    scala> import org.apache.spark.sql.SaveMode
    scala> delaydf.write.format("greenplum").options(gscWriteMap).save()
Exit the spark-shell:
    scala> :q
Greenplum Database Operations
Run psql and connect to Greenplum Database as user user2:
    gpadmin@gpmaster$ psql -d tutorial -U user2
Examine the schema of Greenplum Database table avgdelay:
    tutorial=> \d+ faa.avgdelay
                            Table "faa.avgdelay"
            Column        |       Type       | Modifiers | Storage | Description
    ----------------------+------------------+-----------+---------+-------------
     flt_dayofweek        | smallint         |           | plain   |
     avg(depdelayminutes) | double precision |           | plain   |
    Has OIDs: no
    Distributed by: (flt_dayofweek)
Examine the table contents:
    tutorial=> SELECT * FROM faa.avgdelay ORDER BY flt_dayofweek;
     flt_dayofweek | avg(depdelayminutes)
    ---------------+----------------------
                 1 |     14.7384915697799
                 2 |     11.2372720240202
                 3 |     11.1981982562523
                 4 |      12.056892575386
                 5 |      12.455024249522
                 6 |     12.6958636127181
                 7 |     14.8182711926037
    (7 rows)
The table contents differ slightly from those displayed for the DataFrame in spark-shell. The Greenplum Database double precision data type holds 15 digits, while Spark utilizes 17 digits.
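The relationship between the two sets of values can be checked with a short sketch. It assumes that the difference amounts to rounding each value to 15 significant digits and dropping trailing zeros, which reproduces the psql output shown in this example; the helper name to15Digits is hypothetical.

```scala
import java.math.{BigDecimal => JBigDecimal, MathContext, RoundingMode}

// Round a decimal string to 15 significant digits and drop trailing zeros.
def to15Digits(value: String): String =
  new JBigDecimal(value)
    .round(new MathContext(15, RoundingMode.HALF_EVEN))
    .stripTrailingZeros()
    .toPlainString()

// Averages as displayed by spark-shell earlier in this example.
val sparkValues = Seq(
  "14.738491569779914",
  "12.056892575385985",
  "12.69586361271813"
)

// Print each Spark value next to its 15-digit form.
sparkValues.foreach(v => println(s"$v -> ${to15Digits(v)}"))
```

For these three inputs, the rounded forms are 14.7384915697799, 12.056892575386, and 12.6958636127181, matching rows 1, 4, and 6 of the faa.avgdelay query output.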