Scala Applications to Access Hadoop Data


Scala is one of the newest and most widely used functional programming languages. It is used in Hadoop ecosystem components such as Apache Spark and Apache Kafka, so it is valuable to be able to develop applications in Scala that work with Hadoop and its ecosystem projects.

In this post, I am going to show you how to run a Scala application that accesses Hadoop data. The example below references MapR-FS, a fast distributed file system similar to HDFS that is part of the MapR Distribution. From an application perspective, MapR-FS looks exactly like HDFS, so this example application will work on any Hadoop distribution with a slight modification to the core-site.xml configuration file. You can access Hadoop data from Scala applications in much the same way as from Java and C/C++ applications.
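For example, on a stock Apache Hadoop cluster the only change would be pointing fs.default.name in core-site.xml at your NameNode instead of at MapR-FS. The hostname and port below are placeholders, not values from this example:

  <property>
    <name>fs.default.name</name>
    <value>hdfs://namenode.example.com:9000</value>
  </property>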

I will walk through the code of a simple Scala application, and then go through the steps to compile and execute it using the Hadoop Java APIs. I will assume that you already have MapR v3.0.3 installed (for any other version, change the corresponding dependency version in build.sbt to pick up the appropriate dependencies); if not, you can download (for free) either the Community Edition or the MapR Sandbox. You also need a Scala compiler, which you can download from http://www.scala-lang.org.

The program performs the following, all on MapR-FS:

  1. Initial configuration
  2. Create and save a file
  3. Delete a file
  4. Read a file
  5. Create a folder

Below are snippets of code explaining each step; the complete source code is included later in this blog post.

1. Initial configuration

//Get an instance of Configuration
private var conf = new Configuration()

//Hadoop and MapR resources to be added to the Configuration instance. Here we can
//add custom configuration for both MapR and generic Hadoop settings.
private var maprfsCoreSitePath = new Path("core-site.xml")
private var maprfsSitePath = new Path("maprfs-site.xml")

//Add the resources to the Configuration instance
conf.addResource(maprfsCoreSitePath)
conf.addResource(maprfsSitePath)

//Get a FileSystem handle
private var fileSystem = FileSystem.get(conf)

2. Create and save a file into MapR-FS

  def createAndSave(filepath: String): Unit = {
    var file = new File(filepath)
    //A new file will be created in MapR-FS
    var out = fileSystem.create(new Path(file.getName))
    //Read from the local file into an InputStream
    var in = new BufferedInputStream(new FileInputStream(file))
    var b = new Array[Byte](1024)
    var numBytes = in.read(b)
    //Write the output to the OutputStream given by the FileSystem
    while (numBytes > 0) {
      out.write(b, 0, numBytes)
      numBytes = in.read(b)
    }
    in.close()
    out.close()
  }
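As an aside, the byte-copy loop above can also be written with Hadoop's org.apache.hadoop.io.IOUtils helper. Here is a minimal sketch of a hypothetical variant (the method name is my own; it assumes the same fileSystem handle from step 1):

  import org.apache.hadoop.io.IOUtils

  def createAndSaveWithIOUtils(filepath: String): Unit = {
    val file = new File(filepath)
    val out = fileSystem.create(new Path(file.getName))
    val in = new BufferedInputStream(new FileInputStream(file))
    //Copies the stream in 1024-byte chunks and closes both streams when done
    IOUtils.copyBytes(in, out, 1024, true)
  }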

3. Delete a file from MapR-FS

//Get the path of the filename and delete the file from MapR-FS
  def deleteFile(filename: String): Boolean = {
    var path = new Path(filename)
    //The second argument enables recursive deletion, so this also works on directories
    fileSystem.delete(path, true)
  }

4. Read a file from MapR-FS

//Read a file in MapR-FS using FileSystem, which returns an InputStream
  def getFile(filename: String): InputStream = {
    var path = new Path(filename)
    fileSystem.open(path)
  }
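As a minimal usage sketch, assuming testfile.txt already exists in MapR-FS, the returned stream can be printed line by line:

  val in = MapRfsFileService.getFile("testfile.txt")
  scala.io.Source.fromInputStream(in).getLines().foreach(println)
  in.close()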

5. Create a folder inside MapR-FS

//If the directory does not already exist, create it using the FileSystem instance
  def mkdirs(folderPath: String): Unit = {
    var path = new Path(folderPath)
    if (!fileSystem.exists(path)) {
      fileSystem.mkdirs(path)
    }
  }
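For example, a hypothetical call to set up a working directory (the path is only an illustration):

  MapRfsFileService.mkdirs("/user/mapr/example-dir")

Note that fileSystem.mkdirs also creates any missing parent directories, much like mkdir -p.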

Compiling and Running the Program

Scala has two command-line utilities, scalac (compile) and scala (run), similar to Java's javac and java. I'll go through the compile and run steps, which should look familiar to Java programmers.

1. cd to the directory where the .scala files are stored.

root@ubuntu1:/home/maprfs-scala-example# ls -l
total 44
-rwxr-xr-x 1 mapr root  552 Aug 25 15:42 build.sbt
drwxr-xr-x 3 root root 4096 Sep 15 10:35 com
-rwxr-xr-x 1 mapr root  855 Aug 25 16:24 core-site.xml
drwxr-xr-x 2 mapr root 4096 Aug 26 11:56 lib
-rwxr-xr-x 1 mapr root  305 Aug 25 11:35 maprfs-site.xml
drwxr-xr-x 3 mapr root 4096 Aug 25 11:36 project
-rwxr-xr-x 1 mapr root  132 Aug 25 11:35 README.md
drwxr-xr-x 4 mapr root 4096 Aug 25 11:35 src
drwxr-xr-x 5 root root 4096 Oct  3 13:20 target
-rwxr-xr-x 1 root root   97 Oct  3 10:21 testfile1.txt
-rwxr-xr-x 1 mapr root  110 Oct  3 13:20 testfile.txt

2. Compile the .scala files with “scalac -cp .:$(hadoop classpath) src/main/scala/*.scala”

root@ubuntu1:/home/maprfs-scala-example# scalac -cp .:$(hadoop classpath) src/main/scala/*.scala

3. Run the compiled class files with “scala -cp .:$(hadoop classpath) com.mapr.scala.example.Main”

root@ubuntu1:/home/maprfs-scala-example# scala -cp .:$(hadoop classpath) com.mapr.scala.example.Main
Creating an empty file
New file created as empty-file.txt
Creating new file
Writing Example text  into the testfile.txt
Saving the file testfile.txt to MaprFs
Appended text from testfile1.txt to testfile.txt
Deleting the file empty-file.txt

Using SBT (Simple Build Tool)

SBT (Simple Build Tool) is an open source build tool for Scala and Java applications, similar to Maven and Ant.

1. Install SBT, which you can download from http://www.scala-sbt.org/download.html

2. cd to the directory where the Scala project (maprfs-scala-example) resides

3. Create a file called “build.sbt” with the following in it:

name := "example"
organization := "com.mapr.scala"
version := "0.1"
scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "1.0.3-mapr-3.0.3" excludeAll(
    ExclusionRule(organization = "com.sun.jdmk"),
    ExclusionRule(organization = "com.sun.jmx"),
    ExclusionRule(organization = "javax.jms")),
  "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)

resolvers += "MapR jars" at "http://repository.mapr.com/nexus/content/groups/mapr-public/"
initialCommands := "import com.mapr.scala.example._"

4. Then run “sbt” inside the directory where build.sbt is saved.

root@ubuntu1:/home/maprfs-scala-example# sbt
[info] Set current project to example (in build file:/home/maprfs-scala-example/)

5. Inside the sbt shell, run “clean” (to remove existing class files under the target directory), “compile” (to compile the Scala files in the project directory), and “run” (to execute the “Main” class, which contains the main function).

> clean
[success] Total time: 1 s, completed Sep 15, 2014 11:03:26 AM
> compile
[info] Updating {file:/home/maprfs-scala-example/}maprfs-scala-example...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 2 Scala sources to /home/maprfs-scala-example/target/scala-2.10/classes...
[success] Total time: 28 s, completed Sep 15, 2014 11:04:44 AM
> run
[info] Running com.mapr.scala.example.Main
New file created as empty-file.txt
Creating new file
Writing Example text  into the testfile.txt
Saving the file testfile.txt to MaprFs
Appended text from testfile1.txt to testfile.txt
Deleting the file empty-file.txt
[success] Total time: 2 s, completed Oct 6, 2014 10:30:35 AM
>

As you can see, running a Scala application that accesses Hadoop data in MapR is very straightforward. This is just one example of the power of MapR-FS and how it makes your Hadoop data accessible to a wider variety of applications.

**MapRfsFileService.scala**

package com.mapr.scala.example

import java.io.BufferedInputStream
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._

object MapRfsFileService {

<font color="green">//Initial Configuration</font>

  private var conf = new Configuration()
  private var maprfsCoreSitePath = new Path("core-site.xml")
  private var maprfsSitePath = new Path("maprfs-site.xml")

  conf.addResource(maprfsCoreSitePath)
  conf.addResource(maprfsSitePath)

  private var fileSystem = FileSystem.get(conf)

<font color="green">//Create a folder inside MapR-FS</font>

  def mkdirs(folderPath: String): Unit = {
   var path = new Path(folderPath)
     if (!fileSystem.exists(path)) {
    fileSystem.mkdirs(path)
     }
  }

<font color="green">//Create a new Empty file</font>

 def createNewFile(filepath:String): Unit = {
   var file = new File(filepath)
   var out = fileSystem.createNewFile(new Path(file.getName))
   if(out)
     println("New file created as "+file.getName)
   else
     println("File cannot be created : "+file.getName)
  }

<font color="green">//Create and save a file into MapR-FS</font>

  def createAndSave(filepath: String): Unit = {
    var file = new File(filepath)
    var out = fileSystem.create(new Path(file.getName))
    var in = new BufferedInputStream(new FileInputStream(file))
    var b = new Array[Byte](1024)
    var numBytes = in.read(b)
    while (numBytes > 0) {
      out.write(b, 0, numBytes)
      numBytes = in.read(b)
    }
    in.close()
    out.close()
  }

<font color="green">//Append data from a file to another file</font>

 def appendToFile(tofilepath: String, fromfilepath: String): Unit = {
   var file = new File(tofilepath)
   var out = fileSystem.append(new Path(file.getName))
   var in = new BufferedInputStream(new FileInputStream(new File(fromfilepath)))
   var b = new Array[Byte](1024)
   var numBytes = in.read(b)
   while (numBytes > 0) {
     out.write(b, 0, numBytes)
     numBytes = in.read(b)
    }
    in.close()
    out.close()
 }

<font color="green">//Read a file from MapR-FS</font>

  def getFile(filename: String): InputStream = {
    var path = new Path(filename)
    fileSystem.open(path)
  }

<font color="green">//Delete a file from MapR-FS</font>

  def deleteFile(filename: String): Boolean = {
     var path = new Path(filename)
     fileSystem.delete(path, true)
  }

<font color="green">//Close the FileSystem Handle</font>

  def close() = {
     fileSystem.close
   }
 }

**Main.scala**

package com.mapr.scala.example
import java.io._

object Main {
  def main(args: Array[String]) {
    val testfileName = "testfile.txt"
    val testText = "Example text "
    val appendFile = "testfile1.txt"

    val emptyFile = "empty-file.txt"

<font color="green">//Create an Empty file in MapRFS</font>

    println("Creating an empty file")
    MapRfsFileService.createNewFile(emptyFile)

 <font color="green">//Creating a new file and saving it to MapRFS</font>

    println("Creating new file")
    val testfile = new File(testfileName)
    val testfileWriter = new BufferedWriter(new FileWriter(testfile))
    println("Writing " + testText + " into the " + testfileName)
    testfileWriter.write(testText)
    testfileWriter.close
    println("Saving the file " + testfileName + " to MaprFs")
    MapRfsFileService.createAndSave(testfileName)

<font color="green">//Append to file in MapRFS</font>

    MapRfsFileService.appendToFile(testfileName,appendFile)
    println("Appended text from "+ appendFile + " to "+testfileName)

<font color="green">//Reading a file from MapRFS</font>

    val outputStream = new FileOutputStream(new File(testfileName))
    val in = MapRfsFileService.getFile(testfileName)
    var b = new Array[Byte](1024)
    var numBytes = in.read(b)
    while (numBytes > 0) {
      outputStream.write(b, 0, numBytes)
      numBytes = in.read(b)
    }
    outputStream.close
    in.close

<font color="green">//Deleting a file from MapRFS</font>

    println("Deleting the file " + emptyFile)
    MapRfsFileService.deleteFile(emptyFile)

<font color="green">//Close the FileSystem Handle</font>

   MapRfsFileService.close
   }
 }

**maprfs-site.xml**

 <font color="blue"><?xml version=<font color="red">"1.0"</font>?>
<?xml-stylesheet type=<font color="red">"text/xsl"</font> href=<font color="red">"configuration.xsl"</font>?>

<font color="green"><!-- Put site-specific property overrides in this file. --></font>
<configuration>
  <configuration>
    <property>
      <name><font color="black">dfs.replication</font></name>
      <value><font color="black">1</font></value>
    </property>
  </configuration>
</configuration>

<font color="black">**core-site.xml**</font>

<?xml version=<font color="red">"1.0"</font>?>
<?xml-stylesheet type=<font color="red">"text/xsl"</font> href=<font color="red">"configuration.xsl"</font>?>
<font color="green"><!-- Put site-specific property overrides in this file. --></font>
<configuration>
  <property>
    <name><font color="black">fs.default.name</font></name>
    <value><font color="black">maprfs:///</font></value>
  </property>
  <property>
    <name><font color="black">hadoop.tmp.dir</font></name>
    <value><font color="black">/tmp/hadoop-${user.name}</font></value>
    <description><font color="black">A base for other temporary directories.</font></description>
  </property>
<property>
  <name><font color="black">fs.mapr.working.dir</font></name>
  <value><font color="black">/user/$USERNAME/</font></value>
  <description><font color="black">The default directory to be used with relative paths.
  Note that $USERNAME is NOT an environmental variable, but just a placeholder
  to indicate that it will be expanded to the corresponding username.
  Other example default directories could be "/", "/home/$USERNAME", "/$USERNAME" etc.</font>
  </description>
</property>
</configuration></font>

The GitHub source for this Scala application: https://github.com/vsowrirajan/maprfs-scala-example/


This blog post was published October 29, 2014.