Streaming Predictive Maintenance for IoT using TensorFlow - Part 2


Editor's Note: Read part 1 of this post here.

This is Part 2 of a two-part series that describes how to apply a recurrent neural network (RNN) for time series prediction on real-time data generated by a sensor attached to a device on a manufacturing assembly line. I describe the entire workflow of transforming the data, performing data exploration, training and testing a model, and then deploying that model on new data in real time.

Part 2

The value of data is determined by how it is used; focusing too much on how it is ingested and stored misses the real opportunity of what could be achieved.

Before joining MapR, when I analyzed data, once I had performed my analysis and gained insight from it, I was never able to take the next step and deploy that insight. I wrote a report, sent an email, or presented some slides, but I didn't get to dig in as much as I wanted. This frustrating situation was largely due to the fact that the data architecture I was working on made it very difficult to deploy models in a reasonable, effective, time-efficient manner.

The MapR Data Platform removes this barrier because the MapR engineers who built it understand that data utilization is just as important as data capture and storage. Working on MapR, I do all my development on the platform. All the data and tools I use are on the platform. I don't have to worry about accessing multiple clusters to perform different tasks just to get my model to run on new data. When I want to take my work live, the platform has built-in capabilities to deploy on real-time data, and there are many ways to do so.

In Part 2 of this blog, I will describe how to use the TensorFlow model we developed in Part 1 to score new data as it arrives in a data stream.

The model in Part 1 was designed to process data recorded by a SCARA robot. Since I don't have a SCARA robot arm handy, I had to come up with another way to get new data to feed into our model in real time. So I wrote a Python script that generates synthetic data that looks like real sensor data. This generator sends data to a MapR stream.

But before our generator can generate and send data to a stream, the stream has to be created. In addition, we need to create a topic in that stream to hold the data. The stream is the actual file-system object that holds all of the messages that we will send. Within a stream, there are named topics, and each topic contains messages, possibly billions of them. With MapR, there can be thousands or millions of topics within a single stream, and there can be a huge number of streams in a single cluster.

I used a stream named iot_stream. To create it, I used this command:

[user01@maprdemo ~]$ maprcli stream create -path /user/user01/iot_stream -produceperm p -consumeperm p -topicperm p

The options on the stream's create command set up the stream so that anybody (i.e., 'p' for public) can produce or consume messages or create topics.

In this demo, I used a single topic called sensor_record to store all the data, and I created that topic using this command:

[user01@maprdemo ~]$ maprcli stream topic create -path /user/user01/iot_stream -topic sensor_record
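To confirm that the stream and topic were created, you can list the topics in the stream:

[user01@maprdemo ~]$ maprcli stream topic list -path /user/user01/iot_stream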

Now that the stream and topic have been created, let's take a look at the producer script. In part 1 of this series, I put 10 .xml files into a directory called rw_XML_stream.

The Producer Script

The producer script goes through this directory and opens each .xml file. In each file, it identifies each timestamp, performs some transformations, and then sends the timestamp data into the stream by writing a Python dict in JSON form. This script is on my GitHub page under Sensor_XML2MaprStreams_producer.py.

Let's look at the producer script more closely. At the top are all the dependencies for the script. One unusual thing to note is that the mapr_kafka_rest import refers to a package that you need to download explicitly; make sure the MapR Kafka REST API script is in the same directory as the producer script (your current working directory).

import mapr_kafka_rest
import random
import time
import datetime
import os
import xml.etree.ElementTree as ET
import pandas as pd
import glob

The ElementTree XML API lets Python parse XML data and makes it easy to convert it into a dataframe.
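To make the conversion concrete, here is a minimal, self-contained sketch of the idea. The element layout below is an assumption about how the sensor files are structured, based on the TimeStamp, TagName, and TagValue fields used later in the script:

import xml.etree.ElementTree as ET

# Hypothetical fragment shaped like one record in the sensor .xml files
sample = """
<Records>
  <Record>
    <TimeStamp>2018-03-14 10:15:00.250</TimeStamp>
    <TagName>Axis1Torque</TagName>
    <TagValue>12.7</TagValue>
  </Record>
</Records>
"""

root = ET.XML(sample)                                   # parse the string into an element tree
record = {child.tag: child.text for child in root[0]}   # flatten the first <Record> into a dict
print(record)  # {'TimeStamp': '2018-03-14 10:15:00.250', 'TagName': 'Axis1Torque', 'TagValue': '12.7'}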

Define the topic and stream objects:

sensor_topic_name  = '/user/user01/iot_stream:sensor_record'
path = '/user/user01/rw_XML_stream'

To convert input files to a data frame, we can iterate through the document, collecting a list of records. (I found this method on Stack Overflow, by the way, and can't tell who the original author is.)

def xml2df(xml_file):
    root = ET.XML(xml_file)  # parse the XML string into an element tree
    all_records = []         # list of records that we will convert into a dataframe
    for child in root:       # loop through the children of the root element
        record = {}          # placeholder for this record
        # iterate through the sub-elements of each record
        for subchild in child:
            # extract the text and create a new key/value pair
            record[subchild.tag] = subchild.text
        all_records.append(record)        # append this record to all_records
    return pd.DataFrame(all_records)      # return the records as a DataFrame

I use this function to convert each file into dataframe rows; each row is then packed into a Python dictionary, which is the format sent to our stream. I also keep a count of the records created for data management purposes.

records = 0
for xml_filename in glob.glob(path+"/*.xml"):
    f = open(xml_filename).read()
    df = xml2df(f).drop_duplicates().reset_index(drop=True).sort_values(['TimeStamp'], ascending=True)
    df['TagValue']=df.TagValue.astype(float).fillna(0)
    df = df.pivot(index='TimeStamp', columns='TagName', values='TagValue').fillna(0).rename_axis(None, axis=1).reset_index()
    df['filename'] = xml_filename

    for index,row in df.iterrows():
        records +=1
        sensor_df_record = {}
        sensor_df_record['filename']=row["filename"]
        sensor_df_record['TimeStamp']=row['TimeStamp']
        # ... (remaining sensor fields omitted; see the full script on GitHub)

        response = mapr_kafka_rest.post_topic_message(sensor_topic_name, sensor_df_record)
        if (response.status_code == 200):
            print("POSTED: " + str(sensor_df_record))
        else:
            print('ERROR: %d "%s"' % (response.status_code,response.reason))
        time.sleep(0.25)  # pause briefly between messages to simulate a real-time feed

This loop has been truncated, but you can find the entire program on GitHub. The producer goes through each file, converting it from XML to dataframe rows. It then packs each row into a dictionary of key/value pairs, where the keys are the data points collected at each timestamp. Once all the keys have values, the Kafka REST API feeds the dictionary into the stream as a message destined for the designated topic.
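For reference, the helper used above is essentially a thin wrapper around the MapR Kafka REST proxy. Below is a rough, hypothetical approximation of what post_topic_message does; the host, port, and content-type version are assumptions, so consult the actual script on GitHub and your proxy's documentation:

import json
import requests

def post_topic_message_sketch(topic_name, record, base_url='http://localhost:8082'):  # assumed host/port
    # the full /stream:topic path has to be URL-encoded into the REST endpoint
    url = base_url + '/topics/' + requests.utils.quote(topic_name, safe='')
    headers = {'Content-Type': 'application/vnd.kafka.json.v1+json'}  # embedded-format version may differ
    payload = {'records': [{'value': record}]}
    return requests.post(url, headers=headers, data=json.dumps(payload))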

To run this script from the command line, use the following:

[user01@maprdemo ~]$ python Sensor_XML2MaprStreams_producer.py

The output will look something like this as the data is fed into the stream:

Data is fed into the stream

The Consumer Script

So now the new data is streaming into the iot_stream stream under the topic sensor_record, and the messages are waiting for a consumer to read them. A consumer can be a variety of things. It can be a process that appends the data to a file. It can be a dashboard visualizing the data as it arrives. It can be a model that is being applied to the data. It can be all three at once.

The consumer will write the new data to a file in MapR-FS, apply the saved RNN model, and display both the actual and predicted values in a dashboard for real-time monitoring and predictive maintenance. I used Plotly as the visualization dashboard, but other dashboard tools like Shiny and Grafana also work well.

Before setting up the consumer, set up a Plotly account if you are going to use that for your visualization. Refer to the README file on GitHub for details.
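If you do use Plotly, the consumer expects your credentials and stream tokens to already be stored in Plotly's local credentials file. A minimal setup sketch, assuming two stream tokens generated in your Plotly account settings (the values below are placeholders):

import plotly.tools as tls

# One-time setup: store the account credentials and stream tokens locally so that
# tls.get_credentials_file()['stream_ids'], used later in the consumer, can find them.
tls.set_credentials_file(username='your_plotly_username',              # placeholder
                         api_key='your_api_key',                       # placeholder
                         stream_ids=['your_token_1', 'your_token_2'])  # placeholders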

The name of the consumer script is Stream_IoT_Prediction_Dashboard_plotly.py. Let's look at it more closely. After loading the package dependencies, the first function is a conversion function.

def sensor_conversion(record):
    sensor_frame = pd.DataFrame()
    sensor_frame = sensor_frame.append(record, ignore_index=True)
    sensor_frame['TimeStamp'] = pd.to_datetime(sensor_frame['TimeStamp'])
    sensor_frame = sensor_frame.sort_values(['TimeStamp'], ascending=True)
    sensor_frame['Total'] = sensor_frame.select_dtypes(include=['float64','float32']).apply(lambda row: np.sum(row), axis=1)
    if not os.path.isfile("IoT_Data_From_Sensor.csv"):
        # if the csv is not there, create it
        sensor_frame.to_csv("IoT_Data_From_Sensor.csv")
    else:
        # if the csv is there, append the new row to the file
        with open('IoT_Data_From_Sensor.csv', 'a') as newFile:
            newFileWriter = csv.writer(newFile)
            newFileWriter.writerow(sensor_frame.tail(1).values.flatten().tolist())
    return sensor_frame

This function takes a message read from the stream and converts it into a dataframe row. It also writes to a file in MapR-FS: if the file is not there, it is created on the first message; if it is, the new row is appended. The file created is "IoT_Data_From_Sensor.csv."
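To illustrate, here is what calling the function on a single message might look like. The tag names and values are hypothetical; only filename and TimeStamp appear explicitly in the producer above:

# Hypothetical record, shaped like the dictionaries posted by the producer
sample_record = {
    'filename': '/user/user01/rw_XML_stream/device1.xml',
    'TimeStamp': '2018-03-14 10:15:00.250',
    'Axis1Torque': 12.7,    # hypothetical sensor tags
    'Axis2Torque': -3.4,
}
row = sensor_conversion(sample_record)     # also creates/appends IoT_Data_From_Sensor.csv
print(row[['TimeStamp', 'Total']])         # 'Total' sums the float-valued sensor columns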

Next, load the saved RNN model:

def rnn_model(array, num_periods):
    x_data = array.reshape(-1,num_periods,1)
    #print (x_data)
    tf.reset_default_graph()   

    inputs = 1            #number of vectors submitted
    #number of neurons we will recursively work through,
    #can be changed to improve accuracy
    hidden = 100          
    output = 1            #number of output vectors
    #create variable objects
    X = tf.placeholder(tf.float32, [None, num_periods, inputs], name = "X")   
    y = tf.placeholder(tf.float32, [None, num_periods, output], name = "y")
    #create our RNN object
    basic_cell = tf.contrib.rnn.BasicRNNCell(num_units=hidden, activation=tf.nn.relu)   
    #choose dynamic over static
    rnn_output, states = tf.nn.dynamic_rnn(basic_cell, X, dtype=tf.float32)               
    #small learning rate so we don't overshoot the minimum
    learning_rate = 0.001   
    #change the form into a tensor
    stacked_rnn_output = tf.reshape(rnn_output, [-1, hidden])           
    #specify the type of layer (dense)
    stacked_outputs = tf.layers.dense(stacked_rnn_output, output)        
    #shape of results
    outputs = tf.reshape(stacked_outputs, [-1, num_periods, output])          
    #define the cost function which evaluates the quality of our model
    loss = tf.reduce_sum(tf.square(outputs - y))    
    #gradient descent method
    optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)           
    #train the result of the application of the cost_function                    
    training_op = optimizer.minimize(loss)

    init = tf.global_variables_initializer()      #initialize inputs
    saver = tf.train.Saver()                      #specify saver function
    #directory where trained TF model is saved
    DIR="/user/user01/TFmodel"                  

    #start a new tensorflow session
    with tf.Session() as sess:  
        #restore model                           
        saver.restore(sess, os.path.join(DIR,"IoT_TF_model-1000"))
        #load data from streams
        y_pred = sess.run(outputs, feed_dict={X: x_data})  
        #only print out the last prediction, which is for next period    
        FORECAST = y_pred[:,(num_periods-1):num_periods]       
    return FORECAST

This function will load the saved model. Many of the details are the same as in the Jupyter notebook from Part 1; the difference is that this session restores the checkpointed model from epoch 1000. The output is the prediction for the next time period in the sequence.
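For context, the "-1000" suffix in the restore path comes from the global step passed to the saver when the checkpoint was written in Part 1. A minimal sketch of how such a checkpoint gets produced (the variable and training loop here are stand-ins, not the actual Part 1 code):

import os
import tensorflow as tf

DIR = "/user/user01/TFmodel"
w = tf.Variable(tf.zeros([1]), name="w")      # stand-in for the real model variables
saver = tf.train.Saver()
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # ... training iterations would run here ...
    # global_step=1000 is what produces files named IoT_TF_model-1000.*
    saver.save(sess, os.path.join(DIR, "IoT_TF_model"), global_step=1000)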

The next function processes the messages from the stream and calls the conversion and RNN functions.

def stream_data(topic_name, max):
    topic_partition = 0
    topic_offset = 0
    min_offset = 1
    df = pd.DataFrame()
    #df.append(df)
    total_list_for_RNN = []
    num_periods = 100  #number of periods entered into batch
    for i in range(0,max):
        topic_offset = min_offset + i
        message = mapr_kafka_rest.get_topic_message(topic_name, topic_partition, topic_offset)
        # Reconstitue the dataframe record by unpacking the message...
        msg_as_list  = json.loads(message)
        json_as_dict = msg_as_list[0]
        df_record = json_as_dict['value']
        df = df.append(sensor_conversion(df_record),ignore_index=True)
        df['TimePeriod'] = df.index + 1
        if len(df) < num_periods:
            x1 = df["TimePeriod"].iloc[-1]
            y1 = int(df["Total"].iloc[-1])
            x2 = df["TimePeriod"].iloc[-1] + 1
            y2 = 0
            s_1.write(dict(x=x1,y=y1))
            s_2.write(dict(x=x2,y=y2))
            total_list_for_RNN.append((df["Total"].iloc[-1]))
        else:
            total_list_for_RNN.append((df["Total"].iloc[-1]))
            total_metric_array = np.array(total_list_for_RNN)
            predicted_value = rnn_model(total_metric_array, num_periods)
            x1 = df["TimePeriod"].iloc[-1]
            y1 = int(df["Total"].iloc[-1])
            x2 = df["TimePeriod"].iloc[-1] + 1
            y2 = int(predicted_value)
            s_1.write(dict(x=x1,y=y1))
            s_2.write(dict(x=x2,y=y2))
            #predicted_metric_list.append(int(predicted_value))
            print ("Next timestamp aggregate metric prediction: " + str(predicted_value))
            # alert only when the forecast falls outside the thresholds set from historical behavior
            if (predicted_value > -200) and (predicted_value < 450):
                print ("Forecast does not exceed threshold for alert!\n")
            else:
                print ("Forecast exceeds acceptable threshold - Alert Sent!\n")
            # drop the oldest value so the window stays at num_periods observations
            del total_list_for_RNN[0]

There is a lot going on here. Each message is read from the stream and run through the sensor conversion function. A batch of 100 observations is accumulated before the RNN model is applied, so the consumer waits until 100 timestamps have arrived before feeding the data to the model. The s_1 and s_2 variables are the streaming handles that display the data on the Plotly dashboard.

One of the great things about MapR Event Store is that a consumer can easily be a producer as well. The last instructions of this function state that if the prediction exceeds the threshold values determined by historical behavior, an alert will be generated. This alert will print to your console, but I could have set up a stream:topic object that sent this alert to someone's email, or to their cell phone via a text. MapR Event Store is an extremely versatile tool that enables the flow of information to be enhanced by analytics and is accessible in a variety of ways.
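As a hedged sketch of that idea, the same REST helper used by the producer could publish the alert to a separate, hypothetical alerts topic (inside stream_data you would use x2 and predicted_value rather than the example values shown here):

# Sketch: publish an alert to a (hypothetical) alerts topic instead of only printing it.
# The topic would need to be created first, e.g.:
#   maprcli stream topic create -path /user/user01/iot_stream -topic alerts
import mapr_kafka_rest

alert_topic_name = '/user/user01/iot_stream:alerts'    # assumed topic name
alert = {
    'TimePeriod': 101,                                  # example values
    'predicted_value': 475.3,
    'message': 'Forecast exceeds acceptable threshold'
}
response = mapr_kafka_rest.post_topic_message(alert_topic_name, alert)
if response.status_code != 200:
    print('ERROR posting alert: %d "%s"' % (response.status_code, response.reason))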

Declare the stream variables, and specify the max number of messages:

IoT_Demo_Topic = '/user/user01/iot_stream:sensor_record'
max = 402
DIR="/user/user01/TFmodel"

The next portion of the script is for the visualization. Visualization is a key component of this solution, as it helps monitor and highlight the activity of the mechanism that the sensor is attached to. I chose Plotly because it integrates well with my existing Python code and has built-in streaming functionality. The dashboard opens in a browser once I have logged into Plotly, and it can be customized to include more data feeds than the single one I am depicting.

stream_tokens = tls.get_credentials_file()['stream_ids']
# grab two stream tokens from the credentials file:
# one for the actual data and one for the predictions
token_1 = stream_tokens[0]
token_2 = stream_tokens[1]

stream_id1 = dict(token=token_1, maxpoints=60)
stream_id2 = dict(token=token_2, maxpoints=60)

trace1 = go.Scatter(x=[],y=[],mode='lines',
                    line = dict(color = ('rgb(22, 96, 167)'),width = 4),
                    stream=stream_id1,
                    name='Sensor')

trace2 = go.Scatter(x=[],y=[],mode='markers',
                    stream=stream_id2,
                    marker=dict(color='rgb(255, 0, 0)',size=10),
                    name = 'Prediction')

The stream tokens are the tokens you set up in your Plotly account. I have two: one for the actual data and one for my predicted values. The trace variables are the two time series lines that will overlay on a single graph.

data = [trace1, trace2]
layout = go.Layout(
    images= [
        dict(
            source= "https://raw.githubusercontent.com/JustinBurg/IoT_Predictive_Maintenance_Demo/master/mapr_logo.png",
            xref= "paper",
            yref= "paper",
            x= 0.389,
            y= 1.03,
            sizex= 0.1,
            sizey= 0.1,
            xanchor= "right",
            yanchor= "bottom"),
        dict(
            source= "https://raw.githubusercontent.com/JustinBurg/IoT_Predictive_Maintenance_Demo/master/mapr_logo.png",
            xref= "paper",
            yref= "paper",
            x= 0.80,
            y= 1.03,
            sizex= 0.1,
            sizey= 0.1,
            xanchor= "right",
            yanchor= "bottom")],
    title='Stream from Device 1',
    font=dict(family='Roboto, monospace', size=30, color='white'),
    plot_bgcolor='rgba(8,8,8,1)',
    paper_bgcolor='rgba(8,8,8,1)',
    xaxis=dict(
        domain=[0, 1.00],
        title='Time Period',
        titlefont=dict(family='Roboto, monospace',size=18,color='white'),
        tickfont=dict(family='Roboto, monospace',size=16,color='white')
    ),
    yaxis=dict(
        domain=[0, 1.00],
        title='Measurement',
        titlefont=dict(family='Roboto, monospace',size=18,color='white'),
        tickfont=dict(family='Roboto, monospace',size=16,color='white')
    ))

fig = go.Figure(data=data, layout=layout)
py.iplot(fig,fileopt='overwrite', filename='IoT Predictive Maintenance Demo')

s_1 = py.Stream(stream_id=token_1)
s_2 = py.Stream(stream_id=token_2)

This is where the layout is specified: two traces overlaid on a single graph. At this point, all variables have been declared and defined. The last step is to issue the statements that initiate all of these actions.

s_1.open()
s_2.open()

try:
    while True:
        stream_data(IoT_Demo_Topic, max)
except KeyboardInterrupt:
    pass
finally:
    # Close the streams when done plotting
    s_1.close()
    s_2.close()

Open the two Plotly streams and, while they are open, repeatedly execute the stream_data function, which calls the other functions for message conversion and application of the TensorFlow RNN model. The try/finally ensures the streams are closed when the loop is interrupted.

To run the script from the command line, use the following:

[user01@maprdemo ~]$ python Stream_IoT_Prediction_Dashboard_plotly.py

Once the command to execute the consumer script is issued, there will be a brief pause while the dependencies load, and then the predictions will begin to appear in the console.

Prediction will begin to be displayed in the console

Opening Plotly in a browser, the graph will begin streaming the data: first the actual real-time data, followed by the predictions.

Predictions

In this two-part blog series, I have gone through the lifecycle for this particular use case for real-time predictive maintenance with the following workflow:

  • Identification of the problem
  • Evaluation of the data size and type
  • Data transformation and exploration
  • Model identification, training, testing
  • Model deployment
  • Data visualization

Leveraging a deep learning model was an effective choice to predict extreme events that could indicate degradation of performance. The value of this choice is enhanced by having an architecture in place that can apply the model to new data in real time, rather than only retrospectively.

The insight generated by my deep learning model was just a part of the overall solution. How this fits into an organization's data infrastructure involves looking at the big picture. The logistics of machine learning can be just as important as the algorithm (see Ted Dunning and Ellen Friedman's book, Machine Learning Logistics). Proper logistics will increase the value derived from the insights generated by data scientists.

Additional thoughts:

  • This implementation can scale to thousands of sensors. This RNN is general enough that it can transfer to other data feeds and be customized to fit, based on need.
  • Other RNN implementations, such as LSTM, would also fit into this pipeline well (see the sketch after this list).
  • TensorFlow is one tool among many (but a great one). I also could have tried MXNet to get to my solution.
  • This was performed on a single node. All components of this can be integrated into a container (see MapR Data Science Refinery) to be deployed on a large cluster or edge cluster.
  • This type of model and framework is very lightweight and deploying on an edge cluster enables an organization to process and analyze data close to the source for quicker response and triage.
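As a minimal sketch of the LSTM point above, the only change inside rnn_model would be the cell definition; dynamic_rnn, the dense layer, and the loss stay the same (a model trained with BasicRNNCell cannot be restored into an LSTM graph, so it would have to be retrained in Part 1 first):

# Swap the vanilla RNN cell for an LSTM cell (TensorFlow 1.x API, as used above)
basic_cell = tf.contrib.rnn.BasicLSTMCell(num_units=hidden)
rnn_output, states = tf.nn.dynamic_rnn(basic_cell, X, dtype=tf.float32)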

This blog post was published March 14, 2018.