
TensorFlow LSTM

In this tutorial, we’ll create an LSTM neural network using time series data (historical S&P 500 closing prices), and then deploy this model in FastScore. The model will be written in Python 3 and use the TensorFlow library. An excellent introduction to LSTM networks can be found on Christopher Olah’s blog.

Contents

  1. The Dataset
  2. Generating Training Data
  3. Implementing the Model in TensorFlow
  4. Preparing the Model for Deployment
  5. Deploying the Model in FastScore

The Dataset

The model we’re building will predict the closing price of the S&P 500, based on previous days’ closing prices. So, the primary dataset will be a collection of daily S&P closing prices (download it here). Let’s take a look at this dataset.

First, import the necessary libraries:

import pandas as pd
import matplotlib.pyplot as plt
import random
import datetime
import numpy as np
%matplotlib inline

Then, let’s read in the data, make sure every price is indexed by date, and plot it:

sp500 = pd.read_csv('sp500.csv')

# Convert to Date
sp500['Date'] = pd.to_datetime(sp500['Date'])
sp500 = sp500.set_index('Date')

plt.figure(figsize=(15, 8))
plt.title('S&P 500 Daily Close 1950--2017')
plt.plot(sp500.index, sp500['Close'])

S&P 500 Closing Prices, 1950--2017

There’s a small problem with this dataset: the price is expressed in dollars, and the value of the dollar has changed over time. A reasonable proxy for the value of the dollar is the consumer price index (CPI), so let’s rescale our S&P 500 closing prices by this data. The CPI dataset can be downloaded here.

cpi = pd.read_csv('CPIAUCSL.csv')

cpi['DATE'] = pd.to_datetime(cpi['DATE'])
cpi = cpi.set_index('DATE')

plt.figure(figsize=(15, 8))
plt.title('CPI 1945--2017')
plt.plot(cpi.index, cpi['CPIAUCSL'])

CPI 1945--2017

The CPI data is only reported monthly, whereas our S&P 500 data is reported daily. To estimate the CPI on a particular day, linearly interpolate between the values:

cpi = cpi.resample('B').asfreq().interpolate()
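
To see what this does, here’s a minimal sketch on a toy monthly series (the dates and values are made up for illustration):

toy = pd.Series([100.0, 103.0],
                index=pd.to_datetime(['2017-01-02', '2017-02-01']))

# Resample to business-day frequency; the days in between the two monthly
# readings are filled in by linear interpolation
daily = toy.resample('B').asfreq().interpolate()
daily.head()  # values climb smoothly from 100.0 toward 103.0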

Now, we’re ready to rescale by the CPI, and drop any missing values from our dataset:

sp500['Normalized Close'] = sp500['Close']/cpi['CPIAUCSL']
sp500 = sp500.dropna() # drop any missing values

Let’s split the dataset up into training and test data by picking a date cutoff. For this example, we’ll pick June 1, 2007:

split_date = datetime.date(2007, 6, 1)

training_data = sp500[:split_date]
test_data = sp500[split_date:]

A further normalization step we can perform for time-series data is to subtract off the general linear trend (which, for the S&P 500 closing prices, is generally positive, even after rescaling by the CPI). This is analogous to recentering non-time-series data. The linear trend is easily computed with scikit-learn:

from sklearn.linear_model import LinearRegression

dates_to_array = lambda df: (df.index - df.index.min())/np.timedelta64(1, 'D')

sp500_linear_training_data = training_data['Normalized Close']
sp500_linear_training_dates = dates_to_array(sp500_linear_training_data)
linear_X = np.array(sp500_linear_training_dates).reshape(-1, 1)
linear_y = np.array(sp500_linear_training_data).reshape(-1, 1)

linear_reg = LinearRegression()
linear_reg = linear_reg.fit(linear_X, linear_y)

Here’s a plot of the CPI-normalized data, the positive linear trend, and the train/test split date:

linear_preds = linear_reg.predict(np.array(dates_to_array(sp500)).reshape(-1, 1))[:,0]

plt.figure(figsize=(15, 8))
plt.title('S&P 500 CPI-Normalized Daily Close 1950--2017')
plt.plot(sp500.index, sp500['Normalized Close'], label='Normalized Close')
plt.plot(sp500.index, linear_preds, label='Linear Regression')
plt.axvline(x=split_date, color='r')
plt.text(split_date, 2, " train/test split")
plt.legend()

CPI-Normalized S&P 500 Daily Close 1950--2017

Generate our actual train and test data by subtracting off the linear regression’s predictions:

sp500['Adjusted Normalized Close'] = sp500['Normalized Close'] - linear_preds

A plot of the train and test data is below.

plt.figure(figsize=(15, 8))
plt.title('S&P 500 Adjusted CPI-Normalized Daily Close 1950--2017')
plt.plot(sp500.index, sp500['Adjusted Normalized Close'], label='Adjusted Normalized Close')
plt.plot(sp500.index, [0 for i in range(0, len(sp500))], label='y = 0')
plt.axvline(x=split_date, color='r')
plt.text(split_date, 3, " train/test split")
plt.legend()

S&P 500 Adjusted CPI-Normalized Daily Close 1950--2017

Generating Training Data

A standard approach when building a time-series model is to train on randomly selected sequences of fixed length, where the target for each input sequence is that same sequence shifted one time step into the future. For example, if the time series consists of the values [1, 2, 3, 4, 5], and we are training a model to predict the next item in the series from the previous 3 items, our model should map the input [2, 3, 4] to the output [3, 4, 5].
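
Here’s that toy example as a quick sketch in code (illustrative values, separate from our actual dataset):

series = np.array([1, 2, 3, 4, 5])
n_steps = 3

start = 1
x = series[start:start + n_steps]          # input:  [2, 3, 4]
y = series[start + 1:start + n_steps + 1]  # target: [3, 4, 5]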

Since we’re only interested in predicting the adjusted CPI-normalized closing price, let’s rebind our training and test data variables:

training_data = sp500[:split_date]['Adjusted Normalized Close']
test_data = sp500[split_date:]['Adjusted Normalized Close']

We’ll need to generate batches of such training inputs from our time series data, so let’s define a function to do so:

def make_training_sequences(array, batch_size, n_steps = 30, seed = 2017):
    """
    Create batches of training data for the model. Sequences are time-ordered.
    """
    X_batch, y_batch = [], []
    size = len(array)
    random.seed(seed)
    # Pick batch_size distinct start positions, leaving room for an
    # n_steps-long input window plus its one-step-shifted target window
    indices = sorted(random.sample(range(0, size - n_steps - 1), batch_size))
    for start in indices:
        x_slice = array[start:start + n_steps]            # input sequence
        y_slice = array[start + 1: start + n_steps + 1]   # target: shifted one step
        X_batch.append(x_slice)
        y_batch.append(y_slice)
    return np.array(X_batch).reshape(-1, n_steps, 1), np.array(y_batch).reshape(-1, n_steps, 1)

This function accepts several inputs:

  * array: the time series to draw sequences from (here, a pandas Series of adjusted closing prices)
  * batch_size: the number of input/target sequence pairs to generate
  * n_steps: the length of each sequence (default: 30)
  * seed: the random seed used to choose the sequence starting positions (default: 2017)

It returns two NumPy arrays of shape (batch_size, n_steps, 1): the input sequences and their targets.
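
For example, drawing a small batch from our training data (with a toy batch size and sequence length, chosen just to show the shapes):

X_batch, y_batch = make_training_sequences(training_data, batch_size=4, n_steps=5)
X_batch.shape, y_batch.shape  # both (4, 5, 1)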

Implementing the Model in TensorFlow

With our input data scaled and normalized, and a function defined to generate training data, we’re now ready to build our model. Begin by importing TensorFlow:

import tensorflow as tf

Let’s specify the parameters of our model:

n_steps = 30
n_inputs = 1
n_outputs = 1
n_layers = 3
n_units = 100
save_name = 'tf_sp500_lstm'

These variables control the topology of our neural net:

  * n_steps: the number of time steps (days) in each input sequence
  * n_inputs: the number of input features per time step (just the closing price)
  * n_outputs: the number of output values per time step (the predicted next price)
  * n_layers: the number of stacked LSTM layers
  * n_units: the number of hidden units in each LSTM cell
  * save_name: the file name prefix used when saving the trained model

Now, let’s initialize the model’s graph:

graph = tf.Graph()

with graph.as_default() as g:
    # Placeholders for the input sequences and their one-step-shifted targets
    X = tf.placeholder(tf.float32, [None, n_steps, n_inputs])
    y = tf.placeholder(tf.float32, [None, n_steps, n_outputs])

    # Stack n_layers LSTM cells, then project each step's output down to n_outputs
    layers = [tf.contrib.rnn.BasicLSTMCell(num_units = n_units) for k in range(n_layers)]

    cell = tf.contrib.rnn.OutputProjectionWrapper(
      tf.contrib.rnn.MultiRNNCell(layers), output_size = n_outputs)

    outputs, states = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)

    saver = tf.train.Saver()

We’ll train the model with the mean squared error (MSE) as our loss function, using the Adam optimizer. The other parameters for the training process are the learning rate, number of iterations, batch size, and the random seed for the make_training_sequences function.

seed = 2017
learning_rate = 0.001
n_iterations = 1500
batch_size = 100

with graph.as_default() as g:
    tf.set_random_seed(seed)
    np.random.seed(seed)
    loss = tf.reduce_mean(tf.square(outputs - y))
    optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
    training_op = optimizer.minimize(loss)

    init = tf.global_variables_initializer()

    with tf.Session(graph = graph) as sess:
        init.run()
        for iteration in range(n_iterations):
            X_batch, y_batch = make_training_sequences(training_data,
                                batch_size, n_steps, seed + iteration)
            sess.run(training_op, feed_dict = {X: X_batch, y: y_batch})
            if iteration % 100 == 0:
                mse = loss.eval(feed_dict = {X: X_batch, y: y_batch})
                print(iteration, "\tMSE:", mse)
        saver.save(sess, './{}'.format(save_name))

With the settings above on a moderately powerful machine, the training process takes about five minutes, and the model converges to an MSE of about 0.0013. The trained model is saved using the tf_sp500_lstm prefix.
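
In TensorFlow 1.x, tf.train.Saver writes several files sharing the save_name prefix; after training, the working directory should contain something like the following (the exact shard names may vary):

checkpoint
tf_sp500_lstm.data-00000-of-00001
tf_sp500_lstm.index
tf_sp500_lstm.meta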

Generating predictions with the model is straightforward:

def predict(inputs):
    """
    Make predictions from the input array. If the input sequence length equals n_steps, makes one prediction (the next item in the series).
    If it is shorter than n_steps, raises an exception; if it is longer, makes len(inputs) - n_steps + 1 predictions.
    """
    if len(inputs[0,:,0]) < n_steps:
        raise Exception("Insufficient inputs! X should be an array of shape (1, k, 1) where k >= n_steps")

    with tf.Session(graph = graph) as sess:
        saver.restore(sess, './{}'.format(save_name))

        preds = []
        steps_forward = len(inputs[0,:,0]) - n_steps + 1
        for j in range(0, steps_forward):
            X_in = inputs[:,j:j+n_steps,:]
            y_pred = sess.run(outputs, feed_dict = {X: X_in})
            preds.append(y_pred[0,-1,0]) # add the last one
    return np.array(preds).reshape(-1, len(preds), 1)
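
As a quick sanity check, we can feed predict the first n_steps + 1 values of the test data and get back two one-step-ahead predictions:

X_in = np.array(test_data[:n_steps + 1]).reshape(1, -1, 1)
preds = predict(X_in)
preds.shape  # (1, 2, 1): len(inputs) - n_steps + 1 predictions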

Let’s define a function to plot the model’s predictions:

def plot_predict_date(date, data, steps_forward = 1):
    in_data = data[date:].iloc[0:n_steps + steps_forward - 1]
    true_data = in_data.iloc[n_steps - 1:]
    X_in = np.array(in_data).reshape(-1, len(in_data), 1)
    y_pred = predict(X_in)

    fig = plt.figure(figsize=(15, 8))
    plt.title("Predicted and Observed")
    plt.plot(true_data.index, true_data, label="Observed")
    mse = np.square(y_pred[0,:,0] - true_data).mean()
    plt.plot(true_data.index, y_pred[0, :, 0], label="Predicted (MSE: {})".format(mse))
    plt.legend(loc = "lower right")
    plt.xlabel("Date")
    plt.ylabel("Adjusted Price")

Here’s what the predictions look like for the first 120 business days after July 1, 2007:

plot_predict_date(datetime.date(2007,7,1), sp500['Adjusted Normalized Close'], steps_forward = 120)

Predicted vs Observed, July 1 2007

We can also see how we did against all of the test data (the dates after June 1, 2007, denoted by split_date).

test_steps = len(sp500[split_date:])
plot_predict_date(split_date, sp500['Adjusted Normalized Close'], steps_forward = test_steps)

Predicted vs Observed, Test Data

We’ve achieved an MSE of 0.00028 on our test data, which is quite good given that the model was trained only on pre-crisis data.

Preparing the Model for Deployment

Deploying our trained model in FastScore is straightforward. A FastScore Python model script must define an action method, and may additionally define begin and end methods for initialization and cleanup, as well as any user-defined functions or imports needed.

For initialization, we’ll define a begin function that creates all of the same TensorFlow variables we constructed for model training, and initializes our session:

def begin():
    tf.logging.set_verbosity(tf.logging.WARN)

    global past_days
    past_days = []

    global n_steps, X, outputs, sess
    n_steps = 30
    n_inputs = 1
    n_outputs = 1
    n_layers = 3
    n_units = 100
    save_name = 'tf_sp500_lstm'

    X = tf.placeholder(tf.float32, [None, n_steps, n_inputs])
    y = tf.placeholder(tf.float32, [None, n_steps, n_outputs])
    layers = [tf.contrib.rnn.BasicLSTMCell(num_units = n_units) for k in range(n_layers)]
    cell = tf.contrib.rnn.OutputProjectionWrapper(
      tf.contrib.rnn.MultiRNNCell(layers), output_size = n_outputs)
    outputs, states = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)
    saver = tf.train.Saver()
    sess = tf.Session()
    saver.restore(sess, './{}'.format(save_name))

Using the global keyword makes all of these variables accessible later in the other methods. We also define a past_days variable: when deployed, this model will receive individual prices as inputs, and we’ll accumulate the most recent 30 of them in past_days. In contrast to the model training code, we’re not using a context manager to control our TensorFlow session. Instead, the session is started manually in the begin method, and we’ll close it manually at the end of the model run with the end method:

def end():
    sess.close()

Next, let’s copy our predict method over from the model testing, and make some slight modifications:

def predict(inputs):
    if len(inputs[0,:,0]) < n_steps:
        raise Exception("Insufficient inputs! X should be an array of shape (1, k, 1) where k >= n_steps")

    preds = []
    steps_forward = len(inputs[0,:,0]) - n_steps + 1
    for j in range(0, steps_forward):
        X_in = inputs[:,j:j+n_steps,:]
        y_pred = sess.run(outputs, feed_dict = {X: X_in})
        preds.append(y_pred[0,-1,0]) # add the last one
    return np.array(preds).reshape(-1, len(preds), 1)

The predict method has been slightly simplified: we no longer restore the session from the save file (it is already restored and opened in the begin method), but it is otherwise identical.

Finally, the action method is the hook that the FastScore engine uses to produce scores. It will take as input a closing price, and produce as output the prediction for the next day’s closing price.

def action(x):
    global past_days
    past_days.append(x)
    past_days = past_days[-n_steps:]
    if len(past_days) == n_steps:
        result = predict(np.array(past_days).reshape(-1, n_steps, 1))[0, 0, 0]
        yield np.asscalar(result)

Because the TensorFlow model uses 30 days’ worth of prices to make predictions, we need to keep track of the last 30 inputs received. (Recall that 30 is the value of the n_steps variable.) Tracking the previous inputs is handled by past_days, which was initialized to an empty list in the begin method. As written, the model only produces output once it has accumulated enough inputs to make a prediction. The result is cast from NumPy’s float32 to a standard Python float to ensure compatibility with data serialization (e.g., JSON).
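
Outside of FastScore, this sliding-window behavior can be exercised with a small local driver. Here’s a hypothetical sketch, assuming the tf_sp500_lstm checkpoint files are in the working directory and prices is a list of at least 30 closing prices:

begin()
for price in prices:
    # action is a generator: it yields nothing until n_steps prices have
    # accumulated, then one prediction per subsequent input
    for prediction in action(price):
        print(prediction)
end()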

The inputs and outputs of this model will be Avro doubles, so let’s add smart comments to the top of the model code to indicate this. We’ll also be manually controlling the version of TensorFlow we install, so we’ll use the fastscore.module-attached smart comment to indicate that the engine should not attempt to install TensorFlow automatically. In total, our FastScore-ready model is:

# fastscore.schema.0: double
# fastscore.schema.1: double
# fastscore.module-attached: tensorflow

import tensorflow as tf
import numpy as np

def begin():
    tf.logging.set_verbosity(tf.logging.WARN)

    global past_days
    past_days = []

    global n_steps, X, outputs, sess
    n_steps = 30
    n_inputs = 1
    n_outputs = 1
    n_layers = 3
    n_units = 100
    save_name = 'tf_sp500_lstm'

    X = tf.placeholder(tf.float32, [None, n_steps, n_inputs])
    y = tf.placeholder(tf.float32, [None, n_steps, n_outputs])
    layers = [tf.contrib.rnn.BasicLSTMCell(num_units = n_units) for k in range(n_layers)]
    cell = tf.contrib.rnn.OutputProjectionWrapper(
      tf.contrib.rnn.MultiRNNCell(layers), output_size = n_outputs)
    outputs, states = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)
    saver = tf.train.Saver()
    sess = tf.Session()
    saver.restore(sess, './{}'.format(save_name))

def predict(inputs):
    if len(inputs[0,:,0]) < n_steps:
        raise Exception("Insufficient inputs! X should be an array of shape (1, k, 1) where k >= n_steps")

    preds = []
    steps_forward = len(inputs[0,:,0]) - n_steps + 1
    for j in range(0, steps_forward):
        X_in = inputs[:,j:j+n_steps,:]
        y_pred = sess.run(outputs, feed_dict = {X: X_in})
        preds.append(y_pred[0,-1,0]) # add the last one
    return np.array(preds).reshape(-1, len(preds), 1)

def action(x):
    global past_days
    past_days.append(x)
    past_days = past_days[-n_steps:]
    if len(past_days) == n_steps:
        result = predict(np.array(past_days).reshape(-1, n_steps, 1))[0, 0, 0]
        yield np.asscalar(result)

def end():
    sess.close()

Note that we’ll additionally need to bundle the tf_sp500_lstm TensorFlow session checkpoints with our model in order for it to execute in FastScore; to do this, save the checkpoint as a model attachment:

tar czvf attachment.tar.gz tf_sp500_lstm*

Deploying the Model in FastScore

Finally, let’s deploy our FastScore-ready model into the FastScore engine. First, we’ll need to create a custom engine image with the specific environment we used to create this model. Create a text file called requirements.txt, and include the following:

requirements.txt

numpy==1.13.3
pandas==0.20.2
tensorflow==1.4.0

Adjust the specific versions of the libraries to match the versions in your working environment. (Use pip freeze | grep <library-name> to find out what version of a library you have installed.)

Next, let’s build a custom Docker container image from the base engine image:

Dockerfile

FROM fastscore/engine:1.6.1
ADD ./requirements.txt .
RUN pip3 install --isolated -r requirements.txt

We’re starting with the 1.6.1 FastScore engine image, and then installing the libraries listed in requirements.txt.

To build the container image, just run

docker build -t localrepo/engine:tensorflow .

from within the same directory as the Dockerfile and requirements.txt file.

All that’s needed to add a custom TensorFlow-ready engine to your fleet is to run the engine container and update your FastScore fleet’s configuration to include the new engine. If you’re starting from scratch, we have prepared a Docker Compose file together with some shell scripts to build a full FastScore fleet with the custom container image. Download the files here.

To add our model to Model Manage, you may directly upload the files using the Dashboard, or run the following commands with the CLI:

fastscore schema add double double.avsc
fastscore model add -type:python3 tf_sp500_lstm tf_sp500_lstm.py
fastscore attachment upload tf_sp500_lstm attachment.tar.gz
fastscore stream add rest rest.json

where double.avsc is the Avro schema file

double.avsc

{"type": "double"}

and rest.json is the stream descriptor:

rest.json

{
  "Transport": "REST",
  "Encoding": "json"
}

Note that we have to add the -type:python3 option when adding our model to Model Manage (otherwise, the engine will assume it is a Python 2 model).

Next, deploy the model with the following commands:

fastscore model load tf_sp500_lstm
fastscore stream attach rest 1
fastscore stream attach rest 0

The numbers in the stream attach command indicate the slot number to use for that stream: 0 is the default input stream, and 1 is the default output stream. Models with multiple data sources or multiple outputs may take advantage of multiple stream slots, but as designed our model only takes in data from a single source and produces a single output stream.

Now the model is exposed on the engine’s REST endpoint and can be used for scoring. Use the fastscore model input command to deliver inputs to the model, and fastscore model output to retrieve outputs (for asynchronous REST scoring). In this case, the model is configured to use asynchronous REST because it needs to receive 30 inputs before it can start producing output.
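
For example, delivering a single price and then retrieving any available scores might look like the following (the exact invocation may differ between CLI versions, so consult your installation’s documentation):

echo '1450.25' | fastscore model input
fastscore model output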

Download scripts here to demonstrate how to produce scores with this model.

Happy scoring!
