Similar Document Search using Apache Spark with TF-IDF

Background

Google Groups is used as the main platform for knowledge sharing and interaction by the Professional Services (PS) team here at MapR. The usual flow starts with a team member posting a technical question, followed by other team members responding to the question. Over the years, many messages have been responded to. Often the new messages are very similar to older messages that already have a response. In this context, we were tasked with providing a solution to automatically find messages in the archives that are similar to new messages and send them to the person asking the question.

To accomplish this, we decided to apply machine learning, so that an automated program can find similarities between each new message and the historical data. The algorithm we used is term frequency-inverse document frequency (TF-IDF). TF-IDF is used in a variety of applications; typical use cases include document search, document tagging, and finding similar documents.
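
As a quick refresher, TF-IDF scores a term t in a document d as tf(t, d) multiplied by idf(t). In Spark ML, the IDF part is computed as idf(t) = log((N + 1) / (df(t) + 1)), where N is the number of documents in the corpus and df(t) is the number of documents containing t, so terms that appear in many messages are down-weighted while rare, discriminative terms are boosted.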

Problem Description

The desired solution was built using two Apache Spark applications running in a MapR cluster: one uses the historical data to update the feature vectors and retrain the model on a regular basis, and the other analyzes every new message and finds the five most similar archived messages.
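
The code snippets in the two applications below omit import statements for readability. A plausible set, assuming the Spark ML feature transformers, the MapR Database OJAI connector for Spark, and json4s for the JSON helper shown later, might look like this:

    import org.apache.spark.sql.functions.{col, desc, udf}
    import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel}
    import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}
    import com.mapr.db.spark.sql._                           // loadFromMapRDB / saveToMapRDB
    import org.json4s.JsonDSL._                              // the ~ operator used in toJson
    import org.json4s.jackson.JsonMethods.{compact, render}  // JSON serialization helpers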


Application 1 - Creates Features and Trains Model

This application was developed using Spark and Scala, and it can run on a schedule, depending on the needs. Here is what it does, step by step:

  1. Loads all messages from MapR Database. For the sake of brevity, the preprocessing steps that produce the tokenized wordsData DataFrame (tokenization, stop word removal, punctuation removal, and other cleanup) are omitted; a minimal sketch of them appears after this list.
    val rawEmailData = spark.loadFromMapRDB("/googlegroups/messages")
    val rawEmailDataDF = rawEmailData.select("_id", "bodyWithHistory", "threadId", "emailDate")
    
  2. Creates a hashingTF transformer, using the HashingTF class available in Spark, with a fixed feature vector length of 1,000. Applying the hashing transformation to the tokenized documents produces featurizedData.
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(1000)
    val featurizedData = hashingTF.transform(wordsData)
    
  3. Creates the IDF, fits it on the term frequencies, and applies the fitted model to produce the TF-IDF vectors.
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    val idfModel = idf.fit(featurizedData)
    val rescaledData = idfModel.transform(featurizedData)
    
  4. Defines and registers a UDF to pre-calculate the norm of each sparse vector; the norms are reused later when computing cosine similarity.
    def calcNorm(vectorA: SparseVector): Double = {
      var norm = 0.0
      // Sum of squares over the non-zero entries, then take the square root.
      for (i <- vectorA.indices) { norm += vectorA(i) * vectorA(i) }
      math.sqrt(norm)
    }
    val calcNormDF = udf[Double, SparseVector](calcNorm)
    
  5. Builds the TF-IDF corpus by adding the pre-calculated norm to every document.
    val normalized = rescaledData.withColumn("norm",calcNormDF(col("features")))
    
  6. Saves the IDF model to the MapR XD Distributed File and Object Store.
    idfModel.write.overwrite().save("/googlegroups/save_model_idf")
    
  7. To save the feature vectors to a MapR Database table, they must first be converted to JSON; for this, we create and register a UDF.
    def toJson(v: Vector): String = {
      v match {
        case SparseVector(size, indices, values) =>
          // Encode a sparse vector as {type: 0, size, indices, values}.
          val jValue = ("type" -> 0) ~
            ("size" -> size) ~
            ("indices" -> indices.toSeq) ~
            ("values" -> values.toSeq)
          compact(render(jValue))
        case DenseVector(values) =>
          // Encode a dense vector as {type: 1, values}.
          val jValue = ("type" -> 1) ~ ("values" -> values.toSeq)
          compact(render(jValue))
      }
    }
    val asJsonUDF = udf[String, Vector](toJson)
    
  8. Finally, saves the feature vectors to the MapR Database table.
    val dfToSave = normalized
      .withColumn("rawFeaturesJson", asJsonUDF(col("rawFeatures")))
      .withColumn("featuresJson", asJsonUDF(col("features")))
      .drop("rawFeatures")
      .drop("features")
    dfToSave.saveToMapRDB("/googlegroups/trained_model", createTable = false)
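
As noted in step 1, the tokenization and cleanup that produce wordsData are omitted above. A minimal sketch of that preprocessing, using Spark ML's RegexTokenizer and StopWordsRemover (the intermediate column names here are assumptions, not the exact production pipeline), could look like this:

    import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover}

    // Split the raw message body into lowercase tokens, dropping punctuation.
    val tokenizer = new RegexTokenizer()
      .setInputCol("bodyWithHistory")
      .setOutputCol("tokens")
      .setPattern("\\W+")

    // Remove common English stop words, producing the "words" column consumed by HashingTF.
    val remover = new StopWordsRemover()
      .setInputCol("tokens")
      .setOutputCol("words")

    val wordsData = remover.transform(tokenizer.transform(rawEmailDataDF))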
    

Application 2 - New Messages

The second application is a Spark Streaming consumer that executes the following steps for every new message (a rough sketch of the consumer plumbing appears after the steps):

  1. Loads the previously saved idfModel and initializes a HashingTF transformer with the same settings used in the first application.
    val idfModel = IDFModel.load("path/to/serialized/model")
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(1000)
    
  2. Loads the previously saved feature data into memory and caches it.
    val all = contextFuntions.loadFromMapRDB(argsConfiguration.trained).toDF
    all.cache()
    
  3. Creates a DataFrame with the current message, applies the same preprocessing, hashing, and IDF transformations, and pre-calculates its norm.
    val one = Seq((x._id, x.body)).toDF("_id", "contents")
    val newWords = prepareWords(one, "words")
    val newFeature = hashingTF.transform(newWords)
    // Rename the new message's TF-IDF column so it does not clash with the
    // "features" column of the cached corpus after the cross join.
    val newRescale = idfModel.transform(newFeature).withColumnRenamed("features", "features2")
    val normalized = newRescale.withColumn("norm2", UDF.calcNormUDF(col("features2")))
    
  4. Cross-joins the single new message with all the cached historical messages and calculates the similarity for each pair.
    val cross = normalized.crossJoin(all).drop(normalized.col("_id"))
    val cosine = cross.withColumn("similarity", UDF.calcCosineUDF(col("features"), col("features2"), col("norm"), col("norm2")))
    
  5. The similarity measure is the cosine of the angle between the two TF-IDF vectors, dot(A, B) / (||A|| * ||B||), implemented as follows and registered as a UDF.
    def cosineSimilarity(vectorA: SparseVector, vectorB: SparseVector, normASqrt: Double, normBSqrt: Double): Double = {
      var dotProduct = 0.0
      // Dot product over the non-zero entries of the first vector.
      for (i <- vectorA.indices) { dotProduct += vectorA(i) * vectorB(i) }
      val div = normASqrt * normBSqrt
      if (div == 0) 0.0 else dotProduct / div
    }
    val calcCosineUDF = udf[Double, SparseVector, SparseVector, Double, Double](cosineSimilarity)
    
  6. The results are then sorted by similarity in descending order, and the top five are kept.
    val similarsDF = cosine.sort(desc("similarity")).select("similarity","_id").limit(5)
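
The plumbing around these steps is not shown above. As a rough sketch only, assuming new messages arrive on a MapR Event Store / Kafka topic as plain text records (the broker address, topic name, output path, and the findSimilar wrapper around steps 3-6 are all hypothetical), the consumer might be wired up like this:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder.appName("similar-messages-consumer").getOrCreate()

    // Hypothetical stream of new messages: one record per message,
    // with the message id in the key and the raw body in the value.
    val newMessages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")          // placeholder broker
      .option("subscribe", "/googlegroups/stream:new_messages")  // hypothetical topic
      .load()
      .selectExpr("CAST(key AS STRING) AS _id", "CAST(value AS STRING) AS contents")

    // For each micro-batch, score every new message against the cached corpus
    // (findSimilar is a hypothetical wrapper around steps 3-6) and persist the result.
    newMessages.writeStream
      .foreachBatch { (batch: DataFrame, _: Long) =>
        val similarsDF = findSimilar(batch)
        similarsDF.write.mode("append").json("/googlegroups/similar_results")  // placeholder sink
      }
      .start()
      .awaitTermination()

In production the matches would more likely be written back to MapR Database or emailed to the original poster; the JSON sink above is only a stand-in.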
    

Conclusions

MapR provides the ecosystem needed for Apache Spark applications to run and scale as needed. It brings the database, the file system, and the streaming platform together under one distributed processing environment, and it extends Spark to work efficiently with both MapR Database and MapR XD. This solution will be deployed in production as part of a larger product aimed at organizing the PS Google Groups forum, with the intention of extending it to other data sources later on. Since it has already been tested on a MapR cluster, moving to production only requires installing it and dedicating more resources when the time comes.

Additional Information

Blog post: "Datasets, DataFrames and Spark SQL for Processing Tabular Data" by Carol McDonald

Complimentary eBook: Getting Started with Apache Spark 2.x from Inception to Production by Carol McDonald, with Ian Downard

Product description: MapR Event Store for Apache Kafka

Complimentary eBook: Streaming Architecture by Ted Dunning and Ellen Friedman

Try MapR free with MapR Live! or a sandbox VM


This blog post was published June 18, 2019.