Native Avro Support

The MapR Database Binary Connector for Apache Spark supports several data formats, including Avro and JSON. The following use case shows how to work with Avro data in Spark. You can persist Avro records directly to MapR Database binary tables. Internally, the connector automatically converts the Avro schema to a native Spark Catalyst data type. Both the key and the value of a MapR Database binary table can be defined in Avro format.
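To make the schema conversion more concrete, the following sketch renders an Avro schema as a Spark SQL style type string. It is an illustration only and not the connector's implementation: the object name AvroTypeSketch and the function toSqlType are hypothetical, it depends only on the Avro library, and it handles just the common Avro types, treating a union with null as a nullable field.

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._

// Hypothetical helper for illustration; not part of the connector.
object AvroTypeSketch {
  // Same User schema as in the steps of this section.
  val userSchema: Schema = new Schema.Parser().parse(
    """{"namespace": "example.avro", "type": "record", "name": "User",
      | "fields": [
      |   {"name": "name", "type": "string"},
      |   {"name": "favorite_number", "type": ["int", "null"]},
      |   {"name": "favorite_color", "type": ["string", "null"]},
      |   {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
      |   {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
      | ]}""".stripMargin)

  // Renders an Avro schema as a Spark SQL style type string.
  def toSqlType(s: Schema): String = s.getType match {
    case Schema.Type.STRING  => "string"
    case Schema.Type.INT     => "int"
    case Schema.Type.LONG    => "bigint"
    case Schema.Type.FLOAT   => "float"
    case Schema.Type.DOUBLE  => "double"
    case Schema.Type.BOOLEAN => "boolean"
    case Schema.Type.BYTES   => "binary"
    case Schema.Type.ARRAY   => s"array<${toSqlType(s.getElementType)}>"
    // Avro map keys are always strings.
    case Schema.Type.MAP     => s"map<string,${toSqlType(s.getValueType)}>"
    case Schema.Type.RECORD  =>
      s.getFields.asScala
        .map(f => s"${f.name()}:${toSqlType(f.schema())}")
        .mkString("struct<", ",", ">")
    case Schema.Type.UNION   =>
      // A union of one type with null is treated as a nullable field of that type.
      val nonNull = s.getTypes.asScala.filter(_.getType != Schema.Type.NULL)
      if (nonNull.size == 1) toSqlType(nonNull.head)
      else throw new IllegalArgumentException(s"Unsupported union in this sketch: $s")
    case other =>
      throw new IllegalArgumentException(s"Type not handled in this sketch: $other")
  }

  def main(args: Array[String]): Unit =
    println(toSqlType(userSchema))
}
```

Under these assumptions, the User record maps to a struct whose fields mirror the Avro fields, which is why nested fields of an Avro column can be addressed by name after loading.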

  1. Define the catalog for schema mapping. The catalog describes a MapR Database binary table at /path_to/avro_table, with key as the row key and one column, col1. The row key must also be defined in detail as a column (col0) that uses the special column family rowkey.
    def catalog = s"""{
                          |"table":{"namespace":"default", "name":"/path_to/avro_table"},
                          |"rowkey":"key",
                          |"columns":{
                          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                          |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
                          |}
                          |}""".stripMargin
    
  2. Prepare the data. First define schemaString, then parse it to obtain avroSchema. avroSchema is used to generate each AvroHBaseRecord. data is a local Scala collection that contains 256 AvroHBaseRecord objects.
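    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData

    // Row type: col0 is the row key, col1 holds the Avro-serialized bytes.
    case class AvroHBaseRecord(col0: String, col1: Array[Byte])

    object AvroHBaseRecord {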
       val schemaString =
         s"""{"namespace": "example.avro",
             |   "type": "record",      "name": "User",
             |    "fields": [
             |        {"name": "name", "type": "string"},
             |        {"name": "favorite_number",  "type": ["int", "null"]},
             |        {"name": "favorite_color", "type": ["string", "null"]},
             |        {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
             |        {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
             |      ]    }""".stripMargin
    
       val avroSchema: Schema = {
         val p = new Schema.Parser
         p.parse(schemaString)
       }
    
       def apply(i: Int): AvroHBaseRecord = {
         val user = new GenericData.Record(avroSchema)
         user.put("name", s"name${"%03d".format(i)}")
         user.put("favorite_number", i)
         user.put("favorite_color", s"color${"%03d".format(i)}")
         val favoriteArray = new GenericData.Array[String](
           2,
           avroSchema.getField("favorite_array").schema())
         favoriteArray.add(s"number${i}")
         favoriteArray.add(s"number${i+1}")
         user.put("favorite_array", favoriteArray)
         import collection.JavaConverters._
         val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
         user.put("favorite_map", favoriteMap)
         val avroByte = AvroSedes.serialize(user, avroSchema)
         AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
       }
     }
    
     val data = (0 to 255).map { i =>
        AvroHBaseRecord(i)
     }
    
  3. Save the DataFrame. Using the schema mapping defined in catalog, the following example creates a MapR Database binary table with five (5) regions and saves the DataFrame to it.
    // toDF is provided by the SQL context implicits.
    import sqlContext.implicits._

    sc.parallelize(data).toDF.write
      .options(Map(
        HBaseTableCatalog.tableCatalog -> catalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
    
  4. Load the DataFrame. In the withCatalog function, read returns a DataFrameReader that can be used to read data in as a DataFrame. The options function adds input options for the underlying data source to the DataFrameReader. Two options are set: avroSchema is set to AvroHBaseRecord.schemaString, and HBaseTableCatalog.tableCatalog is set to avroCatalog. The load() function loads the input as a DataFrame. The DataFrame df returned by the withCatalog function can be used to access the MapR Database binary table.
    def avroCatalog = s"""{
                |"table":{"namespace":"default", "name":"/path_to/avro_table"},
                |"rowkey":"key",
                |"columns":{
                  |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                  |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                |}
              |}""".stripMargin
    
     def withCatalog(cat: String): DataFrame = {
         sqlContext
             .read
             .options(Map(
               "avroSchema" -> AvroHBaseRecord.schemaString,
               HBaseTableCatalog.tableCatalog -> cat))
             .format("org.apache.spark.sql.execution.datasources.hbase")
             .load()
     }
     // Pass the Avro-aware catalog so that col1 is decoded with avroSchema.
     val df = withCatalog(avroCatalog)
    
  5. Query data using SQL. After loading the df DataFrame, you can query the data. registerTempTable registers the df DataFrame as a temporary table named avrotable. The sqlContext.sql function lets you execute SQL queries against that table.
    df.registerTempTable("avrotable")
    val c = sqlContext.sql("select count(1) from avrotable")
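
Step 2 stores each record as Avro-encoded bytes in col1, and step 4 decodes those bytes again using the schema string. The sketch below shows that round trip with the plain Avro library, which can help when checking a schema before writing to a table. It is a minimal illustration and makes no claim about how AvroSedes is implemented: the object AvroRoundTrip and its serialize and deserialize functions are hypothetical names, and the schema is a reduced version of the User schema.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helper for illustration; not part of the connector.
object AvroRoundTrip {
  // Reduced User schema, used only in this sketch.
  val schema: Schema = new Schema.Parser().parse(
    """{"namespace": "example.avro", "type": "record", "name": "User",
      | "fields": [
      |   {"name": "name", "type": "string"},
      |   {"name": "favorite_number", "type": ["int", "null"]}
      | ]}""".stripMargin)

  // Encodes a record into Avro binary form.
  def serialize(record: GenericRecord, schema: Schema): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Decodes Avro binary data back into a record, using the same schema.
  def deserialize(bytes: Array[Byte], schema: Schema): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val user = new GenericData.Record(schema)
    user.put("name", "name001")
    user.put("favorite_number", 1)

    val restored = deserialize(serialize(user, schema), schema)
    // Avro returns strings as Utf8 objects, so convert before comparing or printing.
    println(restored.get("name").toString)
    println(restored.get("favorite_number"))
  }
}
```

Avro binary data carries no schema of its own, which is why the reader in step 4 needs the avroSchema option in addition to the catalog.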