import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import yfinance as yf
import talib
import datetime
import warnings
warnings.filterwarnings("ignore")
Stocks Chosen:
The Home Depot, Inc., HD (NYSE)
Lowe's Companies, Inc., LOW (NYSE)
Both firms are home improvement retailers based in the US.
# Importing Data
start = datetime.datetime(2014, 1, 1)
end = datetime.datetime(2019, 12, 31)
df = yf.download('HD', start=start, end=end)
df1 = yf.download('LOW', start=start, end=end)
# Visualizing Stock Prices
plt.figure(figsize=(10,6))
df['Adj Close'].plot(label='HD')
df1['Adj Close'].plot(label='LOW')
plt.legend()
plt.title('Stock Price Movement',fontsize=16)
plt.grid(False);
The 10-day return direction has been chosen as the prediction target.
# Creating Features
HD = df.copy()
LOW = df1.copy()
stocks = {'HD':HD,'LOW':LOW}
for stock in stocks.values():
features = []
stock.rename(columns={'Adj Close':'price'},inplace=True)
stock['price_FD10'] = stock['price'].shift(-10)
stock['ret_D10'] = np.log(stock['price']/stock['price'].shift(10))
stock['ret_FD10'] = np.log(stock['price_FD10']/stock['price_FD10'].shift(10))
stock['label'] = np.where(stock['ret_FD10']>=0,1,-1)
for i in [10]:
stock['ret_10Dlag'+ str(i)] = stock['ret_D10'].shift(i)
features.extend(['ret_10Dlag'+str(i)])
for i in [28]:
stock['mom_D'+str(i)] = talib.MOM(stock['price'].values,timeperiod=i)
features.extend(['mom_D'+str(i)])
for i in [14,50,200]:
stock['sma_D'+str(i)] = talib.SMA(stock['price'].values,timeperiod=i)
stock['ema_D'+str(i)] = talib.EMA(stock['price'].values,timeperiod=i)
stock['rsi_D'+str(i)] = talib.RSI(stock['price'].values,timeperiod=i)
features.extend(['sma_D'+str(i),'ema_D'+str(i),'rsi_D'+str(i)])
stock.dropna(inplace=True)
target_names = {-1:"Down Move",1:"Up Move"}
# Feature Names
print(features)
# Sanity Check
np.all(HD.index == LOW.index)
Several functions have been defined below. These will be called repeatedly as we complete the different tasks.
# Function for creating Train and Test Sets
def createTrainTest(stock, features, testSize = 252):
totalRecords = len(stock.index)
test = np.arange(totalRecords - testSize, totalRecords)
train = np.arange(0,test[0])
X_train = stock.loc[stock.index[train],features]
X_test = stock.loc[stock.index[test],features]
y_train = stock.loc[stock.index[train],'label']
y_test = stock.loc[stock.index[test],'label']
return X_train, X_test, y_train, y_test
# Function for Scaling Data
def scaleTrainTest(X_train, X_test):
from sklearn.preprocessing import RobustScaler
scaler = RobustScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
return X_train_scaled, X_test_scaled
# Function for ConfusionMatrix, Precision-Recall Curve, Area under ROC Curve (Task B.1)
def plotMetrics(clf, X_train, y_train, X_test, y_test, target_names, SVC_classifier=False):
from sklearn.metrics import precision_score, recall_score
from sklearn.metrics import plot_confusion_matrix
from yellowbrick.classifier import PrecisionRecallCurve
from yellowbrick.classifier import ROCAUC
y_predicted = clf.predict(X_test)
print('Precision Test Set: {:.2f}'.format(precision_score(y_test, y_predicted)))
print('Recall Test Set: {:.2f}'.format(recall_score(y_test, y_predicted)))
plot_confusion_matrix(clf, X_test, y_test, display_labels = target_names,cmap = plt.cm.Blues)
plt.grid(False)
plt.figure()
viz1 = PrecisionRecallCurve(clf).fit(X_train,y_train)
viz1.score(X_test, y_test)
viz1.show()
if SVC_classifier == False:
viz2 = ROCAUC(clf, micro=False, macro=False).fit(X_train,y_train)
else:
viz2 = ROCAUC(clf, micro=False, macro=False, per_class=False).fit(X_train,y_train)
viz2.score(X_test,y_test)
viz2.show();
# Function for Feature Scoring and Selection (Task B.2)
def featureImportances(X_train,y_train):
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import GradientBoostingClassifier
from yellowbrick.model_selection import FeatureImportances
rfc = RandomForestClassifier(max_depth=3, n_jobs=-1,random_state=0)
viz1 = FeatureImportances(rfc,relative=False,labels=features)
viz1.fit(X_train, y_train)
viz1.show()
abc = AdaBoostClassifier(n_estimators=100,random_state=0)
viz2 = FeatureImportances(abc,relative=False,labels=features)
viz2.fit(X_train, y_train)
viz2.show()
gbc = GradientBoostingClassifier(max_depth=3,random_state=0)
viz3= FeatureImportances(gbc,relative=False,labels=features)
viz3.fit(X_train, y_train)
viz3.show();
# Function for Plotting a Classifier (Task A.2 and Task A.3)
def plotClassRegions(clf, X_train, y_train, fig, subplot, X_test=None, y_test=None, title=None,target_names = None,
axis_labels = None, plot_decision_regions = True):
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib.patches as mpatches
import numpy as np
from matplotlib.colors import ListedColormap
color_list_light = ['#FFFFAA','#AAAAFF']
color_list_bold = ['#EEEE00','#0000CC']
cmap_light = ListedColormap(color_list_light)
cmap_bold = ListedColormap(color_list_bold)
h = 0.1
k = 0.1
x_plot_adjust = 0.1
y_plot_adjust = 0.1
plot_symbol_size = 50
x_min = X_train[:, 0].min()
x_max = X_train[:, 0].max()
y_min = X_train[:, 1].min()
y_max = X_train[:, 1].max()
x2, y2 = np.meshgrid(np.arange(x_min-k, x_max+k, h), np.arange(y_min-k, y_max+k, k))
P = clf.predict(np.c_[x2.ravel(), y2.ravel()])
P = P.reshape(x2.shape)
cs = subplot.contourf(x2, y2, P, cmap=cmap_light, alpha = 0.8)
cbar = fig.colorbar(cs,ticks=[-1,0,1])
cbar.ax.set_yticklabels(['-1', '0', '1'])
subplot.scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=cmap_bold, s=plot_symbol_size, edgecolor = 'black')
subplot.set_xlim(x_min - x_plot_adjust, x_max + x_plot_adjust)
subplot.set_ylim(y_min - y_plot_adjust, y_max + y_plot_adjust)
subplot.set_xlabel(axis_labels[0])
subplot.set_ylabel(axis_labels[1])
subplot.set_title(title,fontsize=16)
if (target_names is not None):
legend_handles = []
for i in range(0, len(target_names)):
patch = mpatches.Patch(color=color_list_bold[i], label=target_names[i])
legend_handles.append(patch)
subplot.legend(loc=0,handles=legend_handles)
# Function for Plotting Transition Probabilities for Task B.3
def plotTransitionProb(clf,stock,X_test,y_test,title = None,testSize = 252):
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import ListedColormap
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
totalRecords = len(stock.index)
test = np.arange(totalRecords - testSize, totalRecords)
target_names = ['Wrong Prediction','Correct Prediction']
c = y_test == clf.predict(X_test)
color_list = ['#EEEE00','#0000CC']
cmap = ListedColormap(color_list)
legend_handles = []
for i in range(0, len(target_names)):
patch = mpatches.Patch(color=color_list[i], label=target_names[i])
legend_handles.append(patch)
plt.figure()
plt.scatter(stock.index[test],clf.predict_proba(X_test)[:,1], c = c, cmap = cmap)
plt.legend(loc=0,handles=legend_handles)
plt.title('{}: Transition Probabilities for Up Moves'.format(title),fontdict = {'fontsize':16})
plt.figure()
plt.scatter(stock.index[test],clf.predict_proba(X_test)[:,0], c = c, cmap = cmap)
plt.legend(loc=0,handles=legend_handles)
plt.title('{}: Transition Probabilities for Down Moves'.format(title),fontdict = {'fontsize':16})
# Function for Calculating Daily Profit and Loss for Task B.3
def calcPnL(clf,stock,X_test,y_test,threshold = 0.5,testSize=252):
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
totalRecords = len(stock.index)
test = np.arange(totalRecords - testSize, totalRecords)
analysis = pd.DataFrame(data = stock.loc[stock.index[test],'ret_FD10'],index = stock.index[test])
analysis['probUP'] = clf.predict_proba(X_test)[:,1]
analysis['betSize'] = np.where(analysis['probUP']>threshold,2*analysis['probUP']-1,0.0)
analysis['dailyP&L'] = analysis['ret_FD10']*analysis['betSize']
profit = analysis['dailyP&L'].sum()*100
return analysis['dailyP&L'], profit
Logistic Regression:
Logistic regression is a statistical model that in its basic form uses a logistic function to model a binary dependent variable. The logistic function transforms real-valued input to an output number y between 0 and 1, interpreted as the probability the input object belongs to the positive class, given its input features.
$$ \hat{y} = \mathrm{logistic}(\hat{b}+\hat{w}_1x_1+\dots+\hat{w}_nx_n) = \frac{1}{1+\exp[-(\hat{b}+\hat{w}_1x_1+\dots+\hat{w}_nx_n)]} $$
An L1 or L2 penalty can be applied to the decision function inside the logistic function.
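As a quick numeric illustration of the formula above (a minimal sketch with made-up intercept, weights and feature values, not the fitted coefficients used later):
# Illustrative logistic-function calculation (hypothetical b, w and x)
import numpy as np
def logistic(z):
    return 1.0 / (1.0 + np.exp(-z))           # maps any real-valued z to (0, 1)
b, w = -0.2, np.array([1.5, -0.8])            # hypothetical intercept and weights
x = np.array([0.4, 0.1])                      # one example's (scaled) feature values
z = b + w @ x                                 # decision function inside the logistic
print(round(logistic(z), 3))                  # probability of an up move, here ~0.58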
HD:
LOW:
# Logistic Regression with penalty l2
print ("\033[1m\t\t\033[4mLogisitic Classifier with L2 Penalty\033[0m\033[0m")
from sklearn.linear_model import LogisticRegression
l2_coeff = np.zeros([len(stocks),len(features)])
for i, (stock_name, stock) in enumerate(stocks.items()):
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
clf = LogisticRegression(penalty = 'l2', C=1,random_state=0).fit(X_train_scaled, y_train)
l2_coeff[i,:] = clf.coef_
print ("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m\n".format(stock_name))
print('Accuracy training set: {:.2f}'.format(clf.score(X_train_scaled, y_train)))
print('Accuracy test set: {:.2f}'.format(clf.score(X_test_scaled, y_test)))
plotMetrics(clf, X_train_scaled, y_train, X_test_scaled, y_test, list(target_names.values()), SVC_classifier=False)
HD:
LOW:
# Logistic Regression with penalty l1
print ("\033[1m\t\t\033[4mLogisitic Classifier with L1 Penalty\033[0m\033[0m")
from sklearn.linear_model import LogisticRegression
l1_coeff = np.zeros([len(stocks),len(features)])
for i, (stock_name, stock) in enumerate(stocks.items()):
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
clf = LogisticRegression(penalty = 'l1', solver='liblinear', C=1,random_state=0).fit(X_train_scaled, y_train)
l1_coeff[i,:] = clf.coef_
print ("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m\n".format(stock_name))
print('Accuracy training set: {:.2f}'.format(clf.score(X_train_scaled, y_train)))
print('Accuracy test set: {:.2f}'.format(clf.score(X_test_scaled, y_test)))
plotMetrics(clf, X_train_scaled, y_train, X_test_scaled, y_test, list(target_names.values()), SVC_classifier=False)
L1 regularization penalizes the sum of the absolute values of the coefficients, while L2 regularization penalizes the sum of their squares. L1 therefore tends to set the coefficients of less influential features exactly to zero; L2 also shrinks the coefficients, but they do not become zero as readily as under the L1 penalty.
In the table below we can see that for HD, 6 features have zero coefficients under the L1 penalty while none are zero under the L2 penalty. The L1 penalty yields a sparse solution and can therefore also be used for feature selection.
Thus, if we have many small-to-medium influential features, the L2 penalty is preferred; if we have a few strongly influential features, the L1 penalty is preferred.
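A minimal sketch of this sparsity effect on synthetic data (illustrative only, independent of the stock features used here):
# L1 vs L2: count coefficients shrunk exactly to zero on synthetic data
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
X, y = make_classification(n_samples=500, n_features=10, n_informative=3,
                           n_redundant=0, random_state=0)
l2 = LogisticRegression(penalty='l2', C=1, random_state=0).fit(X, y)
l1 = LogisticRegression(penalty='l1', solver='liblinear', C=1, random_state=0).fit(X, y)
print('Zero coefficients with L2:', np.sum(l2.coef_ == 0))   # usually none
print('Zero coefficients with L1:', np.sum(l1.coef_ == 0))   # typically several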
# Logistic Regression Comparing L2 and L1 Coefficients
pd.set_option("display.precision", 2)
for i, stock_name in enumerate(stocks):
print("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m\n".format(stock_name))
print(pd.DataFrame(np.vstack([l2_coeff[i,:],l1_coeff[i,:]]),index=['l2','l1'],
columns=features))
Naive Bayes Classifier:
HD:
LOW:
# Bayesian Classifier
from sklearn.naive_bayes import GaussianNB
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
clf = GaussianNB().fit(X_train_scaled, y_train)
print ("\n\033[1m\033[4mAnalyzing {}\033[0m\033[0m\n".format(stock_name))
print('Accuracy training set: {:.2f}'.format(clf.score(X_train_scaled, y_train)))
print('Accuracy test set: {:.2f}'.format(clf.score(X_test_scaled, y_test)))
plotMetrics(clf, X_train_scaled, y_train, X_test_scaled, y_test, list(target_names.values()), SVC_classifier=False)
All three classifiers have been considered, viz. Logistic Regression with L2 penalty, Logistic Regression with L1 penalty and the Bayesian (Gaussian Naive Bayes) classifier.
Three types of Splitters have been implemented:
1) KFold
2) Shuffle Split
3) Time Series Split
Time Series Split is the most appropriate, as it ensures that we never use future data to predict past returns. Hence, Time Series Split will also be used in GridSearchCV in Task B.2; the other two splitters are included only for illustration.
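The difference between the splitters is easy to see by printing the fold indices on a toy sample (a minimal sketch; the actual cross-validation below runs on the scaled training sets):
# Illustrating the three splitters on 12 consecutive observations
import numpy as np
from sklearn.model_selection import KFold, ShuffleSplit, TimeSeriesSplit
X_toy = np.arange(12).reshape(-1, 1)
for name, splitter in [('Time Series Split', TimeSeriesSplit(n_splits=3)),
                       ('K Fold Split', KFold(n_splits=3)),
                       ('Shuffle Split', ShuffleSplit(n_splits=3, random_state=0))]:
    print(name)
    for train_idx, test_idx in splitter.split(X_toy):
        # only TimeSeriesSplit guarantees every test index lies after all train indices
        print('  train:', train_idx, ' test:', test_idx)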
# K-Fold Cross Validation and Reshuffled Samples (Task A.1)
from sklearn.model_selection import ShuffleSplit
from sklearn.model_selection import TimeSeriesSplit
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_validate
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
np.set_printoptions(precision=2)
lrL2 = LogisticRegression(penalty = 'l2', C=1,random_state=0)
lrL1 = LogisticRegression(penalty = 'l1', solver='liblinear', C=1,random_state=0)
gnb = GaussianNB()
classifiers = {'Logistic Regression L2':lrL2, 'Logistic Regression L1':lrL1, 'Bayesian Classifier':gnb}
tss = TimeSeriesSplit(n_splits=3)
kfold = KFold(n_splits=3)
ss = ShuffleSplit(n_splits=3)
splitTypes = {'Time Series Split': tss, 'K Fold Split':kfold, 'Shuffle Split': ss}
for stock_name, stock in stocks.items():
print ("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m".format(stock_name))
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
for classifier_name, clf in classifiers.items():
print ("\n\033[1m\t\033[4m{}\033[0m\033[0m\n".format(classifier_name))
for split_name, split in splitTypes.items():
print ("\033[1m\033[4m{}\033[0m\033[0m".format(split_name))
cv_results = cross_validate(clf,X_train_scaled,y_train,cv=split,n_jobs=-1)
print('Test Scores Across Folds:{}'.format(cv_results['test_score']))
print('Mean Test Score: {:.2f}\n'.format(cv_results['test_score'].mean()))
HD:
LOW:
# SVM Classifier Soft Margin
from sklearn.svm import SVC
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
clf = SVC(kernel='rbf',C=1,gamma='auto',random_state=0).fit(X_train_scaled, y_train)
print ("\n\033[1m\033[4mAnalyzing {}\033[0m\033[0m\n".format(stock_name))
print('Accuracy training set: {:.2f}'.format(clf.score(X_train_scaled, y_train)))
print('Accuracy test set: {:.2f}'.format(clf.score(X_test_scaled, y_test)))
plotMetrics(clf, X_train_scaled, y_train, X_test_scaled, y_test, list(target_names.values()), SVC_classifier=True)
HD:
LOW:
# SVM Classifier Hard Margin
from sklearn.svm import SVC
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
clf = SVC(kernel='rbf',C=1000,gamma='auto',random_state=0).fit(X_train_scaled, y_train)
print ("\n\033[1m\033[4mAnalyzing {}\033[0m\033[0m\n".format(stock_name))
print('Accuracy training set: {:.2f}'.format(clf.score(X_train_scaled, y_train)))
print('Accuracy test set: {:.2f}'.format(clf.score(X_test_scaled, y_test)))
plotMetrics(clf, X_train_scaled, y_train, X_test_scaled, y_test, list(target_names.values()), SVC_classifier=True)
Support vectors are those data points that the classifier margin pushes up against. Thus, once the support vectors are identified, the other training examples can be ignored for classification purposes.
Hard Margin:
A hard margin implies that we classify all training examples 100% correctly. This may, however, result in over-fitting and less smooth decision boundaries, so the classifier may generalize poorly.
Hence, our job is to find the coefficient vector w and constant b, such that
$\quad\quad \Phi(w) = 0.5w^Tw \quad $is minimized
$\quad$ and for all $\quad (x_i,y_i): y_i(w^Tx_i+b) \geq 1 $
Soft Margins:
The data might not be linearly separable. We therefore allow slack variables $\epsilon_i$ in the objective function to permit misclassification of difficult or noisy examples. This leads to smoother boundaries and less over-fitted models, and consequently better generalization.
Hence, our job is to find the coefficient vector w and constant b, such that
$\quad\quad \Phi(w) = 0.5w^Tw + \lambda\sum \epsilon_i\quad$is minimized
$\quad$ and for all $\quad (x_i,y_i): y_i(w^Tx_i+b) \geq 1 - \epsilon_i $
$\quad$ and $\quad\epsilon_i\geq0 $
We can see the impact of soft and hard margins in the plots below: the classifier boundaries are plotted first and the test-set data is overlaid. For both companies, the hard-margin SVM classifiers (high value of C = 1000) produce more complex boundary surfaces and generalize poorly to the test set, as evident from the many misclassified points.
The soft-margin SVM classifiers (low value of C = 1) produce smoother boundaries, although their predictions are not strong either.
The poor predictions for both hard and soft margins are partly due to the two features chosen (Return and Momentum), which are not very informative by themselves.
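A minimal sketch of how C changes the fitted margin on synthetic data (illustrative only; the actual HD and LOW comparison follows below):
# Soft vs hard margin: support-vector counts on synthetic data
import numpy as np
from sklearn.datasets import make_classification
from sklearn.svm import SVC
X, y = make_classification(n_samples=300, n_features=2, n_informative=2,
                           n_redundant=0, flip_y=0.1, random_state=0)
for C in (1, 1000):
    clf = SVC(kernel='rbf', C=C, gamma='auto', random_state=0).fit(X, y)
    # a soft margin (small C) tolerates points inside the margin, so it typically
    # keeps more support vectors but produces a smoother boundary
    print('C = {:>4}: support vectors per class = {}'.format(C, clf.n_support_))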
# SVC Plotting Momentum vs Return Feature
from sklearn.svm import SVC
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
X_train_scaled_C2 = X_train_scaled[:,(0,1)]
X_test_scaled_C2 = X_test_scaled[:,(0,1)]
axis_labels = ['Return','Momentum']
for i in [1,1000]:
clf = SVC(kernel='rbf',C=i,gamma='auto',random_state=0).fit(X_train_scaled_C2, y_train)
fig, subplot = plt.subplots(figsize=(10,8))
title = '{} SVC Classifier, C = {}'.format(stock_name,i)
plotClassRegions(clf, X_train_scaled_C2, y_train, fig, subplot, X_test=X_test_scaled_C2,
y_test=y_test, title=title, target_names = list(target_names.values()),
axis_labels=axis_labels,plot_decision_regions = True)
K-NN Classifier:
Given a training set X_train with labels y_train, a new instance x_test is classified by the K-NN classifier as follows:
The most similar instances (X_NN) to x_test are found by computing the distance from this point to every point in the training set, and the majority label among those neighbours is predicted. The distance metrics used here are Manhattan, Euclidean and Mahalanobis.
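A minimal sketch of the three metrics on a single pair of points (V below is a made-up 2x2 covariance matrix standing in for the covariance of the scaled training features):
# Manhattan, Euclidean and Mahalanobis distances between two points
import numpy as np
from scipy.spatial import distance
a = np.array([1.0, 2.0])
b = np.array([3.0, 0.5])
V = np.array([[1.0, 0.3],
              [0.3, 2.0]])                                     # hypothetical covariance matrix
print('Manhattan  :', distance.cityblock(a, b))                # sum of absolute differences
print('Euclidean  :', distance.euclidean(a, b))                # straight-line distance
print('Mahalanobis:', distance.mahalanobis(a, b, np.linalg.inv(V)))  # scales by inverse covariance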
(Output plots for HD and LOW follow for each of the three distance metrics.)
K-NN Classifier is lazy:
In machine learning, lazy learning is a learning method in which generalization of the training data is, in theory, delayed until a query is made to the system, as opposed to in eager learning, where the system tries to generalize the training data before receiving queries.
K-NN is a lazy learner because it doesn't learn a discriminative function from the training data but memorizes the training dataset instead. Hence, training is brief in K-NN because the algorithm merely has to store the examples. The prediction step, however, is relatively expensive: each time we want to make a prediction, we have to search for the nearest neighbours in the entire training set.
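A minimal sketch that makes this fit/predict asymmetry visible on synthetic data (timings are machine-dependent and purely illustrative):
# Lazy learning: fitting is cheap, predicting is comparatively expensive
import time
from sklearn.datasets import make_classification
from sklearn.neighbors import KNeighborsClassifier
X, y = make_classification(n_samples=20000, n_features=11, random_state=0)
knn = KNeighborsClassifier(n_neighbors=15)
t0 = time.time()
knn.fit(X, y)                       # essentially just stores the training data
t1 = time.time()
knn.predict(X[:2000])               # every prediction searches for neighbours
t2 = time.time()
print('fit time    : {:.3f}s'.format(t1 - t0))
print('predict time: {:.3f}s'.format(t2 - t1))   # typically much larger than the fit time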
# K-NN Classifier Manhattan, Euclidean and Mahalanobis
from sklearn.neighbors import KNeighborsClassifier
metrics = ['manhattan','euclidean','mahalanobis']
best_neighbour_count = {'HD':[],'LOW':[]}
for stock_name, stock in stocks.items():
print ("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m".format(stock_name))
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
for metric in metrics:
print ("\n\033[1m\t\033[4m{}\033[0m\033[0m\n".format(metric))
scores = []
for i in range(1,101):
if metric == 'mahalanobis':
knn = KNeighborsClassifier(n_neighbors = i, algorithm='brute', metric=metric,
metric_params={'V' : np.cov(X_train_scaled, rowvar=False)},n_jobs=-1).fit(X_train_scaled,y_train)
else:
knn = KNeighborsClassifier(n_neighbors = i, metric=metric, n_jobs=-1).fit(X_train_scaled,y_train)
scores.append(knn.score(X_test_scaled, y_test))
n_neighbors = scores.index(max(scores))+1 # Number of Neighbours corresponding to Maximum Test Score
best_neighbour_count[stock_name].append(n_neighbors)
if metric == 'mahalanobis':
knn = KNeighborsClassifier(n_neighbors = n_neighbors, algorithm='brute', metric=metric,
metric_params={'V' : np.cov(X_train_scaled, rowvar=False)},n_jobs=-1).fit(X_train_scaled,y_train)
else:
knn = KNeighborsClassifier(n_neighbors = n_neighbors, metric=metric, n_jobs=-1).fit(X_train_scaled,y_train)
print('Number of Neighbours for Max Test Score:{}'.format(n_neighbors))
print('Accuracy training set: {:.2f}'.format(knn.score(X_train_scaled, y_train)))
print('Accuracy test set: {:.2f}\n'.format(knn.score(X_test_scaled, y_test)))
plotMetrics(knn, X_train_scaled, y_train, X_test_scaled, y_test, list(target_names.values()))
We can observe that as the number of neighbours increases, the decision boundaries become smoother. This is because the ability of an outlier or a single example to influence the prediction is reduced: a larger number of examples vote on the predicted class, so the prediction gets averaged out, resulting in smoother decision boundaries.
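A minimal sketch of this averaging effect on synthetic data (illustrative only; the notebook's own boundary plots follow):
# Larger k narrows the gap between train and test accuracy
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
X, y = make_classification(n_samples=1000, n_features=11, n_informative=4,
                           flip_y=0.2, random_state=0)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, random_state=0)
for k in (1, 5, 50):
    knn = KNeighborsClassifier(n_neighbors=k).fit(X_tr, y_tr)
    # k=1 memorizes the training set; larger k averages over more neighbours
    print('k={:>2}  train={:.2f}  test={:.2f}'.format(k, knn.score(X_tr, y_tr), knn.score(X_te, y_te)))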
# K-NN Classifier Manhattan, Euclidean and Mahalanobis (Plotting Decision Boundaries) Return vs RSI Features
from sklearn.neighbors import KNeighborsClassifier
metrics = ['manhattan','euclidean','mahalanobis']
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
X_train_scaled_C2 = X_train_scaled[:,(0,10)]
X_test_scaled_C2 = X_test_scaled[:,(0,10)]
axis_labels = ['Return','RSI']
for i, metric in enumerate(metrics):
if metric == 'mahalanobis':
knn = KNeighborsClassifier(n_neighbors = best_neighbour_count[stock_name][i], algorithm='brute',
metric=metric, metric_params={'V' : np.cov(X_train_scaled_C2, rowvar=False)},
n_jobs=-1).fit(X_train_scaled_C2,y_train)
else:
knn = KNeighborsClassifier(n_neighbors = best_neighbour_count[stock_name][i], metric=metric,
n_jobs=-1).fit(X_train_scaled_C2,y_train)
fig, subplot = plt.subplots(figsize=(10,8))
title = '{} KNN Classifier, metric : {}, neighbours : {}'.format(stock_name,metric,best_neighbour_count[stock_name][i])
plotClassRegions(knn, X_train_scaled_C2, y_train, fig, subplot, X_test=X_test_scaled_C2,
y_test=y_test, title=title, target_names = list(target_names.values()),
axis_labels=axis_labels,plot_decision_regions = True)
Feature importances have been plotted using the following three classifiers:
1) Random Forest
2) AdaBoost
3) Gradient Boosting
Observations (for both HD and LOW):
All the features will be retained, as none of them has a very high importance and the difference between high- and low-ranking features is not pronounced.
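If the raw scores behind the plots are needed (a minimal sketch using one of the same ensembles; X_train_scaled, y_train and features are assumed to exist as in the loop below):
# Ranked feature importances as a pandas Series (Random Forest shown)
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
rfc = RandomForestClassifier(max_depth=3, n_jobs=-1, random_state=0).fit(X_train_scaled, y_train)
importances = pd.Series(rfc.feature_importances_, index=features).sort_values(ascending=False)
print(importances)                  # same scores the FeatureImportances plot displays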
# Feature Scoring or Selection (Task B.2):
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
print ("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m".format(stock_name))
featureImportances(X_train_scaled,y_train)
Parameter tuning has been done as follows: GridSearchCV is run for every classifier with a 3-split Time Series Split and ROC AUC scoring, tuning C for the logistic regression classifiers, gamma for the SVC, and the number of neighbours (and distance metric) for K-NN.
On the basis of the results, for both stocks the Logistic Regression L2-penalty classifier and the K-NN Mahalanobis-metric classifier are chosen for return prediction in Task B.3.
How to reduce misclassified negative returns: one approach, used below, is to raise the probability threshold above 0.5 (0.52 is used in Task B.3) so that a long position is taken only when the predicted up-move probability is sufficiently high; a sketch follows.
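A minimal sketch of this thresholding idea (clf is assumed to be any fitted classifier with predict_proba, 0.6 is an arbitrary illustrative cut-off, and X_test_scaled and y_test are as in the loops above):
# Raising the probability threshold trades fewer up-move calls for higher precision
import numpy as np
from sklearn.metrics import precision_score
proba_up = clf.predict_proba(X_test_scaled)[:, 1]
for threshold in (0.5, 0.6):
    # only call an up move when the classifier is sufficiently confident;
    # everything else is treated as a down move (-1), i.e. no long position
    y_pred = np.where(proba_up > threshold, 1, -1)
    print('threshold={:.2f}  precision(up)={:.2f}  predicted up moves={}'.format(
        threshold, precision_score(y_test, y_pred, zero_division=0), int((y_pred == 1).sum())))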
# Parameter Tuning using GridSearchCV for all Classifiers (Task B.2)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import TimeSeriesSplit
bestClassifiers = {'HD':[],'LOW':[]}
tss = TimeSeriesSplit(n_splits=3)
lrcL2 = LogisticRegression(random_state = 0, n_jobs = -1, max_iter = 1000)
lrcL1 = LogisticRegression(penalty = 'l1', solver='liblinear',random_state=0, max_iter = 1000, tol = 0.01)
svc = SVC(kernel ='rbf',random_state = 0)
knn = KNeighborsClassifier(n_jobs = -1)
classifiers = {'Logistic Regression L2': lrcL2, 'Logistic Regression L1': lrcL1,
'SVC': svc,'KNN': knn,'KNN Mahalanobis': np.nan}
c = list(np.linspace(0.1,0.99,10))+list(np.arange(1,101,5))
gamma = list(np.linspace(0.01,0.099,10))+list(np.linspace(0.1,0.99,10))+list(np.arange(1,100,5))
n_neighbors = list(np.arange(1,101))
lrcL2_grid_values = {'C': c}
lrcL1_grid_values = {'C': c}
svc_grid_values = {'gamma': gamma}
knn_grid_values = {'n_neighbors':n_neighbors,'metric':['manhattan','euclidean']}
knn_mahalanobis_grid_values = {'n_neighbors':n_neighbors}
grid_values = [lrcL2_grid_values,lrcL1_grid_values,svc_grid_values,knn_grid_values,knn_mahalanobis_grid_values]
for stock_name, stock in stocks.items():
print ("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m".format(stock_name))
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
knn_mahalanobis = KNeighborsClassifier(algorithm='brute', metric = 'mahalanobis',
metric_params={'V' : np.cov(X_train_scaled, rowvar=False)},n_jobs=-1)
classifiers['KNN Mahalanobis'] = knn_mahalanobis
for (classifier_name, clf), param_grid in zip(classifiers.items(),grid_values):
print ("\n\033[1m\033[4m{}\033[0m\033[0m".format(classifier_name))
grid_clf = GridSearchCV(clf, param_grid = param_grid, n_jobs = -1, cv = tss, scoring = 'roc_auc')
grid_clf.fit(X_train_scaled, y_train)
clf_best = grid_clf.best_estimator_
bestClassifiers[stock_name].append(clf_best)
print('Grid best parameters: {}'.format(grid_clf.best_params_))
print('Grid best AUC: {:.2f}'.format(grid_clf.best_score_))
print('Test set Accuracy: {:.2f}'.format(clf_best.score(X_test_scaled, y_test)))
HD:
LOW:
# Estimators Chosen: Logistic Regression L2 and KNN Mahalanobis (Plotting Metrics)
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
classifiers = {0:'Logistic Regression L2',4:'KNN Mahalanobis'}
for stock_name, stock in stocks.items():
print ("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m".format(stock_name))
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
for index, classifier_name in classifiers.items():
print ("\n\033[1m\033[4m\t{}\033[0m\033[0m\n".format(classifier_name))
clf = bestClassifiers[stock_name][index]
plotMetrics(clf, X_train_scaled, y_train, X_test_scaled, y_test, list(target_names.values()))
# Estimators Chosen: Logistic Regression L2 and KNN Mahalanobis (Plotting Transitional Probabilities)
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
classifiers = {0:'Logistic Regression L2',4:'KNN Mahalanobis'}
for stock_name, stock in stocks.items():
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
for index, classifier_name in classifiers.items():
clf = bestClassifiers[stock_name][index]
title = stock_name + ', '+classifier_name
plotTransitionProb(clf,stock,X_test_scaled,y_test,title = title, testSize = 252)
# Calculating Daily Returns (Task B.3)
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
dailyPnL = {'HD':[],'LOW':[]}
classifiers = {0:'Logistic Regression L2',4:'KNN Mahalanobis'}
for stock_name, stock in stocks.items():
print ("\n\033[1m\t\t\033[4mAnalyzing {}\033[0m\033[0m".format(stock_name))
X_train, X_test, y_train, y_test = createTrainTest(stock, features)
X_train_scaled, X_test_scaled = scaleTrainTest(X_train, X_test)
for index, classifier_name in classifiers.items():
print ("\n\033[1m\033[4m{}\033[0m\033[0m".format(classifier_name))
clf = bestClassifiers[stock_name][index]
threshold = 0.52
seriesPandL, profit = calcPnL(clf,stock,X_test_scaled,y_test,threshold = threshold)
dailyPnL[stock_name].append(seriesPandL)
print ("Total Annual Return: {:.2f} %".format(profit))
for stock_name in stocks.keys():
plt.figure(figsize=(10,6))
for i,classifier_name in enumerate(classifiers.values()):
plt.plot(dailyPnL[stock_name][i].cumsum(),label=classifier_name)
plt.legend()
plt.title('{}: Cumulative Returns'.format(stock_name),fontdict = {'fontsize':16})
plt.grid(False)