Predicting Breast Cancer Using Apache Spark Machine Learning Logistic Regression

In this blog post, I’ll help you get started using Apache Spark’s spark.ml Logistic Regression for predicting cancer malignancy. The goal of Spark’s spark.ml library is to provide a set of APIs on top of DataFrames that help users create and tune machine learning workflows, or pipelines. Because spark.ml works on DataFrames, it also benefits from the query optimizations that Spark SQL applies to DataFrames.

Classification

Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (for example, whether a cancer tissue observation is malignant or not), based on labeled examples of known items (for example, observations known to be malignant or not). Classification takes a set of data with known labels and pre-determined features and learns how to label new records based on that information. Features are the “if questions” that you ask. The label is the answer to those questions. For example, if it walks, swims, and quacks like a duck, then the label is "duck."

Let’s go through an example of Cancer Tissue Observations:

  • What are we trying to predict?

  • Whether a sample observation is malignant or not.

  • This is the Label: malignant or not.

  • What are the “if questions” or properties that you can use to predict?

  • Tissue sample characteristics: Clump Thickness, Uniformity of Cell Size, Uniformity of Cell Shape, Marginal Adhesion, Single Epithelial Cell Size, Bare Nuclei, Bland Chromatin, Normal Nucleoli, Mitoses.

  • These are the Features. To build a classifier model, you extract the features of interest that most contribute to the classification.

Logistic Regression

Logistic regression is a popular method to predict a binary response. It is a special case of generalized linear models that predicts the probability of the outcome. Logistic regression measures the relationship between the Y “Label” and the X “Features” by estimating probabilities using a logistic function. The predicted probability is then compared with a threshold (0.5 by default in spark.ml) to choose the label class.
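
As a minimal sketch of this idea in plain Scala (no Spark needed), the logistic function turns a linear score into a probability, and a threshold turns the probability into a class. The helper names `sigmoid`, `score` and `predictLabel`, and the weights and feature values, are illustrative assumptions and not part of the spark.ml API.

```scala
// Logistic (sigmoid) function: maps a real-valued score z to a probability in (0, 1).
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// Linear score: intercept plus the weighted sum of the feature values.
def score(weights: Array[Double], intercept: Double, features: Array[Double]): Double =
  intercept + weights.zip(features).map { case (w, x) => w * x }.sum

// Turn a probability into a 0/1 label using a threshold (0.5 is an illustrative default).
def predictLabel(probability: Double, threshold: Double = 0.5): Double =
  if (probability > threshold) 1.0 else 0.0

// Hypothetical weights and feature values, for illustration only.
val weights = Array(0.8, -0.4)
val features = Array(2.0, 1.0)
val p = sigmoid(score(weights, -1.0, features))
println(s"probability = $p, label = ${predictLabel(p)}")
```

A score of 0 maps to a probability of exactly 0.5, positive scores map above it and negative scores below it, which is why the sign of the linear score decides the class at the 0.5 threshold.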

Analyze Cancer Observations with Spark Machine Learning Scenario

Our data is from the Wisconsin Breast Cancer Database (the original Wisconsin data set in the UCI Machine Learning Repository), which categorizes breast tumor cases as either benign or malignant and provides 9 features that we use to predict the diagnosis. For each cancer observation, we have the following information:

1. Sample code number: id number
2. Clump Thickness: 1 - 10
3. Uniformity of Cell Size: 1 - 10
4. Uniformity of Cell Shape: 1 - 10
5. Marginal Adhesion: 1 - 10
6. Single Epithelial Cell Size: 1 - 10
7. Bare Nuclei: 1 - 10
8. Bland Chromatin: 1 - 10
9. Normal Nucleoli: 1 - 10
10. Mitoses: 1 - 10
11. Class: (2 for benign, 4 for malignant)

The Cancer Observation csv file has the following format:

1000025,5,1,1,1,2,1,3,1,1,2
1002945,5,4,4,5,7,10,3,2,1,2
1015425,3,1,1,1,2,2,3,1,1,2

In this scenario, we will build a logistic regression model to predict the label / classification of malignant or not based on the following features:

  • Label → malignant or benign (1 or 0)
  • Features → {Clump Thickness, Uniformity of Cell Size, Uniformity of Cell Shape, Marginal Adhesion, Single Epithelial Cell Size, Bare Nuclei, Bland Chromatin, Normal Nucleoli, Mitoses }

Spark ML provides a uniform set of high-level APIs built on top of DataFrames. The main concepts in Spark ML are:

  • DataFrame: The ML API uses DataFrames from Spark SQL as an ML dataset.
  • Transformer: A Transformer is an algorithm which transforms one DataFrame into another DataFrame. For example, turning a DataFrame with features into a DataFrame with predictions.
  • Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. For example, training/tuning on a DataFrame and producing a model.
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify a ML workflow.
  • ParamMaps: Parameters to choose from, sometimes called a “parameter grid” to search over.
  • Evaluator: Metric to measure how well a fitted Model does on held-out test data.
  • CrossValidator: Identifies the best ParamMap and re-fits the Estimator using the best ParamMap and the entire dataset.

In this example, we will use the following Spark ML workflow: load and parse the data, extract the features, train the model, and then test and evaluate it.

Software

This tutorial will run on Spark 1.6.1.

Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data file to your sandbox home directory /user/user01 using scp. Start the Spark shell with:

$ spark-shell --master local[1]

Load and Parse the Data from a csv File

First, we will import the machine learning packages.

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
import sqlContext.implicits._
import sqlContext._
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

We use a Scala case class to define the schema corresponding to a line in the csv data file.

// define the Cancer Observation Schema
case class Obs(clas: Double, thickness: Double, size: Double, shape: Double, madh: Double, epsize: Double, bnuc: Double, bchrom: Double, nNuc: Double, mit: Double)

The functions below parse a line from the data file into the Cancer Observation class.

// function to create an Obs object from an Array of Double. Class 4 (malignant) is changed to 1, class 2 (benign) to 0
def parseObs(line: Array[Double]): Obs = {
    Obs(
      if (line(9) == 4.0) 1.0 else 0.0, line(0), line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8)
    )
}
// function to transform an RDD of Strings into an RDD of Array[Double]: filter out lines with ? (a missing Bare Nuclei value), remove the first column (id)
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).filter(_(6) != "?").map(_.drop(1)).map(_.map(_.toDouble))
}
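
To see what these parsing steps do without starting Spark, here is a small sketch that applies the same split, filter, drop and convert steps to an ordinary Scala Seq. The helper name `parseLines` is hypothetical, and the second sample line is a made-up record with a missing value, added only to show the filter at work.

```scala
// Plain-Scala version of the parsing steps, applied to a few sample lines.
// The second line is a made-up record with "?" in the Bare Nuclei column.
val lines = Seq(
  "1000025,5,1,1,1,2,1,3,1,1,2",
  "9999999,8,4,5,1,2,?,7,3,1,4",
  "1002945,5,4,4,5,7,10,3,2,1,2"
)

def parseLines(lines: Seq[String]): Seq[Array[Double]] = {
  lines
    .map(_.split(","))        // split each line into its 11 fields
    .filter(_(6) != "?")      // drop rows where Bare Nuclei (field index 6) is missing
    .map(_.drop(1))           // remove the id column
    .map(_.map(_.toDouble))   // convert the remaining 10 fields to Double
}

val parsed = parseLines(lines)

// The last field is the class: 4 (malignant) becomes 1.0, 2 (benign) becomes 0.0.
val labels = parsed.map(a => if (a(9) == 4.0) 1.0 else 0.0)
parsed.foreach(a => println(a.mkString(",")))
```

The same chain of operations works on an RDD because RDDs expose map and filter with the same meaning as Scala collections.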

Below we load the data from the csv file into an RDD of Strings. Then we call the parseRDD function, which uses map and filter transformations to turn each String element in the RDD into an Array of Double, dropping rows with a missing value and removing the id column. Then we use a map transformation, which applies the parseObs function to transform each Array of Double in the RDD into an Obs (Cancer Observation) object. The toDF() method transforms the RDD of Obs objects into a DataFrame with the Cancer Observation class schema.

// load the data into a DataFrame
val rdd = sc.textFile("data/breast_cancer_wisconsin_data.txt")
val obsRDD = parseRDD(rdd).map(parseObs)
val obsDF = obsRDD.toDF().cache()
obsDF.registerTempTable("obs")

The DataFrame printSchema() method prints the schema to the console in a tree format.

// Print the schema of this DataFrame
obsDF.printSchema
 root
 |-- clas: double (nullable = false)
 |-- thickness: double (nullable = false)
 |-- size: double (nullable = false)
 |-- shape: double (nullable = false)
 |-- madh: double (nullable = false)
 |-- epsize: double (nullable = false)
 |-- bnuc: double (nullable = false)
 |-- bchrom: double (nullable = false)
 |-- nNuc: double (nullable = false)
 |-- mit: double (nullable = false)
// Display the top 20 rows of DataFrame
obsDF.show
 +----+---------+----+-----+----+------+----+------+----+---+
|clas|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|
+----+---------+----+-----+----+------+----+------+----+---+
| 0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|
| 0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|
| 0.0|      3.0| 1.0|  1.0| 1.0|   2.0| 2.0|   3.0| 1.0|1.0|
...
+----+---------+----+-----+----+------+----+------+----+---+
only showing top 20 rows

After a DataFrame is instantiated, you can query it using SQL queries or the DataFrame API. Here are some example queries:

// describe computes statistics for the thickness column, including count, mean, stddev, min, and max
obsDF.describe("thickness").show
 +-------+------------------+
|summary|         thickness|
+-------+------------------+
|  count|               683|
|   mean|  4.44216691068814|
| stddev|2.8207613188371288|
|    min|               1.0|
|    max|              10.0|
+-------+------------------+

// compute the avg thickness, size, shape grouped by clas (malignant or not)
sqlContext.sql("SELECT clas, avg(thickness) as avgthickness, avg(size) as avgsize, avg(shape) as avgshape FROM obs GROUP BY clas").show
 +----+-----------------+------------------+------------------+
|clas|     avgthickness|           avgsize|          avgshape|
+----+-----------------+------------------+------------------+
| 1.0|7.188284518828452| 6.577405857740586| 6.560669456066946|
| 0.0|2.963963963963964|1.3063063063063063|1.4144144144144144|
+----+-----------------+------------------+------------------+
// compute avg thickness grouped by clas (malignant or not)
obsDF.groupBy("clas").avg("thickness").show
 +----+-----------------+
|clas|   avg(thickness)|
+----+-----------------+
| 1.0|7.188284518828452|
| 0.0|2.963963963963964|
+----+-----------------+

Extract Features

To build a classifier model, you first extract the features that most contribute to the classification. In the cancer data set, the data is labeled with two classes – 1 (malignant) and 0 (not malignant).

The label and features for each item consist of the fields shown below:

  • Label → malignant: 0 or 1
  • Features → {"thickness", "size", "shape", "madh", "epsize", "bnuc", "bchrom", "nNuc", "mit"}

Define Features Array

(reference Learning Spark)

In order for the features to be used by a machine learning algorithm, the features are transformed and put into Feature Vectors, which are vectors of numbers representing the value for each feature.

Below, a VectorAssembler is used to transform the DataFrame and return a new DataFrame with all of the feature columns combined in a single vector column.

// define the feature columns to put in the feature vector
val featureCols = Array("thickness", "size", "shape", "madh", "epsize", "bnuc", "bchrom", "nNuc", "mit")

// set the input and output column names
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
// return a DataFrame with all of the feature columns in a vector column
val df2 = assembler.transform(obsDF)
// the transform method produced a new column: features.
df2.show
 +----+---------+----+-----+----+------+----+------+----+---+--------------------+
|clas|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|
+----+---------+----+-----+----+------+----+------+----+---+--------------------+
| 0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[5.0,1.0,1.0,1.0,...|
| 0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|[5.0,4.0,4.0,5.0,...|
| 1.0|      8.0|10.0| 10.0| 8.0|   7.0|10.0|   9.0| 7.0|1.0|[8.0,10.0,10.0,8....|

Next, we use a StringIndexer to return a DataFrame with the clas (malignant or not) column added as a label.

// Create a label column with the StringIndexer
val labelIndexer = new StringIndexer().setInputCol("clas").setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
// the transform method produced a new column: label.
df3.show
 +----+---------+----+-----+----+------+----+------+----+---+--------------------+-----+
|clas|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|label|
+----+---------+----+-----+----+------+----+------+----+---+--------------------+-----+
| 0.0|      5.0| 1.0|  1.0| 1.0|   2.0| 1.0|   3.0| 1.0|1.0|[5.0,1.0,1.0,1.0,...|  0.0|
| 0.0|      5.0| 4.0|  4.0| 5.0|   7.0|10.0|   3.0| 2.0|1.0|[5.0,4.0,4.0,5.0,...|  0.0|
| 0.0|      3.0| 1.0|  1.0| 1.0|   2.0| 2.0|   3.0| 1.0|1.0|[3.0,1.0,1.0,1.0,...|  0.0|
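
One detail worth knowing is that StringIndexer assigns label indices by frequency, with the most frequent value getting index 0.0, rather than by numeric order. In the rows shown here label equals clas, which is what you would expect if benign (0.0) is the more frequent class. The sketch below imitates that ordering in plain Scala. The helper name `indexByFrequency` and the sample values are made up for illustration and are not Spark code.

```scala
// Sketch of frequency-based indexing: the most frequent value gets index 0.0.
// `indexByFrequency` is a hypothetical helper; the sample values are made up.
def indexByFrequency(values: Seq[String]): Map[String, Double] = {
  values.groupBy(identity)
    .map { case (v, occurrences) => (v, occurrences.size) }
    .toSeq
    .sortBy { case (_, count) => -count }              // descending frequency
    .zipWithIndex
    .map { case ((v, _), idx) => (v, idx.toDouble) }
    .toMap
}

val clas = Seq("0.0", "1.0", "0.0", "0.0", "1.0")
val mapping = indexByFrequency(clas)
println(mapping)
```

If malignant observations were the majority, an indexer of this kind would map 1.0 to label 0.0, so it is worth checking the label column before interpreting predictions.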

Below, the data is split into a training data set and a test data set: 70% of the data is used to train the model, and 30% is used for testing.

// split the dataframe into training and test data
val splitSeed = 5043
val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
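
The split is random, so the 70/30 weights are targets rather than exact sizes: in this run, the test set ends up with 199 of the 683 rows, about 29%. As a rough sketch of how a seeded split of this kind behaves, the plain-Scala code below sends each row to the training set with probability 0.7. The helper `splitRows` is hypothetical, and it is only an approximation of the idea, so it will not reproduce the exact rows that Spark's randomSplit selects.

```scala
import scala.util.Random

// Sketch of a seeded random split: each row independently lands in the training set
// with probability trainFraction, otherwise in the test set.
// `splitRows` is a hypothetical helper, not a Spark API.
def splitRows[T](rows: Seq[T], trainFraction: Double, seed: Long): (Seq[T], Seq[T]) = {
  val rng = new Random(seed)
  // Draw one random number per row, then partition on it.
  val tagged = rows.map(row => (row, rng.nextDouble()))
  val (train, test) = tagged.partition { case (_, draw) => draw < trainFraction }
  (train.map(_._1), test.map(_._1))
}

val rows = (1 to 683).toSeq   // stand-in for the 683 parsed observations
val (train, test) = splitRows(rows, 0.7, 5043L)
println(s"train = ${train.size}, test = ${test.size}")
```

Fixing the seed makes the split repeatable, which is useful when you want to compare models trained with different parameters on the same data.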

Train the Model

Next, we train the logistic regression model with elastic net regularization.

The model is trained by making associations between the input features and the labeled output associated with those features.

// create the classifier, set parameters for training: at most 10 iterations, regularization strength 0.3, elastic net mixing 0.8 (0 is pure L2, 1 is pure L1)
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
// use logistic regression to train (fit) the model with the training data
val model = lr.fit(trainingData)

// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")

 Coefficients: (9,[1,2,5,6],[0.06503554553146387,0.07181362361391264,0.07583963853124673,0.0012675057388232965]) Intercept: -1.39319142312609

Test the Model

Next we use the test data to get predictions.

// run the model on test features to get predictions
val predictions = model.transform(testData)
// As you can see, the previous model transform produced new columns: rawPrediction, probability and prediction.
predictions.show
 +----+---------+----+-----+----+------+----+------+----+---+--------------------+-----+--------------------+--------------------+----------+
|clas|thickness|size|shape|madh|epsize|bnuc|bchrom|nNuc|mit|            features|label|       rawPrediction|         probability|prediction|
+----+---------+----+-----+----+------+----+------+----+---+--------------------+-----+--------------------+--------------------+----------+
| 0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   1.0| 3.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17923510971064...|[0.76481024658406...|       0.0|
| 0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17670009823299...|[0.76435395397908...|       0.0|
| 0.0|      1.0| 1.0|  1.0| 1.0|   1.0| 1.0|   3.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17670009823299...|[0.76435395397908...|       0.0|
| 0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   1.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17923510971064...|[0.76481024658406...|       0.0|
| 0.0|      1.0| 1.0|  1.0| 1.0|   2.0| 1.0|   2.0| 1.0|1.0|[1.0,1.0,1.0,1.0,...|  0.0|[1.17796760397182...|[0.76458217679258...|       0.0|
+----+---------+----+-----+----+------+----+------+----+---+--------------------+-----+--------------------+--------------------+----------+
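
The rawPrediction and probability columns can be reproduced by hand from the coefficients and intercept printed after training. The sketch below is plain Scala with the printed values typed in. It assumes that rawPrediction holds the negated and the plain linear margin, and that probability applies the logistic function to each, which matches the first row of the table above.

```scala
// Coefficients and intercept as printed by the model above.
// The sparse vector (9,[1,2,5,6],[...]) has non-zero weights only at indices 1, 2, 5 and 6.
val coefficients = Array(0.0, 0.06503554553146387, 0.07181362361391264, 0.0, 0.0,
  0.07583963853124673, 0.0012675057388232965, 0.0, 0.0)
val intercept = -1.39319142312609

// First row of the predictions table:
// thickness, size, shape, madh, epsize, bnuc, bchrom, nNuc, mit.
val features = Array(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 3.0, 1.0)

// Margin for class 1 (malignant): intercept plus the weighted sum of the features.
val margin = intercept + coefficients.zip(features).map { case (w, x) => w * x }.sum

// rawPrediction is (-margin, margin); probability applies the logistic function to each.
val rawPrediction = Array(-margin, margin)
val probability = rawPrediction.map(m => 1.0 / (1.0 + math.exp(-m)))
println(rawPrediction.mkString("[", ",", "]"))
println(probability.mkString("[", ",", "]"))
```

Because the elastic net penalty has driven five of the nine coefficients to zero, only size, shape, bnuc and bchrom influence the prediction in this model.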

Below we evaluate the predictions. We use a BinaryClassificationEvaluator, which returns the area under the ROC curve by comparing the test label column with the test rawPrediction column. In this case, the evaluation returns an area under the ROC curve of about 0.99.

// A common metric used for logistic regression is area under the ROC curve (AUC). We can use the BinaryClassificationEvaluator to obtain the AUC.
// create an Evaluator for binary classification, which expects two input columns: rawPrediction and label.
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
// Evaluates predictions and returns a scalar metric areaUnderROC (larger is better). Note that the value stored in accuracy is the AUC, not the classification accuracy.
val accuracy = evaluator.evaluate(predictions)
accuracy: Double = 0.9926910299003322

Below we calculate some more metrics. The number of false and true positive and negative predictions is also useful:

  • True positives are how often the model correctly predicted a tumour was malignant
  • False positives are how often the model predicted a tumour was malignant when it was benign
  • True negatives indicate how often the model correctly predicted a tumour was benign
  • False negatives indicate how often the model predicted a tumour was benign when in fact it was malignant
// Calculate Metrics
val lp = predictions.select("label", "prediction")
val counttotal = predictions.count()
val correct = lp.filter($"label" === $"prediction").count()
val wrong = lp.filter(not($"label" === $"prediction")).count()
// prediction 1.0 is malignant (positive), prediction 0.0 is benign (negative)
val trueP = lp.filter($"prediction" === 1.0).filter($"label" === $"prediction").count()
val trueN = lp.filter($"prediction" === 0.0).filter($"label" === $"prediction").count()
val falseN = lp.filter($"prediction" === 0.0).filter(not($"label" === $"prediction")).count()
val falseP = lp.filter($"prediction" === 1.0).filter(not($"label" === $"prediction")).count()
val ratioWrong = wrong.toDouble / counttotal.toDouble
val ratioCorrect = correct.toDouble / counttotal.toDouble

 counttotal: Long = 199
correct: Long = 168
wrong: Long = 31
trueP: Long = 40
trueN: Long = 128
falseN: Long = 30
falseP: Long = 1
ratioWrong: Double = 0.15577889447236182
ratioCorrect: Double = 0.8442211055276382
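
From these counts you can also derive precision and recall for the malignant class. The sketch below is plain Scala with the counts from the output above typed in. The variable names are my own, and the number of correct malignant predictions is derived as the correct predictions that are not correct benign predictions.

```scala
// Counts taken from the output above.
val total = 199L
val correctCount = 168L
val correctBenign = 128L      // rows where prediction and label are both 0.0
val missedMalignant = 30L     // predicted benign, actually malignant (false negatives)
val falseAlarms = 1L          // predicted malignant, actually benign (false positives)

// Correct malignant predictions are the remaining correct rows.
val correctMalignant = correctCount - correctBenign

// Of the tumours predicted malignant, how many really were malignant.
val precision = correctMalignant.toDouble / (correctMalignant + falseAlarms)
// Of the tumours that really were malignant, how many the model found.
val recall = correctMalignant.toDouble / (correctMalignant + missedMalignant)
val overallAccuracy = correctCount.toDouble / total
println(s"precision = $precision, recall = $recall, accuracy = $overallAccuracy")
```

Precision is high but recall is much lower, because 30 malignant observations were predicted benign at the default threshold. For a screening use case, that may be a reason to experiment with a lower threshold or weaker regularization.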

// use MLlib to evaluate, convert DF to RDD
val predictionAndLabels = predictions.select("rawPrediction", "label").rdd.map(x => (x(0).asInstanceOf[DenseVector](1), x(1).asInstanceOf[Double]))
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("area under the precision-recall curve: " + metrics.areaUnderPR)
println("area under the receiver operating characteristic (ROC) curve : " + metrics.areaUnderROC)
// A Precision-Recall curve plots (precision, recall) points for different threshold values, while a receiver operating characteristic, or ROC, curve plots (recall, false positive rate) points. The closer the area under the ROC curve is to 1, the better the model is at making predictions.
 area under the precision-recall curve: 0.9828385182615946
area under the receiver operating characteristic (ROC) curve : 0.9926910299003322
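
One way to build intuition for the area under the ROC curve is to read it as a ranking measure: the fraction of (malignant, benign) pairs in which the malignant observation receives the higher score. The sketch below computes that pairwise fraction in plain Scala on made-up scores. The helper `areaUnderROC` is hypothetical and is not how BinaryClassificationMetrics is implemented, although the pairwise view and the area under the curve should agree when ties count as half.

```scala
// Area under the ROC curve as a ranking statistic: the fraction of
// (positive, negative) pairs in which the positive example has the higher score.
// Ties count as half. `areaUnderROC` is a hypothetical helper, not a Spark API.
// It assumes at least one positive and one negative example.
def areaUnderROC(scoreAndLabels: Seq[(Double, Double)]): Double = {
  val positives = scoreAndLabels.filter(_._2 == 1.0).map(_._1)
  val negatives = scoreAndLabels.filter(_._2 == 0.0).map(_._1)
  val pairScores = for {
    p <- positives
    n <- negatives
  } yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  pairScores.sum / (positives.size * negatives.size)
}

// Made-up (score, label) pairs for illustration.
val toy = Seq((0.9, 1.0), (0.8, 1.0), (0.7, 0.0), (0.6, 1.0), (0.3, 0.0), (0.2, 0.0))
val auc = areaUnderROC(toy)
println(s"AUC = $auc")
```

This view explains how the model can have an AUC near 0.99 while classifying only about 84% of test rows correctly: the scores rank malignant observations above benign ones almost perfectly, but the 0.5 threshold cuts the ranking in a poor place.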

Want to learn more?

In this blog post, we showed you how to get started using Apache Spark’s machine learning Logistic Regression for classification. If you have any further questions about this tutorial, please ask them in the comments section below.


This blog post was published October 17, 2016.