ETL Pipeline to Transform, Store and Explore Healthcare Dataset With Spark SQL, JSON and MapR-DB

Contributed by Carol McDonald

This post is based on a recent workshop I helped develop and deliver at a large health services and innovation company’s analytics conference. This company is doing a lot of interesting analytics and machine learning on top of the MapR Converged Data Platform, including an internal “Data Science University”. In this post we will:

  • Extract Medicare Open payments data from a CSV file and load into an Apache Spark Dataset.
  • Analyze the data with Spark SQL.
  • Transform the data into JSON format and save to the MapR-DB document database.
  • Query and load the JSON data from MapR-DB back into Spark.


A large Health payment dataset, JSON, Apache Spark, and MapR-DB are an interesting combination for a health analytics workshop because:

  • JSON is an open-standard, efficient format that uses human-readable text to represent and transmit data objects consisting of attribute–value pairs. Because JSON is easy for programs to parse and generate, it has supplanted XML for web and mobile applications.
  • Newer standards for exchanging healthcare information such as FHIR are easier to implement because they use a modern web-based suite of API technology, including REST and JSON.
  • Apache Spark SQL, Dataframes, and Datasets make it easy to load, process, transform, and analyze JSON data.
  • MapR-DB, a high performance NoSQL database, supports JSON documents as a native data store. MapR-DB makes it easy to store, query and build applications with JSON documents.

Apache Spark and MapR-DB


One of the challenges when you are processing lots of data is deciding where to store it. With MapR-DB (HBase API or JSON API), a table is automatically partitioned into tablets across a cluster by key range, providing scalable, fast reads and writes by row key.


The MapR-DB OJAI Connector for Apache Spark makes it easier to build real-time or batch pipelines between your JSON data and MapR-DB and to leverage Spark within the pipeline. Included is a set of APIs that enable MapR users to write applications that consume MapR-DB JSON tables and use them in Spark.


The Spark MapR-DB Connector leverages the Spark DataSource API. The connector architecture has a connection object in every Spark Executor, allowing for distributed parallel writes, reads, or scans across MapR-DB tablets.


Example Use Case Data Set

Open Payments is a federal program, running since 2013, that collects information about the payments drug and device companies make to physicians and teaching hospitals for things like travel, research, gifts, speaking fees, and meals.


Below is an example of one line from an Open Payments CSV file:

"NEW","Covered Recipient Physician",,,,"132655","GREGG","D","ALZATE",,"8745 AERO DRIVE","STE 200","SAN DIEGO","CA","92123","United States",,,"Medical Doctor","Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology","CA",,,,,"DFINE, Inc","100000000326","DFINE, Inc","CA","United States",90.87,"02/12/2016","1","In-kind items and services","Food and Beverage",,,,"No","No Third Party Payment",,,,,"No","346039438","No","Yes","Covered","Device","Radiology","StabiliT",,"Covered","Device","Radiology","STAR Tumor Ablation System",,,,,,,,,,,,,,,,,"2016","06/30/2017"

There are a lot of fields in this file that we will not use; we will select the following fields:

CSV Fields

And transform them into the following JSON object:

{  
   "_id":"317150_08/26/2016_346122858",
   "physician_id":"317150",
   "date_payment":"08/26/2016",
   "record_id":"346122858",
   "payer":"Mission Pharmacal Company",
   "amount":9.23,
   "Physician_Specialty":"Obstetrics & Gynecology",
   "Nature_of_payment":"Food and Beverage"
}

Apache Spark SQL, Datasets, and Dataframes

A Spark Dataset is a distributed collection of data. Dataset is a newer interface, which provides the benefits of the older RDD interface (strong typing, ability to use powerful lambda functions) combined with the benefits of Spark SQL’s optimized execution engine. Datasets also provide faster performance than RDDs with more efficient object serialization/deserialization.

DataFrame

A DataFrame is a Dataset organized into named columns: a Dataset[Row]. (In Spark 2.0, the DataFrame API was merged with the Dataset API.)

Unified Apache Spark 2.0 API

Read the Data from a CSV File into a Dataframe

In the following code:

  1. The SparkSession read method loads a CSV file and returns the result as a Dataframe.
  2. A user-defined function (UDF) is used to convert the amount column from a String to a Double.
  3. A local temporary view is created in order to easily use SQL.

Read the data from CSV file into a Dataframe
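These steps can be sketched as follows (the file path and the amount column name are assumptions; adjust them to match your CSV):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.appName("OpenPayments").getOrCreate()

// 1. Load the CSV file into a DataFrame, taking column names from the header row
val df = spark.read
  .option("header", "true")
  .csv("/user/user01/data/payments.csv")

// 2. User-defined function to convert the amount column from String to Double
val toDouble = udf((s: String) => s.toDouble)
val dfPayments = df.withColumn("amount",
  toDouble(df("Total_Amount_of_Payment_USDollars")))

// 3. Local temporary view so the data can be queried with SQL
dfPayments.createOrReplaceTempView("payments")
```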


Transform into a Dataset of Payment objects

Next we want to select only the fields that we are interested in and transform them into a Dataset of Payment objects. First we define the Payment object schema with a Scala case class:

Define the Payment Schema
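A sketch of the case class, with fields matching the JSON object shown earlier:

```scala
// Payment schema: field names mirror the target JSON document
case class Payment(
  physician_id: String,
  date_payment: String,
  record_id: String,
  payer: String,
  amount: Double,
  physician_specialty: String,
  nature_of_payment: String
)
```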

Next we use Spark SQL to select the fields we want from the Dataframe and convert this to a Dataset[Payment] by providing the Payment class. Then we replace the payments view.

Create a Dataset of Payment classes
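A sketch, assuming the payments view and Payment case class from above; the source column names are assumptions based on the Open Payments CSV header:

```scala
import spark.implicits._

// Select and rename the fields of interest, then convert the untyped
// DataFrame rows to a strongly typed Dataset[Payment]
val ds = spark.sql(
  """SELECT Physician_Profile_ID AS physician_id,
    |       Date_of_Payment AS date_payment,
    |       Record_ID AS record_id,
    |       Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name AS payer,
    |       amount,
    |       Physician_Specialty AS physician_specialty,
    |       Nature_of_Payment_or_Transfer_of_Value AS nature_of_payment
    |  FROM payments""".stripMargin
).as[Payment]

// Replace the payments view with the narrowed Dataset
ds.createOrReplaceTempView("payments")
```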


Explore and query the Open Payment data with Spark Dataset

Datasets provide a domain-specific language for structured data manipulation in Scala and Java; below are some examples. The Dataset show() action displays the top 20 rows in a tabular form.


The Dataset printSchema() method prints the schema to the console in a tree format.

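For example, with the Dataset[Payment] from above:

```scala
// Display the top 20 rows in tabular form
ds.show()

// Print the schema as a tree, one line per field
ds.printSchema()
```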

Here are some example queries using the Scala Dataset API on the payments Dataset:
What are the Nature of Payments with reimbursement amounts greater than $1000 ordered by count?


What are the top 5 nature of payments by count?

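Sketches of these two queries with the Dataset API (assuming the Dataset[Payment] and Spark implicits from above):

```scala
import spark.implicits._

// Nature of payments with amounts greater than $1000, ordered by count
ds.filter($"amount" > 1000)
  .groupBy("nature_of_payment")
  .count()
  .orderBy($"count".desc)
  .show()

// Top 5 nature of payments by count
ds.groupBy("nature_of_payment")
  .count()
  .orderBy($"count".desc)
  .show(5)
```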

You can register a Dataset as a temporary table using a given name, and then run Spark SQL. With the Zeppelin notebook you can display query results in table or chart formats. Here are some example Spark SQL queries on the payments dataset:

What are the top 10 nature of payments by count?


What are the top 10 nature of payments by total amount?


What are the top 5 physician specialties by total amount ?

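Sketches of these three queries (assuming the payments temporary view from above):

```scala
// Top 10 nature of payments by count
spark.sql(
  """SELECT nature_of_payment, count(*) AS cnt
    |  FROM payments
    | GROUP BY nature_of_payment
    | ORDER BY cnt DESC LIMIT 10""".stripMargin).show()

// Top 10 nature of payments by total amount
spark.sql(
  """SELECT nature_of_payment, sum(amount) AS total
    |  FROM payments
    | GROUP BY nature_of_payment
    | ORDER BY total DESC LIMIT 10""".stripMargin).show()

// Top 5 physician specialties by total amount
spark.sql(
  """SELECT physician_specialty, sum(amount) AS total
    |  FROM payments
    | GROUP BY physician_specialty
    | ORDER BY total DESC LIMIT 5""".stripMargin).show()
```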

Here is the same query with the result displayed in a pie chart:

What are the Top 5 Physician Specialties by total amount? (chart)

Saving JSON Documents in a MapR-DB JSON Table

In order to save the JSON objects to MapR-DB, the first thing we need to do is define the _id field, which is the row key and primary index for MapR-DB. In the function below, we create an object with the _id equal to a combination of the physician ID, the date, and the record ID; this way the payments will be grouped by physician and date. Next we use a map operation with the createPaymentwId function to convert the Dataset[Payment] to a Dataset[PaymentwId], then we convert this to an RDD of JSON documents. (Note: with MapR-DB v6, the Spark connector will support Datasets.)

Transform Dataset into RDD of JSON documents
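A sketch of this transformation (assuming the Payment case class and Dataset[Payment] from above; PaymentwId simply adds the _id field):

```scala
import spark.implicits._

// Same fields as Payment, plus the _id row key
case class PaymentwId(
  _id: String,
  physician_id: String,
  date_payment: String,
  record_id: String,
  payer: String,
  amount: Double,
  physician_specialty: String,
  nature_of_payment: String
)

// Build the row key from physician id, date, and record id, so that
// payments are grouped by physician and date in MapR-DB
def createPaymentwId(p: Payment): PaymentwId =
  PaymentwId(
    s"${p.physician_id}_${p.date_payment}_${p.record_id}",
    p.physician_id, p.date_payment, p.record_id,
    p.payer, p.amount, p.physician_specialty, p.nature_of_payment
  )

// Dataset[Payment] -> Dataset[PaymentwId] -> RDD of JSON strings
val jsonRDD = ds.map(createPaymentwId).toJSON.rdd
```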

One element of the RDD of JSON documents looks like the JSON object shown earlier.

In the code below we save the RDD of JSON objects into a MapR-DB JSON table:

Save JSON RDD to MapR-DB
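A sketch using the MapR-DB OJAI connector (assuming the jsonRDD from above; the table path is the one used later in this post):

```scala
import com.mapr.db.spark._

// Wrap each JSON string in an OJAI document and write it to the
// MapR-DB JSON table; the _id field becomes the row key
jsonRDD
  .map(MapRDBSpark.newDocument(_))
  .saveToMapRDB("/user/user01/testable")
```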

Note that in this example, the table was already created. To create a table using the shell, execute the following at the Linux command line:

mapr dbshell

After starting the shell, run the create command. See the mapr dbshell documentation for more details.
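A sketch of the shell session (the table path matches the one used in this post; the prompt text may differ by version):

```shell
$ mapr dbshell
maprdb user01:> create /user/user01/testable
maprdb user01:> quit
```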

Loading Data from a MapR-DB JSON Table

The code below loads the documents from the “/user/user01/testable” table into an RDD and prints out 2 rows:

Load the Payments from MapR-DB
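A sketch (assuming the SparkContext sc and the connector import; loadFromMapRDB returns an RDD of OJAI documents):

```scala
import com.mapr.db.spark._

// Load the MapR-DB JSON table into an RDD and print two documents
val maprRDD = sc.loadFromMapRDB("/user/user01/testable")
maprRDD.take(2).foreach(println)
```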

Projection Pushdown and Predicate Pushdown for the Load API

The “load” API of the connector also supports “select” and “where” clauses. These can be used to push the projection of a subset of fields and/or a filtering condition down to the database.
Here is an example of how to use the “where” clause to restrict the rows:

Load the Payments for a physician from MapR-DB
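A sketch using the connector's field condition DSL (the physician id is the one from the example document above):

```scala
import com.mapr.db.spark._
import com.mapr.db.spark.field

// Load only the payment documents for one physician
val physicianRDD = sc.loadFromMapRDB("/user/user01/testable")
  .where(field("physician_id") === "317150")
physicianRDD.foreach(println)
```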


Similarly, if one wants to project only the nature_of_payment and payer fields, and to use the “where” clause to restrict the rows by amount, the following code will generate the required output:

Load the Payment where the amount is greater than 100 from MapR-DB
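A sketch combining projection and filter pushdown (field names from the JSON document above):

```scala
import com.mapr.db.spark._
import com.mapr.db.spark.field

// Project only two fields and push the amount filter down to MapR-DB
val rdd = sc.loadFromMapRDB("/user/user01/testable")
  .where(field("amount") > 100)
  .select("nature_of_payment", "payer")
rdd.foreach(println)
```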


Summary

In this blog post, you’ve learned how to ETL Open Payments CSV file data to JSON, explore it with SQL, and store it in a document database using Spark Datasets and MapR-DB.



This blog post was published October 25, 2017.