Two stocks, from the same industry, viz. The Home Depot, Inc., HD (NYSE) and Lowe's Companies, Inc., LOW (NYSE) have been chosen. Their six years of traded stock price data (2014-19) has been obtained. Then technical features for these two stocks corresponding to their six year of data have been designed and analyzed.
Post that LSTM Attention Models have been built using the first five years of data (2014-2018) for both the stocks individually. Subsequently, the models have been assessed to check for the predictive power and efficacy.
Google Colab has been used to build the model because of GPU constraints of a standard computer
# Installing yfinance, ta-lib and yellowbrick libraries
!pip install yfinance
!wget https://launchpad.net/~mario-mariomedina/+archive/ubuntu/talib/+files/libta-lib0_0.4.0-oneiric1_amd64.deb -qO libta.deb
!wget https://launchpad.net/~mario-mariomedina/+archive/ubuntu/talib/+files/ta-lib0-dev_0.4.0-oneiric1_amd64.deb -qO ta.deb
!dpkg -i libta.deb ta.deb
!pip install ta-lib
!pip install -U yellowbrick
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import yfinance as yf
import talib
import datetime
import warnings
warnings.filterwarnings("ignore")
from tensorflow.keras.layers import LSTM, Dense, Dot, Activation, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K
import tensorflow as tf
# Importing Data
# Stocks chosen are HD and LOW
start = datetime.datetime(2013, 12, 31)
end = datetime.datetime(2019, 12, 31)
df = yf.download('HD', start=start, end=end)
df1 = yf.download('LOW', start=start, end=end)
df.head()
# Visualizing Stock Prices
plt.figure(figsize=(10,6))
df['Adj Close'].plot(label='HD')
df1['Adj Close'].plot(label='LOW')
plt.legend()
plt.title('Stock Price Movememt',fontsize=16)
plt.grid(False);
# Creating Features
# 10 Day return direction has been chosen for prediction
HD = df.copy()
LOW = df1.copy()
stocks = {'HD':HD,'LOW':LOW}
for stock in stocks.values():
features = []
stock.rename(columns={'Adj Close':'price'}, inplace=True)
stock['price_FD10'] = stock['price'].shift(-10)
stock['ret_D10'] = np.log(stock['price']/stock['price'].shift(10))
stock['ret_FD10'] = np.log(stock['price_FD10']/stock['price_FD10'].shift(10))
stock['label'] = np.where(stock['ret_FD10']>=0,1,0)
for i in [10]:
stock['ret_10Dlag'+ str(i)] = stock['ret_D10'].shift(i)
features.extend(['ret_10Dlag'+str(i)])
for i in [28]:
stock['mom_D'+str(i)] = talib.MOM(stock['price'], timeperiod=i)
features.extend(['mom_D'+str(i)])
for i in [14,50,200]:
stock['sma_D'+str(i)] = talib.SMA(stock['price'], timeperiod=i)
stock['ema_D'+str(i)] = talib.EMA(stock['price'], timeperiod=i)
stock['rsi_D'+str(i)] = talib.RSI(stock['price'], timeperiod=i)
stock['cmo_D'+str(i)] = talib.CMO(stock['price'], timeperiod=i)
features.extend(['sma_D'+str(i),'ema_D'+str(i),'rsi_D'+str(i),'cmo_D'+str(i)])
for i in [20]:
stock['bolUP_D'+str(i)], middleband, stock['bolDOWN_D'+str(i)] = talib.BBANDS(stock['price'], timeperiod=i)
stock['bolBandwidth_D'+str(i)] = (stock['bolUP_D'+str(i)] - stock['bolDOWN_D'+str(i)])/middleband
features.extend(['bolUP_D'+str(i),'bolDOWN_D'+str(i),'bolBandwidth_D'+str(i)])
stock['macd'], stock['macdSignal'], _ = talib.MACD(stock['price'])
features.extend(['macd','macdSignal'])
stock.dropna(inplace=True)
target_names = {0:"Down Move",1:"Up Move"}
print(features)
# Sanity Check
np.all(HD.index == LOW.index)
# Function for creating Train and Test Sets
def createTrainTest(stock, features, testSize = 252):
totalRecords = len(stock.index)
test = np.arange(totalRecords - testSize, totalRecords)
train = np.arange(0,test[0])
X_train = stock.loc[stock.index[train],features]
X_test = stock.loc[stock.index[test],features]
y_train = stock.loc[stock.index[train],'label']
y_test = stock.loc[stock.index[test],'label']
return X_train, X_test, y_train.to_numpy().reshape(-1,1), y_test.to_numpy().reshape(-1,1)
# Function for Scaling Data
def scaleTrainTest(X_train, X_test):
from sklearn.preprocessing import RobustScaler
scaler = RobustScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
return X_train_scaled, X_test_scaled
# Function for creating Data for Deep Learning Network
def createDataLSTM(X_train, X_test, y_train, y_test, n_features, n_days=28, testSize=252):
X_train_LSTM = np.zeros((X_train.shape[0]-n_days+1, n_days, n_features))
X_test_LSTM = np.zeros((testSize, n_days, n_features))
y_train_LSTM = np.zeros((X_train.shape[0]-n_days+1,1))
for i in range(X_train.shape[0]-n_days+1):
X_train_LSTM[i,0:n_days] = X_train[i:i+n_days]
y_train_LSTM[i] = y_train[i+n_days-1]
j = 0
for i in range(testSize):
if i<(n_days-1):
X_test_LSTM[i,0:n_days-1-i] = X_train[X_train.shape[0]-n_days+1+i:]
X_test_LSTM[i,n_days-1-i:] = X_test[0:i+1]
else:
X_test_LSTM[i,0:n_days] = X_test[j:j+n_days]
j = j+1
return X_train_LSTM, X_test_LSTM, y_train_LSTM, y_test
# Function for Feature Scoring and Selection
def featureImportances(X_train,y_train):
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import GradientBoostingClassifier
from yellowbrick.model_selection import FeatureImportances
rfc = RandomForestClassifier(max_depth=3, n_jobs=-1,random_state=0)
viz1 = FeatureImportances(rfc,relative=False,labels=features)
viz1.fit(X_train, y_train)
viz1.show()
abc = AdaBoostClassifier(n_estimators=100,random_state=0)
viz2 = FeatureImportances(abc,relative=False,labels=features)
viz2.fit(X_train, y_train)
viz2.show()
gbc = GradientBoostingClassifier(max_depth=3,random_state=0)
viz3= FeatureImportances(gbc,relative=False,labels=features)
viz3.fit(X_train, y_train)
viz3.show();
# Function for plotting K-Means Clustering
def plotKMeans(X_train, y_train, cols, axis_labels=None, title=None):
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import ListedColormap, BoundaryNorm
x_min, x_max = X_train[:, cols[0]].min() - 1, X_train[:, cols[0]].max() + 1
y_min, y_max = X_train[:, cols[1]].min() - 1, X_train[:, cols[1]].max() + 1
class_names = ['Cluster 1', 'Cluster 2']
color_list = ['#FFFF00', '#00AAFF']
cmap_bold = ListedColormap(color_list)
bnorm = BoundaryNorm(np.arange(0, 3), ncolors=2)
plt.figure(figsize=(7,7))
plt.xlim(x_min, x_max)
plt.ylim(y_min, y_max)
plt.scatter(X_train[:, cols[0]], X_train[:, cols[1]], s=100, c=y_train, cmap=cmap_bold, norm = bnorm, edgecolor='black', lw = 1)
plt.xlabel(axis_labels[0], fontsize=13)
plt.ylabel(axis_labels[1], fontsize=13)
legend_handles = []
for i in range(0, 2):
patch = mpatches.Patch(color=color_list[i], label=class_names[i])
legend_handles.append(patch)
plt.legend(handles=legend_handles, fontsize=13)
plt.title('{}: K-Means Clustering'.format(title),fontdict = {'fontsize':16})
plt.show();
# Function for Calculating Daily Profit and Loss over the Test Set
def calcPnL(model,stock,X_test,y_test,threshold = 0.5,testSize=252, stock_name=None):
totalRecords = len(stock.index)
test = np.arange(totalRecords - testSize, totalRecords)
analysis = pd.DataFrame(data = stock.loc[stock.index[test],'ret_FD10'],index = stock.index[test])
analysis['buyAndHold'] = (stock.loc[stock.index[test],'price']/stock.loc[stock.index[test[0]],'price'])-1.0
analysis['probUP'] = model.predict(X_test)
analysis['betSize'] = np.where(analysis['probUP']>threshold,2*analysis['probUP']-1,0.0)
analysis['dailyP&L'] = analysis['ret_FD10']*analysis['betSize']
profitLSTM = analysis['dailyP&L'].sum()*100
profitBuyHold = (analysis.loc[analysis.index[testSize-1],'buyAndHold'])*100
plt.figure(figsize=(10,6))
plt.plot(analysis['dailyP&L'].cumsum(),label='LSTM Attention, Return = {:.2f} %'.format(profitLSTM))
plt.plot(analysis['buyAndHold'], label='Buy and Hold, Return = {:.2f} %'.format(profitBuyHold))
plt.legend(fontsize=13)
plt.title('{}: Returns generated over 2019 (Test Set)'.format(stock_name),fontdict = {'fontsize':16})
plt.grid(False);
# Function for ConfusionMatrix, Classification Report Precision-Recall Curve, Area under ROC Curve
def plotMetrics(model, X_test, y_test, target_names, title=None):
from sklearn.metrics import confusion_matrix, plot_confusion_matrix, classification_report, precision_recall_curve, roc_curve, auc
y_scores = model.predict(X_test)
y_pred = np.where(model.predict(X_test)>=0.5,1,0)
print("\n\033[1m\t\t\033[4m {} Confusion Matrix\033[0m\033[0m\n".format(title))
print(confusion_matrix(y_test, y_pred))
print("\n\033[1m\t\t\033[4m {} Classification Report\033[0m\033[0m\n".format(title))
print(classification_report(y_test, y_pred, target_names=target_names))
precision, recall, _ = precision_recall_curve(y_test, y_scores)
plt.figure(figsize=(7,7))
plt.xlim([0.0, 1.01])
plt.ylim([0.0, 1.01])
plt.plot(precision, recall, label='Precision-Recall Curve')
plt.xlabel('Precision', fontsize=14)
plt.ylabel('Recall', fontsize=14)
plt.title("{} Precision Recall Curve".format(title), fontsize=16)
plt.grid(True)
plt.show();
print('\n\n')
fpr, tpr, _ = roc_curve(y_test, y_scores)
roc_auc = auc(fpr, tpr)
plt.figure(figsize=(7,7))
plt.xlim([-0.01, 1.01])
plt.ylim([-0.01, 1.01])
plt.plot(fpr, tpr, lw=3, label='AUC = {:0.2f}'.format(roc_auc))
plt.xlabel('False Positive Rate', fontsize=12)
plt.ylabel('True Positive Rate', fontsize=12)
plt.title('{} ROC curve'.format(title), fontsize=16)
plt.legend(loc='lower right', fontsize=13)
plt.plot([0, 1], [0, 1], lw=2, linestyle='-.')
plt.grid(True)
plt.show();
# Function for Plotting Transition Probabilities
def plotTransitionProb(model,stock,X_test,y_test,title = None,testSize = 252):
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import ListedColormap
y_scores = model.predict(X_test)
y_pred = np.where(model.predict(X_test)>=0.5,1,0)
totalRecords = len(stock.index)
test = np.arange(totalRecords - testSize, totalRecords)
target_names = ['Wrong Prediction','Correct Prediction']
c = y_test == y_pred
color_list = ['#EEEE00','#0000CC']
cmap = ListedColormap(color_list)
legend_handles = []
for i in range(0, len(target_names)):
patch = mpatches.Patch(color=color_list[i], label=target_names[i])
legend_handles.append(patch)
plt.figure(figsize=(10,6))
plt.scatter(stock.index[test],y_scores, c = c, cmap = cmap)
plt.legend(loc=0,handles=legend_handles)
plt.title('{}: Transition Probabilities for Up Moves'.format(title),fontdict = {'fontsize':16})
plt.grid(True);
plt.figure(figsize=(10,6))
plt.scatter(stock.index[test], 1-y_scores, c = c, cmap = cmap)
plt.legend(loc=0,handles=legend_handles)
plt.title('{}: Transition Probabilities for Down Moves'.format(title),fontdict = {'fontsize':16})
plt.grid(True);
# Feature Scoring:
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
print ("\n\033[1m\t\t\t\t\033[4mAnalyzing {}\033[0m\033[0m".format(stock_name))
featureImportances(X_train_scaled,y_train)
# K-Means Clustering
from sklearn.cluster import KMeans
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
kmeans = KMeans(n_clusters = 2, random_state=0)
X_new = kmeans.fit_transform(X_train_scaled)
# Plotting the two new Transformed Axes
cols = [0,1]
axis_labels = ['Transformed Axis 1','Transformed Axis 2']
plotKMeans(X_new, kmeans.labels_, cols, axis_labels, title=stock_name)
# Plotting EMA50 vs MACD
cols = [7,17]
axis_labels = [features[cols[0]],features[cols[1]]]
plotKMeans(X_train_scaled, kmeans.labels_, cols, axis_labels, title=stock_name)
# Plotting RSI50 vs Bollinger Bandwidth
cols = [8,16]
axis_labels = [features[cols[0]],features[cols[1]]]
plotKMeans(X_train_scaled, kmeans.labels_, cols, axis_labels, title=stock_name)
#Plotting SMA200 vs CMA200:
cols = [10,13]
axis_labels = [features[cols[0]],features[cols[1]]]
plotKMeans(X_train_scaled, kmeans.labels_, cols, axis_labels, title=stock_name)
#Plotting Return vs Momentum:
cols = [0,1]
axis_labels = [features[cols[0]],features[cols[1]]]
plotKMeans(X_train_scaled, kmeans.labels_, cols, axis_labels, title=stock_name)
#Setting number of units for different layers
np.random.seed(1)
tf.random.set_seed(1)
inputDays = 28
unitsPreLSTM = 128
unitsPostLSTM = 32
unitsDenseAttention = 32
# Function for creating the Attention LSTM Model
def model(inputDays, unitsPreLSTM , unitsPostLSTM, unitsDenseAttention, featuresLen, name=None):
# Pre Attention LSTM
X = Input(shape=(inputDays, featuresLen), name='Input')
a = LSTM(units = unitsPreLSTM, return_sequences=True, dropout=0.05, name='PreAttention_LSTM')(X)
# Computing Attention and Context
e1 = Dense(unitsDenseAttention, activation = "relu", name='Dense1_Attention')(a)
e2 = Dense(1, activation = "relu", name='Dense2_Attention')(e1)
alphas = Activation(lambda x: K.softmax(x, axis=1), name='attention_weights')(e2)
context = Dot(axes = 1, name='context')([alphas,a])
# Post Attention LSTM
s = LSTM(unitsPostLSTM, dropout=0.05, name='PostAttention_LSTM')(context)
output = Dense(1, activation='sigmoid', name='Output')(s)
model = Model(inputs=X, outputs=output, name=name)
return model
# Creating the Attention LSTM Model for HD
modelHD = model(inputDays, unitsPreLSTM, unitsPostLSTM, unitsDenseAttention, len(features), name='model_HD')
modelHD.summary()
# Model Compile Settings for HD
opt = Adam(lr=0.0005, beta_1=0.9, beta_2=0.999, decay=0.01)
modelHD.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])
# Creating Data for HD Attention LSTM Model
X_train, X_test, y_train, y_test = createTrainTest(stocks['HD'], features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
X_train_LSTM, X_test_LSTM, y_train_LSTM, y_test_LSTM = createDataLSTM(
X_train_scaled, X_test_scaled, y_train, y_test, len(features), n_days=28, testSize=252)
# Training the HD Model
modelHD.fit(X_train_LSTM, y_train_LSTM, epochs=50, batch_size=28, verbose=2)
# Evaluating HD Model
modelHD.evaluate(X_test_LSTM, y_test_LSTM)
# Plotting Metrics for HD Model
plotMetrics(modelHD, X_test_LSTM, y_test_LSTM, list(target_names.values()), 'HD')
plotTransitionProb(modelHD,stocks['HD'],X_test_LSTM,y_test_LSTM,'HD',testSize = 252)
# Forecasting using HD Model on the test set
calcPnL(modelHD,stocks['HD'],X_test_LSTM,y_test_LSTM,threshold = 0.52, stock_name='HD')
# Creating the Attention LSTM Model for LOW
modelLOW = model(inputDays, unitsPreLSTM, unitsPostLSTM, unitsDenseAttention, len(features), name='model_LOW')
modelLOW.summary()
# Model Compile Settings for LOW
opt = Adam(lr=0.0004, beta_1=0.9, beta_2=0.999, decay=0.01)
modelLOW.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])
# Creating Data for LOW Attention LSTM Model
X_train, X_test, y_train, y_test = createTrainTest(stocks['LOW'], features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
X_train_LSTM, X_test_LSTM, y_train_LSTM, y_test_LSTM = createDataLSTM(
X_train_scaled, X_test_scaled, y_train, y_test, len(features), n_days=28, testSize=252)
# Training the LOW Model
modelLOW.fit(X_train_LSTM, y_train_LSTM, epochs=50, batch_size=28, verbose=2)
# Evaluating LOW Model
modelLOW.evaluate(X_test_LSTM, y_test_LSTM)
# Plotting Metrics for LOW Model
plotMetrics(modelLOW, X_test_LSTM, y_test_LSTM, list(target_names.values()), 'LOW')
plotTransitionProb(modelLOW,stocks['LOW'],X_test_LSTM,y_test_LSTM,'LOW',testSize = 252)
# Forecasting using HD Model on the test set
calcPnL(modelLOW,stocks['LOW'],X_test_LSTM,y_test_LSTM,threshold = 0.52, stock_name='LOW')