Getting Started with Spark

This section is a step-by-step guide to loading a dataset, applying a schema, and querying the data with Spark.

These instructions assume that your MapR cluster is up and running with Spark installed (either standalone or on YARN). If not, refer to the Spark configure.sh section, or download and install the MapR Sandbox for Hadoop.

Note: Take the ESS 360 - Apache Spark Essentials course for a basic understanding of Spark.

Discover Spark with the Spark Interactive Shell

Read Data from MapR-FS

  1. Copy sample data into MapR-FS:

    - For this example, the dataset constitutes a CSV file of a list of auctions.

    - Download the file from GitHub: https://github.com/mapr-demos/getting-started-spark-on-mapr/tree/master/data.

    - Copy the file into the /apps/ directory on your cluster, using cp/scp or the hadoop fs -put command:
    $ scp ./data/auctiondata.csv mapr@[mapr-cluster-node]:/mapr/[cluster-name]/apps/
    or
    $ hadoop fs -put ./data/auctiondata.csv /apps
    - This dataset is from eBay online auctions. The dataset contains the following fields:
    auctionid - Unique identifier of an auction.
    bid - Proxy bid placed by a bidder.
    bidtime - Time (in days) that the bid was placed, measured from the start of the auction.
    bidder - eBay username of the bidder.
    bidderrate - eBay feedback rating of the bidder.
    openbid - Opening bid set by the seller.
    price - Closing price that the item sold for (equivalent to the second-highest bid plus an increment).
    item - Type of item.
    daystolive - Length of the auction, in days.

    The table below shows the fields with some sample data:

    auctionid  | bid | bidtime  | bidder   | bidderrate | openbid | price | item | daystolive
    8213034705 | 95  | 2.927373 | jake7870 | 0          | 95      | 117.5 | xbox | 3
  2. Start the Spark interactive shell:

    - $SPARK_HOME represents the home of your Spark installation in MapR, for example: /opt/mapr/spark/spark-2.2.1/.

    $ $SPARK_HOME/bin/spark-shell --master local[2]
  3. Once the Spark shell is ready, load the dataset:
    scala> val auctionData = spark.read.textFile("/apps/auctiondata.csv")
  4. Display the first entry:
    scala> auctionData.first()
  5. Count the number of entries:
    scala> auctionData.count()
  6. Use other Spark actions:
    // Displays first 20 lines
    scala> auctionData.show()
                         
    // Displays first 3 lines – change value to see more/less
    scala> auctionData.take(3)
  7. Transform the dataset into a new one that contains only xbox lines, and count them:
    scala> val auctionWithXbox = auctionData.filter(line => line.contains("xbox"))
    scala> auctionWithXbox.count()

    - This could also be done in a single line by chaining transformations and actions:

    scala> auctionData.filter(line => line.contains("xbox")).count()
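    - Because each element of this dataset is a raw CSV line, you can also split on commas and work with individual columns. For example, this sketch lists the distinct item types, assuming item is the eighth field (index 7), as in the table above:

    scala> // Extract the item column from each line and list the distinct values
    scala> auctionData.map(_.split(",")(7)).distinct().show()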
  8. Use Spark DataFrames:
    scala> val auctionDataFrame = spark.read.format("csv").option("inferSchema", true).load("/apps/auctiondata.csv").toDF("auctionid","bid","bidtime","bidder","bidderrate","openbid","price","item","daystolive")
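    - To check which types inferSchema selected for each column, print the schema:

    scala> auctionDataFrame.printSchema()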
  9. Use a filter transformation on the DataFrame:
    scala> auctionDataFrame.filter($"price" < 30).show()

Figure: Applying transformations and actions to a dataset

Write Data to MapR-FS

Using the same dataset, save all xbox items as a file in MapR-FS:

- You can use the filter($"item" === "xbox") transformation and write.json (or another writer) to save the result to MapR-FS.

scala> auctionDataFrame.filter($"item" === "xbox").write.json("/apps/results/json/xbox")
This command creates the /apps/results/json/xbox directory, in which you will find the generated JSON file(s). You can use the same approach to write Parquet or any other file format:
scala> auctionDataFrame.filter($"item" === "xbox").write.parquet("/apps/results/parquet/xbox")

Write Data to MapR-DB JSON

The first step when you are working with MapR-DB JSON is to define a document _id that uniquely identifies each document.

In this example, you add a new _id field to the CSV data and populate it with generated UUIDs.

The following commands create the table /apps/auction_json_table and insert the data into it:

scala> import spark.implicits._
scala> import java.util.UUID
scala> import org.apache.spark.sql.SparkSession
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.SaveMode
scala> import com.mapr.db.spark.sql._  // import the MapR-DB OJAI Connector
scala> val generateUUID = udf(() => UUID.randomUUID().toString) // create UDF to generate UUID
scala> // Define a custom schema instead of relying on inferSchema
scala> val customSchema = StructType(Array(
         StructField("auctionid", StringType, true),
         StructField("bid", DoubleType, true),
         StructField("bidtime", DoubleType, true),
         StructField("bidder", StringType, true),
         StructField("bidderrate", IntegerType, true),
         StructField("openbid", DoubleType, true),
         StructField("price", DoubleType, true),
         StructField("item", StringType, true),
         StructField("daystolive", IntegerType, true)
       ))
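The listing above ends with the schema definition. The remaining steps load the CSV with this schema, add the generated _id column, and save to MapR-DB; this sketch assumes the MapR-DB OJAI Connector's saveToMapRDB method and the /apps/auctiondata.csv file copied earlier:

scala> // Load the CSV using the custom schema defined above
scala> val auctionDF = spark.read.format("csv").schema(customSchema).load("/apps/auctiondata.csv")
scala> // Add the _id column required by MapR-DB JSON, using the generateUUID UDF
scala> val documents = auctionDF.withColumn("_id", generateUUID())
scala> // Write to MapR-DB JSON, creating the table if it does not exist
scala> documents.saveToMapRDB("/apps/auction_json_table", createTable = true)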
You can now query the table using the MapR-DB shell. Open a terminal on your cluster and run the following commands:
$ mapr dbshell
maprdb mapr:> find /apps/auction_json_table --limit 10

Read Data from MapR-DB JSON

Now that you have the data in MapR-DB JSON, you can create and query a Spark DataFrame using the following commands:
scala> import com.mapr.db.spark.sql._
scala> import org.apache.spark.sql.SparkSession
scala> val dataFromMapR = spark.loadFromMapRDB("/apps/auction_json_table")
scala> dataFromMapR.printSchema
scala> dataFromMapR.count
                  
scala> dataFromMapR.filter($"price" < 30).show()   // use a filter         
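The DataFrame loaded from MapR-DB supports the same transformations and actions as one loaded from a file. For example, this sketch computes the average closing price per item type (avg comes from org.apache.spark.sql.functions):

scala> import org.apache.spark.sql.functions.avg
scala> dataFromMapR.groupBy("item").agg(avg($"price")).show()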
Note: To use notebooks, see the MapR Data Science Refinery or Jupyter. Refer to the MapR Spark blog for more information about Spark implementations and use cases.