Streaming Predictive Maintenance for IoT using TensorFlow - Part 1


Editor's Note: Read part 2 of this post here.

This is Part 1 of a two-part series that will describe how to apply an RNN for time series prediction on real-time data generated from a sensor attached to a device that is performing a task along a manufacturing assembly line. I will describe the entire workflow of transforming the data, performing data exploration, training and testing a model, and then deploying that model on new data in real time.

Part 1

In Part 1 of this blog, I will describe how to load the data into the environment, determine the data type, convert it, load the data into PySpark for ETL, and then perform data exploration and model building using Python and TensorFlow in a Jupyter notebook.

Data is generated one event at a time. With advances in real-time data capture and streaming architectures, it is now possible to monitor data in real time, giving an organization insight into individual components and into entire processes as they occur. Monitoring still requires active involvement and quick response: for example, an oil well sensor indicating increased temperature or volume, or network traffic suggesting botnet activity or an insider threat. The idea behind predictive maintenance is to determine whether failure patterns of various types are predictable.

If an organization can accurately predict when a piece of hardware will fail, and replace that component before it fails, it can achieve much higher levels of operational efficiency. With many devices now including sensors and other components that send diagnostic reports, predictive maintenance using big data becomes increasingly accurate and effective.

Classifying vs. Predicting in IoT

With real-time monitoring, organizations can have insight on individual components and entire processes as they occur.

When trying to identify if there are any irregularities in the data that is being captured, it is necessary to classify what is anomalous versus normal.

[Figure 1: Sensor readings over time, showing anomalous versus normal behavior]

Figure 1 shows readings from a sensor attached to a device over thousands of timestamps. There is consistent behavior that falls within an acceptable threshold for the majority of these timestamps, but at random points, there are extreme events or outliers that are anomalous to the device's normal behavior. These extreme events can be classified as anomalous, and this type of machine learning becomes especially important when the anomalies are not so apparent: for example, irregular fluctuations on an EKG that could indicate problems with the electrical activity of a person's heart.

Applying prediction to data that contains anomalies means forecasting when these irregular events will occur in the future.

Getting Ready

The data and code for this demo are posted on my GitHub page:

https://github.com/JustinBurg/IoT_Predictive_Maintenance_Demo

Background

The sensor attached to an automated manufacturing device captures position and calibration at each time stamp. The sensor is capturing real-time data on the device and its current position. The data is stored for historical analysis to identify trends and patterns to determine if any devices need to be taken out of production for health checks and maintenance.

Environment and Tools

This workflow was developed using the MapR Sandbox v5.2.1.

The tools used were MapR-FS, MapR-Event Streams, PySpark (Spark version 2.1.0), Anaconda Python 3.5, Jupyter Notebook, TensorFlow 1.1, and Plotly (https://plot.ly/) for the visualization.

Data

2,013 .dat files

Importing the Data

Download the data (demo_data.zip) from my GitHub page to your desktop and unzip it. If you open a single file with a text editor, you will see that although the extension is .dat, the format of the data is XML.

We will be operating as user01 in MapR-FS. Follow the instructions from my earlier blog post to access user01 in Hue.

Once your sandbox is up and running, upload the zipped data into MapR-FS under /user/user01 and rename the directory containing the raw files to "rw_XML_train."

Open a terminal on your machine and ssh into the sandbox using:

ssh user01@localhost -p 2222

We will be doing some preprocessing at the command line to make it easier to ingest and transform our data. In the Sandbox, as user01, at the command line:

Go into the rw_XML_train folder:

[user01@maprdemo ~]$ cd rw_XML_train

Count number of files:

[user01@maprdemo rw_XML_train]$ ls -1 | wc -l

The file names have a space in them. Remove the space with the following:

[user01@maprdemo rw_XML_train]$ find $1 -name "* *.dat" -type f -print0 | while read -d $'\0' f; do mv -v "$f" "${f// /_}"; done

Convert file extension from .dat to .xml:

[user01@maprdemo rw_XML_train]$ for f in *.dat; do mv -- "$f" "${f%.dat}.xml"; done

Create new empty directory:

[user01@maprdemo]$ mkdir rw_XML_stream

Move the last 10 xml files from the rw_XML_train location into rw_XML_stream (you can do this in Hue by selecting the box next to the file name and then actions). These will be used when we deploy our finished model.

Data Transformation

The data is in xml format and looks like this:

[Figure: a raw XML file as captured from the sensor]

When the data is coming off the sensor, this is how it is being captured. This data is difficult to interpret in its current state, so we want to transform it into a dataframe to get better insight. We want to perform this transformation on all the files, so we will use PySpark to do the bulk conversion.
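Based on the row tag we pass to spark-xml below and the columns that appear after loading (TagName, TagValue, TimeStamp), each record presumably looks something like this (values are illustrative, not taken from the actual files):

<HistoricalTextData>
    <TagName>::[scararobot]Ax_J1.ActualPosition</TagName>
    <TagValue>5.1</TagValue>
    <TimeStamp>2017-05-01 12:00:00.000</TimeStamp>
</HistoricalTextData>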

Spark has a package that reads in xml files and can convert to dataframes. We will use this package, and we specify it when we start PySpark from the command line:

[user01@maprdemo ]$ /opt/mapr/spark/spark-2.1.0/bin/pyspark --packages com.databricks:spark-xml_2.10:0.4.1

[Figure: the PySpark shell starting up with the spark-xml package]

Load the following dependencies into PySpark:

import sys
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, DoubleType, FloatType, DateType, TimestampType
from pyspark.sql.functions import date_format, col, desc, udf, from_unixtime, unix_timestamp, date_sub, date_add, last_day
import time

Import the data into a dataframe and display:

df = spark.read.format('com.databricks.spark.xml').options(rowTag='HistoricalTextData').load('maprfs:///user/user01/rw_XML_train')

df.show()

Importing the data directly from an xml to a dataframe gives us the following:

[Figure: the XML files loaded into a long, thin dataframe of TagName, TagValue, and TimeStamp]

Having processed the data, we now have a dataframe that is thin and long. Each timestamp has 28 metrics associated with it. Our current data indicates that the first 28 rows will be for timestamp 1, the second 28 rows will be for timestamp 2, and so on. What would be useful is for each row to be a timestamp and each column to be a metric. Luckily, we can easily perform this transformation in PySpark.

df = df.withColumn("TimeStamp", df["TimeStamp"].cast("timestamp")).groupBy("TimeStamp").pivot("TagName").sum("TagValue").na.fill(0)

Now we have grouped our dataframe by "TimeStamp" and pivoted our data on "TagName," so the distinct values of TagName become columns in our new dataframe. The new dataframe will have a TimeStamp column plus 28 metric columns, for a total of 29 columns. (Note: The pivot function is a nice capability, but it does have a limit of 10,000 columns. If TagName had 20,000 metrics instead of 28, we would have had to split the original dataframe on half the metrics to create two dataframes, pivot those, and then join them together; a sketch of that approach follows.)
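A rough sketch of that split-pivot-join workaround (not needed for our 28 metrics), applied to a hypothetical raw_df holding the pre-pivot data:

# Hypothetical: pivot two halves of the metrics separately, then join on TimeStamp
tag_names = [r.TagName for r in raw_df.select("TagName").distinct().collect()]
first_half, second_half = tag_names[:len(tag_names)//2], tag_names[len(tag_names)//2:]
df1 = raw_df.filter(col("TagName").isin(first_half)) \
            .groupBy("TimeStamp").pivot("TagName", first_half).sum("TagValue")
df2 = raw_df.filter(col("TagName").isin(second_half)) \
            .groupBy("TimeStamp").pivot("TagName", second_half).sum("TagValue")
df_wide = df1.join(df2, on="TimeStamp", how="inner").na.fill(0)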

Our new dataframe is approximately 82,000 rows, meaning there is historical data on 82,000 timestamps to build our model on. We can now export this dataframe to a csv for data exploration using Python. Since the data is not very big, we can limit it to a single partition making it easier to import into Python.

df.repartition(1).write.csv("maprfs:///user/user01/rw_etl.csv", header=True, sep=",")

Our dataframe is written into MapR-FS in a new directory called "rw_etl." In rw_etl, there is a single .csv file with the partition tag that contains our entire dataframe.

That's it. Only a few lines of PySpark have processed over 2,000 files and converted them into a single dataframe. This was performed on a single node with relative ease, and the same code could easily scale out to tens of thousands of files while leveraging the power of Spark and YARN. If you want to run this as a PySpark job and print out how long the bulk conversion takes, the code is in GitHub and the command to run it is (all on a single line):

[user01@maprdemo ~]$ /opt/mapr/spark/spark-2.1.0/bin/spark-submit  --packages com.databricks:spark-xml_2.10:0.4.1 /user/user01/Sensor_ETLsparksubmit.py

Data Exploration

We will be using Python 3.5 and Jupyter Notebook for our data exploration and for prototyping our model. If you need instructions for installing both into the Sandbox, I put them on my GitHub page. MapR can also easily integrate with Zeppelin in case that is your preferred environment.

What exactly is the data we are looking at? The sensor is attached to a Selective Compliance Assembly Robot Arm (SCARA) that captures the timestamp and the positioning of the arm at each individual time period. The data collected captures when the SCARA arm is performing some task – for example, tightening screws – and then returns to a ready position. The positions need to be calibrated to make the robot operate at the highest possible efficiency.

[Figure: a SCARA arm]

At the command line in your sandbox:

[user01@maprdemo ]$ mkdir TFmodel
[user01@maprdemo ]$ jupyter notebook

Open a browser and go to:

http://127.0.0.1:9999/

The port setting 9999 (different from the default port) should have been configured when you were installing Anaconda and Jupyter. Once your Jupyter tree page opens, go to New, then choose the Python [conda root] notebook.
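If the port was never set, it lives in Jupyter's configuration file; here is a minimal sketch of the relevant settings (this is an assumption about your setup, not something the sandbox configures for you):

# ~/.jupyter/jupyter_notebook_config.py (create it with: jupyter notebook --generate-config)
c.NotebookApp.ip = '0.0.0.0'   # listen on all interfaces so your host browser can reach the sandbox
c.NotebookApp.port = 9999      # match the port used in the URL above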

[Figure: choosing the Python [conda root] notebook from the New menu]

Import packages:

import os
import pandas as pd
import csv
import numpy as np
import random
import glob
import matplotlib
import matplotlib.pyplot as plt
get_ipython().magic('matplotlib inline')

Import the data from MapR-FS; do some data cleaning:

df = pd.read_csv("/user/user01/rw_etl.csv/part-00000-45866095-f76d-4f6c-ba2d-a07f0ab2dc04.csv").sort_values(['TimeStamp'], ascending=True).reset_index()
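The part-file hash in that path will differ in your environment; one way to avoid hard-coding it is to let the glob module (already imported above) find it, for example:

# Locate the single part file that Spark wrote, without hard-coding its hash
csv_path = glob.glob("/user/user01/rw_etl.csv/part-*.csv")[0]
df = pd.read_csv(csv_path).sort_values(['TimeStamp'], ascending=True).reset_index()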

df.drop(['::[scararobot]Ax_J1.PositionCommand','::[scararobot]Ax_J1.TorqueFeedback','::[scararobot]Ax_J2.PositionCommand','::[scararobot]Ax_J2.TorqueFeedback','::[scararobot]Ax_J3.TorqueFeedback','::[scararobot]Ax_J6.TorqueFeedback','::[scararobot]ScanTimeAverage','::[scararobot]Ax_J6.PositionCommand','::[scararobot]Ax_J3.PositionCommand','index'], axis=1, inplace=True)

df['TimeStamp']=pd.to_datetime(df['TimeStamp'])

df.head(5)

Based on my discussions with the customer, I had already identified that I wanted to approach this as a time-series analysis problem. The first thing I wanted to do was plot my data to get an idea of its behavior.


df.plot(x="TimeStamp", y="::[scararobot]Ax_J1.ActualPosition", kind="line")
df.plot(x="TimeStamp", y=["::[scararobot]Ax_J1.ActualPosition","::[scararobot]Ax_J3.TorqueCommand"], kind="line")
df.plot(x="TimeStamp", y=["::[scararobot]CS_Cartesian.ActualPosition[0]","::[scararobot]CS_Cartesian.ActualPosition[1]"], kind="line")

[Figure 3: line plots of several sensor metrics over time]

Figure 3 displays plots of some of the metrics captured at each time stamp. We see that, historically, the data points fall within a threshold and remain consistent over time. However, there are data points that are outliers and appear to be random. How can we predict when these will occur? What models could we consider?

My background is in econometrics, so for time series data, I have experience with ARIMA models. Autoregressive integrated moving average models have typically been employed for financial forecasting to predict stock behavior or oil prices. You can account for exogenous market shocks when employing ARIMA models, but tuning them can be a challenge when there isn't a trend to the data. For example, stock prices over a specific period will exhibit a trend, ticking either up or down, so while you might not be able to predict specific values, you can forecast the trend over a forecast horizon (the number of periods into the future you are forecasting). Our dataset doesn't exhibit any trends: the data falls between two values (different for each metric), doesn't tick up or down, and remains consistent until a random outlier occurs.

The next model considered was a Kalman Filter. The Kalman Filter (KF) uses recursive computation over previous time periods to estimate future time periods and, in doing so, filters out fluctuations in a time series. The necessary assumptions for a KF are that it is applied to a linear system (setting aside the Extended KF for the moment) and that the measurement errors can be described as white noise that is Gaussian in nature. At each time period/state, we can make some assumptions about the position or value of a data point based on previous states and account for the error in our estimation. This is great for modeling the current trajectory and speed of a car going down the road, but when the car comes to a fork in the road, it could go right or left, and if the choice of right or left is not known, the model will give equal weight to both, preventing an accurate estimate of the actual decision.

Looking at the data, the SCARA arm is either at rest or in its working position. However, it doesn't alternate between the two at every time step; in some instances, it is at rest for multiple time steps. It would be difficult to filter this, as the behavior is not linear, and adjusting for means will not account for significant outliers that fall outside the historical threshold of normal behavior. The models discussed above struggle to capture the behavior of a series whose variance fluctuates widely and randomly with time.

The model chosen to do the prediction was a Recurrent Neural Network (RNN). In a previous blog post, I described the utility of using RNNs for time series forecasting, where the inputs and outputs of the model are sequences of data points with the prediction influenced by previous values. I chose TensorFlow to implement my RNN.

When an outlier occurs in the data, it occurs across all columns in a timestamp; that is, when something irregular happens, it shows up in all the measurements at once. Summing the values in each row therefore gives a single value to predict while still providing a threshold of acceptable normal behavior. For the purposes of this demo, we will focus on the single summed data point, but we could also train RNNs on individual features and run multiple models to identify one that is more effective.

Create a new column that will be our feature variable for our model:

df['Total']= df.select_dtypes(include=['float64','float32']).apply(lambda row: np.sum(row),axis=1)

Convert into a time series object:

ts = pd.Series(df['Total'])
ts.plot(c='b', title='RW Total Sensor Aggregation')

Import TensorFlow libraries:

import tensorflow as tf
import shutil
import tensorflow.contrib.learn as tflearn
import tensorflow.contrib.layers as tflayers
from tensorflow.contrib.learn.python.learn import learn_runner
import tensorflow.contrib.metrics as metrics
import tensorflow.contrib.rnn as rnn

Prepare data and inputs for our TF model:

num_periods = 100
f_horizon = 1       #number of periods into the future we are forecasting
TS = np.array(ts)   #convert time series object to an array
print (len(TS))
print (TS[0:10])

Create our training input data set "X":

x_data = TS[:(len(TS)-(len(TS) % num_periods))]
print (x_data[0:5])
x_batches = x_data.reshape(-1, num_periods, 1)
print (len(x_batches))
print (x_batches.shape)

Create our training output dataset "y":

y_data = TS[1:(len(TS)-(len(TS) % num_periods))+f_horizon]
print (y_data)
print (len(y_data))
y_batches = y_data.reshape(-1, num_periods, 1)
print (len(y_batches))

Create our test X and y data:

def test_data(series, forecast, num_periods):
    # Take the last (num_periods + forecast) points: the first num_periods become the test inputs,
    # and the final num_periods become the test targets (shifted by the forecast horizon).
    test_x_setup = series[-(num_periods + forecast):]
    testX = test_x_setup[:num_periods].reshape(-1, num_periods, 1)
    testY = series[-(num_periods):].reshape(-1, num_periods, 1)
    return testX, testY

X_test, Y_test = test_data(TS,f_horizon,num_periods)
print (X_test.shape)
print (X_test[:,(num_periods-1):num_periods])
print (Y_test.shape)
print (Y_test[:,(num_periods-1):num_periods])

RNN Model using TensorFlow

Number of input features per time step (a single array is submitted):

inputs = 1

Number of hidden neurons in the recurrent layer. This can be changed to improve accuracy.

hidden = 100

Size of the output vector:

output = 1

Create variable objects:

X = tf.placeholder(tf.float32, [None, num_periods, inputs], name = "X")  
y = tf.placeholder(tf.float32, [None, num_periods, output], name = "y")

Create our RNN object:

basic_cell = tf.contrib.rnn.BasicRNNCell(num_units=hidden, activation=tf.nn.relu)

This example uses a basic RNN cell, and the results were pretty good. Given more time, we would try RNN variations such as Long Short-Term Memory (LSTM) to see if we could improve predictive performance.
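As a sketch, swapping in an LSTM cell would be a one-line change (the rest of the graph stays the same); this variation is not part of the original workflow:

# Untested variation: replace the basic RNN cell with an LSTM cell of the same size
basic_cell = tf.contrib.rnn.BasicLSTMCell(num_units=hidden)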

Run the cell over the input sequences to produce the output tensor:

rnn_output, states = tf.nn.dynamic_rnn(basic_cell, X, dtype=tf.float32)

The tf.nn.dynamic_rnn function handles the recursion capability to pull together the components of the RNN and takes the batches of input sequences and feeds them into the model.

The tensor is reshaped into a two-dimensional tensor, and then tf.layers.dense makes it a fully connected output layer.

stacked_rnn_output = tf.reshape(rnn_output, [-1, hidden])
stacked_outputs = tf.layers.dense(stacked_rnn_output, output)        

Reshape the results back into our output shape:

outputs = tf.reshape(stacked_outputs, [-1, num_periods, output])

A higher learning rate would speed up training, but we choose a small learning rate so that we don't overshoot the minimum:

learning_rate = 0.001

Define the cost function, which evaluates the quality of our model:

loss = tf.reduce_sum(tf.square(outputs - y),name='loss')

Back propagation method:

optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)

Create the training operation that minimizes the cost function:

training_op = optimizer.minimize(loss)

Initialize all components:

init = tf.global_variables_initializer()

Number of iterations or training cycles, each of which includes both the feedforward and backpropagation passes:

epochs = 1000

Create a saver object and specify where the model will be saved:

saver = tf.train.Saver()  
DIR="/user/user01/TFmodel"

Construct our session object, which creates our computational graph, and run our model. The value printed at the end is the forecast for the final period of our test data X_test, taken from the predicted y_pred array:

with tf.Session() as sess:
    init.run()
    for ep in range(epochs):
        sess.run(training_op, feed_dict={X: x_batches, y: y_batches})
        if ep % 100 == 0:
            mse = loss.eval(feed_dict={X: x_batches, y: y_batches})
            print(ep, "\tMSE:", mse)

    y_pred = sess.run(outputs, feed_dict={X: X_test})
    print(y_pred[:,(num_periods-1):num_periods])
    saver.save(sess, os.path.join(DIR,"IoT_TF_model"),global_step = epochs)

Plot our Y_test data against our y_pred forecast:
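The plotting code itself isn't shown above; a minimal matplotlib sketch (one of several ways to draw this comparison) would be:

# Compare the held-out actual values with the model's forecast over the test window
plt.figure(figsize=(12, 5))
plt.title("Forecast vs Actual")
plt.plot(pd.Series(np.ravel(Y_test)), "b-", label="Actual")
plt.plot(pd.Series(np.ravel(y_pred)), "r.", label="Forecast")
plt.legend(loc="upper left")
plt.xlabel("Time Periods")
plt.show()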

[Figure: actual Y_test values plotted against the y_pred forecast]

Our model is able to predict the variations in the data as the position of the SCARA arm goes from a ready position to action and back again.

This post has taken you from the initial problem statement through ingesting, transforming, and exploring the data, to training and testing a model.

Part 2 will explain how the model can be deployed into a streaming application for real-time monitoring, analytics, and visualization.


This blog post was published February 26, 2018.