Using Apache Spark DataFrames for Processing of Tabular Data

This post will help you get started using Apache Spark DataFrames with Scala on the MapR Sandbox. The new Spark DataFrames API is designed to make big data processing on tabular data easier.

What is a Spark DataFrame?

A Spark DataFrame is a distributed collection of data organized into named columns that provides operations to filter, group, or compute aggregates, and can be used with Spark SQL. DataFrames can be constructed from structured data files, existing RDDs, tables in Hive, or external databases.

In this post, you’ll learn how to:

  • Load data into Spark DataFrames
  • Explore data with Spark SQL

This post assumes a basic understanding of Spark concepts. If you have not already read the tutorial on Getting Started with Spark on MapR Sandbox, it would be good to read that first.


This tutorial will run on the MapR v5.0 Sandbox, which includes Spark 1.3.

The sample data sets

We will use two example datasets - one from eBay online auctions and one from the SFPD Crime Incident Reporting system.

The eBay online auction dataset has the following data fields:

auctionid - unique identifier of an auction
bid - the proxy bid placed by a bidder
bidtime - the time (in days) that the bid was placed, from the start of the auction
bidder - eBay username of the bidder
bidderrate - eBay feedback rating of the bidder
openbid - the opening bid set by the seller
price - the closing price that the item sold for (equivalent to the second highest bid + an increment)

The table below shows the data fields with some sample data:

Using Spark DataFrames we will explore the data with questions like:

  • How many auctions were held?
  • How many bids were made per item?
  • What's the minimum, maximum, and average number of bids per item?
  • Show the bids with price > 100

The table below shows the SFPD data fields with some sample data:

Using Spark DataFrames, we will explore the SFPD data with questions like:

  • What are the top 10 Resolutions?
  • How many Categories are there?
  • What are the top 10 incident Categories?

Loading data into Spark DataFrames

Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data files to your sandbox home directory /user/user01 using scp. Start the spark shell with:
$ spark-shell

First, we will import some packages and instantiate a sqlContext, which is the entry point for working with structured data (rows and columns) in Spark and allows the creation of DataFrame objects.

// SQLContext is the entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._
// import Spark SQL data types and Row
import org.apache.spark.sql._
// import the aggregate functions (min, avg, max) used later in this post
import org.apache.spark.sql.functions._

Below we load the data from the ebay.csv file into a Resilient Distributed Dataset (RDD). RDDs support transformations and actions; the first() action returns the first element in the RDD, which is the String “8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3”.

// load the data into a  new RDD
val ebayText = sc.textFile("ebay.csv")

// Return the first element in this RDD
ebayText.first()
// res6: String = 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3

Below we use a Scala case class to define the Auction schema corresponding to the ebay.csv file. Then map() transformations are applied to each element of ebayText to create the ebay RDD of Auction objects.

//define the schema using a case class
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float, item: String, daystolive: Integer)

// create an RDD of Auction objects
val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt ))

The first() action on the ebay RDD returns its first element, Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3), and the count() action returns the number of elements.

// Return the first element in this RDD
ebay.first()
// res7: Auction = Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3)
// Return the number of elements in the RDD
ebay.count()
// res8: Long = 10654
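The split-and-convert step above throws an exception if a line has the wrong number of fields or a non-numeric value where a number is expected. A minimal sketch of a more defensive parser follows, in plain Scala so it can be tried without a cluster. The helper name parseAuction is hypothetical and not part of the original tutorial; the case class is repeated only so the snippet stands alone.

```scala
import scala.util.Try

// Same schema as the Auction case class defined in the post
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String,
                   bidderrate: Integer, openbid: Float, price: Float,
                   item: String, daystolive: Integer)

// Hypothetical helper: returns None when the line does not have exactly
// 9 fields or when a numeric field cannot be converted
def parseAuction(line: String): Option[Auction] = {
  val p = line.split(",")
  if (p.length != 9) None
  else Try(Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt,
                   p(5).toFloat, p(6).toFloat, p(7), p(8).toInt)).toOption
}

// The first line is the sample row from ebay.csv; the other two are made-up bad rows
val sampleLines = Seq(
  "8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3",
  "not,enough,fields",
  "8213034705,abc,2.927373,jake7870,0,95,117.5,xbox,3"
)

// flatMap keeps only the lines that parsed successfully
val parsed = sampleLines.flatMap(line => parseAuction(line))
```

In the Spark shell, the same function should be usable on the RDD, for example ebayText.flatMap(line => parseAuction(line)), so that malformed lines are skipped rather than failing the job.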

A DataFrame is a distributed collection of data organized into named columns. Spark SQL supports automatically converting an RDD containing case classes to a DataFrame with the method toDF():

// change ebay RDD of Auction objects to a DataFrame
val auction = ebay.toDF()

The previous RDD transformations can also be written on one line like this:

val auction = sc.textFile("ebay.csv").map(_.split(",")).map(p =>
Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt )).toDF()

Explore and query the eBay auction data with Spark DataFrames

DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, and Python; below are some examples with the auction DataFrame. The DataFrame show() action displays the top 20 rows in a tabular form.

// Display the top 20 rows of DataFrame
auction.show()
// auctionid  bid   bidtime  bidder         bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870       0          95.0    117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2  1          95.0    117.5 xbox 3 …

The DataFrame printSchema() method prints the schema to the console in a tree format:

// Print the schema of this DataFrame
auction.printSchema()
// root
//  |-- auctionid: string (nullable = true)
//  |-- bid: float (nullable = false)
//  |-- bidtime: float (nullable = false)
//  |-- bidder: string (nullable = true)
//  |-- bidderrate: integer (nullable = true)
//  |-- openbid: float (nullable = false)
//  |-- price: float (nullable = false)
//  |-- item: string (nullable = true)
//  |-- daystolive: integer (nullable = true)

After a DataFrame is instantiated, you can query it. Here are some example queries using the Scala DataFrame API:

// How many auctions were held?
auction.select("auctionid").distinct.count
// Long = 627

// How many bids per item?
auction.groupBy("auctionid", "item").count.show
// auctionid  item    count
// 3016429446 palm    10
// 8211851222 xbox    28
// 3014480955 palm    12
// 8214279576 xbox    4
// 3014844871 palm    18
// 3014012355 palm    35
// 1641457876 cartier 2
// . . .

// What's the min number of bids per item? What's the average? What's the max?
auction.groupBy("item", "auctionid").count.agg(min("count"), avg("count"), max("count")).show

// MIN(count) AVG(count)        MAX(count)
// 1  16.992025518341308 75
// Get the auctions with closing price > 100
val highprice = auction.filter("price > 100")
// highprice: org.apache.spark.sql.DataFrame = [auctionid: string, bid: float, bidtime: float, bidder: string, bidderrate: int, openbid: float, price: float, item: string, daystolive: int]

// display dataframe in a tabular format
highprice.show()
// auctionid  bid   bidtime  bidder         bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870       0          95.0    117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2  1          95.0    117.5 xbox 3

You can register a DataFrame as a temporary table using a given name, and then run SQL statements using the sql method provided by sqlContext. Here are some example queries using sqlContext:

// register the DataFrame as a temp table
auction.registerTempTable("auction")
// SQL statements can be run
// How many bids per auction?
val results = sqlContext.sql("SELECT auctionid, item, count(bid) FROM auction GROUP BY auctionid, item")
// display dataframe in a tabular format
results.show()
// auctionid  item    count
// 3016429446 palm    10
// 8211851222 xbox    28 . . .

// What is the maximum price per auction?
val results = sqlContext.sql("SELECT auctionid, MAX(price) FROM auction GROUP BY item, auctionid")
results.show()
// auctionid  c1
// 3019326300 207.5
// 8213060420 120.0 . . .

Loading the SFPD data into Spark dataframes using a csv parsing library

Now we will load the SFPD dataset into a Spark dataframe using the spark-csv parsing library from Databricks. You can use this library at the Spark shell by specifying --packages com.databricks:spark-csv_2.10:1.0.3 when starting the shell as shown below:

$ spark-shell --packages com.databricks:spark-csv_2.10:1.0.3

The load operation will parse the sfpd.csv file and return a dataframe using the first header line of the file for column names.

import sqlContext.implicits._
import org.apache.spark.sql._

//  Return the dataset specified by data source as a DataFrame, use the header for column names
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "sfpd.csv", "header" -> "true"))
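One reason to prefer a CSV parsing library over split(",") for this dataset is that some values contain commas themselves (for example the Location coordinates). The plain Scala sketch below illustrates the problem. The sample line is reconstructed from the row shown in this post, and the quoting around the Location field is an assumption about how the raw file is written. The splitCsvLine helper is hypothetical and far simpler than what spark-csv does; it ignores escaped quotes.

```scala
// Assumed raw form of one sfpd.csv row (quoting of the Location field is an assumption)
val sampleLine =
  """150467944,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,05/28/2015,23:59,TENDERLOIN,NONE,TAYLOR ST / OFARRELL ST,-122.411328369311,37.7859963050476,"(37.7859963050476, -122.411328369311)",15046794406244"""

// Naive splitting breaks the quoted Location field into two pieces
val naiveFields = sampleLine.split(",")

// Hypothetical quote-aware splitter: commas inside double quotes are kept
def splitCsvLine(line: String): Vector[String] = {
  val fields = Vector.newBuilder[String]
  val current = new StringBuilder
  var inQuotes = false
  for (ch <- line) {
    ch match {
      case '"' => inQuotes = !inQuotes          // toggle quoted state, drop the quote
      case ',' if !inQuotes =>                   // field separator outside quotes
        fields += current.toString
        current.clear()
      case other => current += other             // ordinary character
    }
  }
  fields += current.toString                     // last field has no trailing comma
  fields.result()
}

val parsedFields = splitCsvLine(sampleLine)
```

With this line, the naive split yields 14 pieces while the quote-aware version yields the 13 columns that appear in the schema.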

The take operation returns the specified number of rows in the DataFrame.

// Return the first n rows in the DataFrame
df.take(1)

// res4: Array[org.apache.spark.sql.Row] = Array([150467944,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,05/28/2015,23:59,TENDERLOIN,NONE,TAYLOR ST / OFARRELL ST,-122.411328369311,37.7859963050476,(37.7859963050476, -122.411328369311),15046794406244])

// Print the schema to the console in a tree format
df.printSchema()
// root
//  |-- IncidntNum: string (nullable = true)
//  |-- Category: string (nullable = true)
//  |-- Descript: string (nullable = true)
//  |-- DayOfWeek: string (nullable = true)
//  |-- Date: string (nullable = true)
//  |-- Time: string (nullable = true)
//  |-- PdDistrict: string (nullable = true)
//  |-- Resolution: string (nullable = true)
//  |-- Address: string (nullable = true)
//  |-- X: string (nullable = true)
//  |-- Y: string (nullable = true)
//  |-- Location: string (nullable = true)
//  |-- PdId: string (nullable = true)

// display dataframe in a tabular format
df.show()
// IncidntNum Category Descript DayOfWeek Date Time PdDistrict Resolution Address X Y Location PdId
// 150467944  LARCENY/THEFT GRAND THEFT FROM ... Thursday  05/28/2015 23:59 TENDERLOIN NONE           TAYLOR ST / OFARR... -122.411328369311 37.7859963050476 (37.7859963050476... 15046794406244

Here are some example queries using the DataFrame API and sqlContext:

// How many categories are there?
df.select("Category").distinct.count
// res5: Long = 39
// register as a temp table in order to use SQL
df.registerTempTable("sfpd")

// List the distinct categories
sqlContext.sql("SELECT distinct Category FROM sfpd").collect().foreach(println)

// [TREA] . . .
// What are the top 10 Resolutions?
sqlContext.sql("SELECT Resolution, count(Resolution) as rescount FROM sfpd group by Resolution order by rescount desc limit 10").collect().foreach(println)
// [NONE,1063775]
// [ARREST, BOOKED,414219]
// [ARREST, CITED,154033] . . .
// What are the top 10 incident Categories?
val t = sqlContext.sql("SELECT Category, count(Category) as catcount FROM sfpd group by Category order by catcount desc limit 10")
t.show()
// Category       catcount
// LARCENY/THEFT  353793
// NON-CRIMINAL   186272 . . .

// The results of SQL queries are DataFrames and support RDD operations.
// The columns of a row in the result can be accessed by ordinal
t.map(row => "column 0: " + row(0)).collect().foreach(println)
// column 0: LARCENY/THEFT
// column 0: OTHER OFFENSES
// column 0: NON-CRIMINAL
// column 0: ASSAULT …

The physical plan for DataFrames

The Catalyst query optimizer creates the physical Execution Plan for DataFrames as shown in the diagram below:

Print the physical plan to the console

DataFrames are designed to take the SQL queries constructed against them and optimize the execution as sequences of Spark Jobs as required. You can print the physical plan for a DataFrame using the explain operation as shown below:

// Prints the physical plan to the console for debugging purposes
auction.select("auctionid").distinct.explain()

// == Physical Plan ==
// Distinct false
// Exchange (HashPartitioning [auctionid#0], 200)
//  Distinct true
//   Project [auctionid#0]
//   PhysicalRDD [auctionid#0,bid#1,bidtime#2,bidder#3,bidderrate#4,openbid#5,price#6,item#7,daystolive#8], MapPartitionsRDD[11] at mapPartitions at ExistingRDD.scala:37


In this blog post, you’ve learned how to load data into Spark DataFrames and explore data with Spark SQL. If you have any further questions, or want to share how you are using Spark DataFrames, please add your comments in the section below.

This blog post was published June 24, 2015.