Ensemble Methods for Predicting Market Movements

This notebook explores various ML ensemble methods as the base structure of a forecast-centric alpha model.

Since we are interested in forecasting, our general problem can be defined as

Given $t$ time units of an asset's price data, predict its price at $t+n$ units of time.

In other words, given, for example, 30 past observations, what will the price look like at unseen observation 31? On an abstract level, we can define a function $F$ such that $F(x, n) = B(x, n+1)$, where $x \in T$, $1 \leq n \leq |T|$, and $B(x, n) = \lVert x_n - x_{n+1} \rVert \leq \delta$.

To start with, we can simplify and treat the problem as a classification task instead of a regression task. We are simply interested in predicting the direction of the price movement. For example, given 30 hours of data, will the price go up, down, or stay the same at hour 31?
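
Before introducing any models, here is a minimal sketch of that framing; the `close` series below is a toy example rather than the data used later, and the simple sign-of-next-return label stands in for the more careful threshold rule defined further down.

In [ ]:
# Toy illustration: direction labels from the sign of the next-step return.
import numpy as np
import pandas as pd

close = pd.Series([100.0, 101.5, 101.5, 99.8, 102.0])
next_return = close.pct_change().shift(-1)  # return realised one step ahead
labels = np.sign(next_return).dropna()      # 1 = up, -1 = down, 0 = flat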

To give the abstract function $B$ some kind of actual meaning, we can treat it as a classifier that needs optimisation. Below, we will explore several classifiers from the ensemble family, including naive Random Forests, Extra Trees and Gradient Boosted Machines.

Imports

Keep this cell for imports only so we don't need to scatter imports all over the notebook.

In [6]:
import itertools
import math as m

import joblib  # sklearn.externals.joblib is deprecated; the standalone package has the same API
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import talib

import sklearn
from sklearn import model_selection, svm
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.ensemble import (BaggingClassifier, RandomForestClassifier,
                              GradientBoostingClassifier, ExtraTreesClassifier)
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import normalize
from sklearn.tree import DecisionTreeClassifier
from mlxtend.classifier import (StackingClassifier, StackingCVClassifier,
                                EnsembleVoteClassifier)
from xgboost import XGBClassifier

pd.set_option('display.precision', 10)

Downloader

We need to download some BTC or altcoin data to train our classifier on. Here we use Poloniex, one of the bigger exchanges, which has a great historic data API. Below is a simple class that grabs OHLCV (open, high, low, close, volume) price data of a currency pair (ethereum/bitcoin, for example) going back a given number of days, with a set candle period in seconds.

In [7]:
import urllib.request
import json
import time

class Poloniex:
    def get_trades(self, currency="BTC_ETH", lookback=365, up_to=np.inf, period=1800):
        # lookback and up_to are in days; period is the candle length in seconds
        start = int(time.time()) - (lookback*24*60*60)
        end = 9999999999  # effectively "now" unless up_to is given
        if up_to < np.inf: end = int(time.time()) - (up_to*24*60*60)
        uri = (
            'https://poloniex.com/public?command=returnChartData&currencyPair=%s&start=%d&end=%d&period=%d' %
            (currency, start, end, period)
        )
        ret = urllib.request.urlopen(uri, timeout=5)
        return self.__to_ohlcv(
                   self.__to_dataframe(
                       ret.read().decode('utf-8')
                   )
               )

    def __to_dataframe(self, data):
        j = json.loads(data)
        df = pd.DataFrame.from_records(j)
        df['date'] = pd.to_datetime(df['date'], unit='s')
        return df

    def __to_ohlcv(self, df):
        return df[['open', 'high', 'low', 'close', 'volume']]
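
A quick sanity check of the downloader; note that this hits the live Poloniex endpoint, so it needs network access, and both the pair name and the period must be ones the exchange actually supports.

In [ ]:
# Fetch ~30 days of 30-minute ETH/BTC candles and inspect the first rows.
polo = Poloniex()
ohlcv = polo.get_trades(currency="BTC_ETH", lookback=30, period=1800)
ohlcv.head()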

Utility

This section just defines some common utility functions.

In [8]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
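
For reference, a typical call looks like the following; the label vectors here are dummies, and the real predictions are produced in the model-selection section below.

In [ ]:
# Demo with made-up labels; see the sections below for real predictions.
y_true_demo = [1, -1, 1, 1, -1]
y_pred_demo = [1, -1, -1, 1, -1]
cm = confusion_matrix(y_true_demo, y_pred_demo)
plot_confusion_matrix(cm, classes=[-1, 1])
plt.show()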

Data and Preprocessing

Defined below are several data preprocessing, cleaning and feature engineering techniques. The general process looks like this (a toy sketch of the windowing and labelling steps follows the list):

  • Grab a currency pair from Polo or read directly from a csv file
  • Read Open, High, Low, Close and Volume data
  • Sort from earliest to latest
  • Generate several quantitative indicators like the RSI and Williams %R
  • Set a lookback window (for example 30 days)
  • Set a lookahead window (for example 1 day)
  • Calculate % returns for every timestamp
  • Define X as N lookback-period observations. For example, X[1] = %returns 1 to %returns 30, X[2] = %returns 2 to %returns 31
  • Define Y as a positive or negative integer (1, -1) for every X, so that if %returns31 < %returns30, Y = -1 (price went down) and if %returns31 > %returns30, Y = 1 (price went up)
  • Selectively apply exponential smoothing to all of the data to get rid of noise.
  • Split the data into training and testing sets with a 70/30 split
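
The toy sketch below walks through the windowing and labelling steps on made-up prices, using a lookback of 3 instead of 30 and the simple sign-of-next-return label rather than the threshold rule implemented further down.

In [ ]:
# Toy example: lagged %-return features plus a next-step direction label.
demo = pd.DataFrame({"close": [100.0, 101.0, 99.5, 100.5, 102.0, 101.0, 103.0]})
demo["ret"] = demo["close"].pct_change() * 100.0
for i in range(1, 4):                            # lookback window of 3
    demo["Lookback%d" % i] = demo["ret"].shift(i)
demo["UpDown"] = np.sign(demo["ret"].shift(-1))  # direction at t+1
demo.dropna()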
In [10]:
def read_large():
    # _PLARGE is assumed to be a records list defined elsewhere in the notebook
    return pd.DataFrame(_PLARGE)

def read_walton():
    return pd.read_csv("/home/lisper/fintech/data/btc_wtc_1h_candles.csv", parse_dates=True)

def read_bitcoin():
    return pd.read_csv("/home/lisper/fintech/data/btc_usd_1h.csv", parse_dates=True)

def read_link():
    return pd.read_csv("/home/lisper/fintech/data/link_btc_1h.csv", parse_dates=True)

def read_etp():
    # Poloniex pairs use BASE_QUOTE format; the chart API only supports
    # fixed periods (300, 900, 1800, 7200, 14400 or 86400 seconds)
    b = Poloniex()
    return b.get_trades("BTC_ETP", period=1800)

def read_candles():
    return pd.read_csv("/home/lisper/fintech/data/trees_wtc_btc_1h_candles.csv", parse_dates=True)

def read_unseen():
    return pd.read_csv("/home/lisper/fintech/data/new_candles.csv", parse_dates=True)

def apply_smoothing(ts):
    # exponentially weighted moving average; small alpha = heavy smoothing
    return ts.ewm(alpha=0.1).mean()
  
def calculate_indicators(ts):
    ts['rsi'] = talib.RSI(ts['close'].values, timeperiod=14)
    [fasts, _] = talib.STOCHF(ts['high'].values, ts['low'].values, ts['close'].values, fastk_period=14)
    ts['osc'] = fasts
    ts['wr'] = talib.WILLR(ts['high'].values, ts['low'].values, ts['close'].values, timeperiod=14)
    [macd, macdsig, _] = talib.MACD(ts['close'].values, fastperiod=12, slowperiod=26, signalperiod=9)
    ts['macd'] = macd
    ts['macdsig'] = macdsig
    ts['prc'] = talib.ROCP(ts['close'].values, timeperiod=14)
    ts['obv'] = talib.OBV(ts['close'].values, ts['volume'].values)
    # drop the warm-up rows where the slowest indicator (the MACD signal) is still NaN
    ts = ts[ts.index >= 33]
    return ts

def construct_lagged_data(ts, lookback=6, lookahead=1, features=['rsi', 'osc', 'wr', 'prc', 'obv', 'macd', 'macdsig']):
    for i in range(0, lookback):
        ts["Lookback%s" % str(i+1)] = ts["close"].shift(i+1)
        if 'rsi' in features: ts["RSI%s" % str(i+1)] = ts["rsi"].shift(i+1)
        if 'osc' in features: ts["OSC%s" % str(i+1)] = ts["osc"].shift(i+1)
        if 'wr' in features: ts["WR%s" % str(i+1)] = ts["wr"].shift(i+1)
        if 'prc' in features: ts["PRC%s" % str(i+1)] = ts["prc"].shift(i+1)
        if 'obv' in features: ts["OBV%s" % str(i+1)] = ts["obv"].shift(i+1)
        if 'macd' in features: ts["MACD%s" % str(i+1)] = ts["macd"].shift(i+1)
        if 'macdsig' in features: ts["MACDSIG%s" % str(i+1)] = ts["macdsig"].shift(i+1)
    for i in range(0, lookahead):
        ts["Lookforward%s" % str(i+1)] = ts["close"].shift(-(i+1))
    ts.dropna(inplace=True)
    
    #adjust to percentage returns
    ts["Lookback0"] = ts["close"].pct_change()*100.0
    if 'rsi' in features: ts["RSI0"] = ts["rsi"].pct_change()*100
    if 'osc' in features: ts["OSC0"] = ts["osc"].pct_change()*100
    if 'wr' in features: ts["WR0"] = ts["wr"].pct_change()*100
    if 'prc' in features: ts["PRC0"] = ts["prc"].pct_change()*100
    if 'obv' in features: ts["OBV0"] = ts["obv"].pct_change()*100
    if 'macd' in features: ts["MACD0"] = ts["macd"].pct_change()*100
    if 'macdsig' in features: ts["MACDSIG0"] = ts["macdsig"].pct_change()*100
    for i in range(0, lookback):
        ts["Lookback%s" % str(i+1)] = ts[
           "Lookback%s" % str(i+1)
          ].pct_change()*100.0
    for i in range(0, lookahead):
        ts["Lookforward%s" % str(i+1)] = ts[
           "Lookforward%s" % str(i+1)
          ].pct_change()*100.0
    for f in features:
        for i in range(0, lookback):
            ts["%s%s" % (f.upper(), str(i+1))] = ts[
               "%s%s" % (f.upper(), str(i+1))
              ].pct_change()*100.0
    ts.dropna(inplace=True)
    
    # label rule: every lookahead return must stay above -down, while at
    # least one must exceed up (an asymmetric 2:1 up/down threshold)
    up_down_factor = 2.0
    percent_factor = 0.01
    
    up = up_down_factor*percent_factor
    down = percent_factor
    
    down_cols = [
        ts["Lookforward%s" % str(i+1)] > -down
        for i in range(0, lookahead)
        ]
    up_cols = [
        ts["Lookforward%s" % str(i+1)] > up
        for i in range(0, lookahead) 
        ]
    down_tot = down_cols[0]
    for c in down_cols[1:]:
        down_tot = down_tot & c
    # up_tot must be initialised outside the loop above, otherwise it is
    # undefined when lookahead == 1 (the loop body never runs)
    up_tot = up_cols[0]
    for c in up_cols[1:]:
        up_tot = up_tot | c
    ts["UpDown"] = down_tot & up_tot
    #ts["UpDown"] = np.sign(ts["Lookforward%d" % lookahead])
    ts["UpDown"] = ts["UpDown"].astype(int) 
    ts["UpDown"].replace(to_replace=0, value=-1, inplace=True)
    return ts

def construct_lagged_data_n(ts, lookback=6, lookahead=1, features=[]):
    for i in range(0, lookback):
        ts["Lookback%s" % str(i+1)] = ts["close"].shift(i+1)
        ts["Open%s" % str(i+1)] = ts["open"].shift(i+1)
        ts["High%s" % str(i+1)] = ts["high"].shift(i+1)
        ts["Low%s" % str(i+1)] = ts["low"].shift(i+1)
        ts["Volume%s" % str(i+1)] = ts["volume"].shift(i+1)
    for i in range(0, lookahead):
        ts["Lookforward%s" % str(i+1)] = ts["close"].shift(-(i+1))
    ts.dropna(inplace=True)
    
    #adjust to percentage returns
    ts["Lookback0"] = ts["close"].pct_change()*100.0
    ts["Open0"] = ts["open"].pct_change()*100
    ts["High0"] = ts["high"].pct_change()*100
    ts["Low0"] = ts["low"].pct_change()*100
    ts["Volume0"] = ts["volume"].pct_change()*100
    for i in range(0, lookback):
        ts["Lookback%s" % str(i+1)] = ts[
           "Lookback%s" % str(i+1)
          ].pct_change()*100.0
    for i in range(0, lookahead):
        ts["Lookforward%s" % str(i+1)] = ts[
           "Lookforward%s" % str(i+1)
          ].pct_change()*100.0
    for f in ['Open', 'High', 'Low', 'Volume']:
        for i in range(0, lookback):
            ts["%s%s" % (f, str(i+1))] = ts[
               "%s%s" % (f, str(i+1))
              ].pct_change()*100.0
    ts.dropna(inplace=True)
    
    # here the label is simply the sign of the furthest lookahead return
    ts["UpDown"] = np.sign(ts["Lookforward%d" % lookahead])
    ts["UpDown"] = ts["UpDown"].astype(int) 
    ts["UpDown"].replace(to_replace=0, value=-1, inplace=True)
    return ts

def construct_last(ts, lookback=6, lookahead=1, features=['rsi', 'osc', 'wr', 'prc', 'obv', 'macd', 'macdsig']):
    for i in range(0, lookback):
        ts["Lookback%s" % str(i+1)] = ts["close"].shift(i+1)
        if 'rsi' in features: ts["RSI%s" % str(i+1)] = ts["rsi"].shift(i+1)
        if 'osc' in features: ts["OSC%s" % str(i+1)] = ts["osc"].shift(i+1)
        if 'wr' in features: ts["WR%s" % str(i+1)] = ts["wr"].shift(i+1)
        if 'prc' in features: ts["PRC%s" % str(i+1)] = ts["prc"].shift(i+1)
        if 'obv' in features: ts["OBV%s" % str(i+1)] = ts["obv"].shift(i+1)
        if 'macd' in features: ts["MACD%s" % str(i+1)] = ts["macd"].shift(i+1)
        if 'macdsig' in features: ts["MACDSIG%s" % str(i+1)] = ts["macdsig"].shift(i+1)
    ts.dropna(inplace=True)
    ts["Lookback0"] = ts["close"].pct_change()*100.0
    if 'rsi' in features: ts["RSI0"] = ts["rsi"]
    if 'osc' in features: ts["OSC0"] = ts["osc"]
    if 'wr' in features: ts["WR0"] = ts["wr"]
    if 'prc' in features: ts["PRC0"] = ts["prc"]
    if 'obv' in features: ts["OBV0"] = ts["obv"]
    if 'macd' in features: ts["MACD0"] = ts["macd"]
    if 'macdsig' in features: ts["MACDSIG0"] = ts["macdsig"]
    for i in range(0, lookback):
        ts["Lookback%s" % str(i+1)] = ts[
           "Lookback%s" % str(i+1)
          ].pct_change()*100.0
    ts.dropna(inplace=True)
    tn = pd.DataFrame()
    for i in range(0, lookback):
        tn["Lookback%d" % i] = ts["Lookback%d" % i]
        if 'rsi' in features: tn["RSI%d" % i] = ts["RSI%d" % i]
        if 'osc' in features: tn["OSC%d" % i] = ts["OSC%d" % i]
        if 'wr' in features: tn["WR%d" % i] = ts["WR%d" % i]
        if 'prc' in features: tn["PRC%d" % i] = ts["PRC%d" % i]
        if 'obv' in features: tn["OBV%d" % i] = ts["OBV%d" % i]
        if 'macd' in features: tn["MACD%d" % i] = ts["MACD%d" % i]
        if 'macdsig' in features: tn["MACDSIG%d" % i] = ts["MACDSIG%d" % i]
    return tn.tail(1)

def preprocess_data(ts, lookback=6, features=['rsi', 'osc', 'wr', 'prc', 'obv', 'macd', 'macdsig']):
    random_state = 42
    tn = pd.DataFrame()
    for i in range(0, lookback):
        tn["Lookback%d" % i] = ts["Lookback%d" % i]
        if 'rsi' in features: tn["RSI%d" % i] = ts["RSI%d" % i]
        if 'osc' in features: tn["OSC%d" % i] = ts["OSC%d" % i]
        if 'wr' in features: tn["WR%d" % i] = ts["WR%d" % i]
        if 'prc' in features: tn["PRC%d" % i] = ts["PRC%d" % i]
        if 'obv' in features: tn["OBV%d" % i] = ts["OBV%d" % i]
        if 'macd' in features: tn["MACD%d" % i] = ts["MACD%d" % i]
        if 'macdsig' in features: tn["MACDSIG%d" % i] = ts["MACDSIG%d" % i]
    X = normalize(tn)
    y = ts["UpDown"]
    # shuffle=False keeps chronological order, so the test set is strictly
    # later in time than the training set (random_state is ignored here)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=random_state, shuffle=False
    )
    return X_train, X_test, y_train, y_test

def preprocess_data_n(ts, lookback=6, features=[]):
    random_state = 42
    tn = pd.DataFrame()
    for i in range(0, lookback):
        tn["Lookback%d" % i] = ts["Lookback%d" % i]
        tn["Open%d" % i] = ts["Open%d" % i]
        tn["High%d" % i] = ts["High%d" % i]
        tn["Low%d" % i] = ts["Low%d" % i]
        tn["Volume%d" % i] = ts["Volume%d" % i]
    X = normalize(tn)
    y = ts["UpDown"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=random_state, shuffle=False
    )
    return X_train, X_test, y_train, y_test

def preprocess_for_forecast(ts, lookback=6, features=['rsi', 'osc', 'wr', 'prc', 'obv']):
    random_state = 42
    tn = pd.DataFrame()
    for i in range(0, lookback):
        tn["Lookback%d" % i] = ts["Lookback%d" % i]
        if 'rsi' in features: tn["RSI%d" % i] = ts["RSI%d" % i]
        if 'osc' in features: tn["OSC%d" % i] = ts["OSC%d" % i]
        if 'wr' in features: tn["WR%d" % i] = ts["WR%d" % i]
        if 'prc' in features: tn["PRC%d" % i] = ts["PRC%d" % i]
        if 'obv' in features: tn["OBV%d" % i] = ts["OBV%d" % i]
    X = tn
    y = ts["UpDown"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=random_state, shuffle=False
    )
    return X_train, X_test, y_train, y_test

def trees_model():
    return ExtraTreesClassifier(random_state=42, n_jobs=4)

def fit_model(X_train, y_train, n_estimators, max_depth):
    model = ExtraTreesClassifier(
        n_estimators=n_estimators,
        n_jobs=4,
        random_state=42,
        max_depth=max_depth
    )
    model.fit(X_train, y_train)
    return model
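
For a single-model smoke test outside the grid search, fit_model can be called directly once X_train and friends have been produced by gen_and_preprocess() in the next section; the hyperparameter values here are purely illustrative.

In [ ]:
# Quick single-model run; n_estimators and max_depth chosen arbitrarily.
demo_model = fit_model(X_train, y_train, n_estimators=500, max_depth=20)
demo_model.score(X_test, y_test)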

Model fitting and selection

Here we put everything together.
We grab hourly ETH/BTC data going back one year. We set a lookback window of 6 hours and a lookahead window of 1 hour; in other words, given 6 hours of data, predict the price movement at the 7th hour. We then train several models on this data and select the best one based on its test score.

The best model is then saved to disk.

In [569]:
def gen_and_preprocess(data = 'large', lookback=6, alt=True):
    ts = generate_frame(data, lookback, alt)
    return process_and_split(ts, lookback, alt)
    
def generate_frame(data = 'large', lookback=6, alt=False):
    if data == 'large':
        ts = read_large()
    elif data == 'etp':
        ts = read_etp()
    elif data == 'walton':
        ts = read_walton()
    else:
        raise ValueError("unknown dataset: %s" % data)
    ts = apply_smoothing(ts)
    ts = calculate_indicators(ts)
    if alt:
        return construct_lagged_data_n(ts, lookback=lookback, lookahead=1)
    else:
        return construct_lagged_data(ts, lookback=lookback, lookahead=1)

def process_and_split(ts, lookback=6, alt=False):
    if alt:
        return preprocess_data_n(ts, lookback=lookback)
    else:
        return preprocess_data(ts, lookback=lookback)

X_train, X_test, y_train, y_test = gen_and_preprocess()

models = ["random_forext", "extra_trees", "boosted_trees"]
score = np.inf
best_model = None

for model in  models:
    model = model.()
    parameters = {"n_estimators": range(1000, 7000, 1000), "max_depth": range(10, 70, 10)}
    clf = GridSearchCV(model, parameters, n_jobs=16)
    clf.fit(X_train, y_train)
    nscore = clf.score(X_test, y_test)
    if nscore <= score:
        score = nscore
        best_model = model

joblib.dump(model, "/home/lisper/fintech/models/random_forest_ethbtc.mdl")
Out[569]:
GridSearchCV(cv=None, error_score='raise',
       estimator=ExtraTreesClassifier(bootstrap=False, class_weight=None, criterion='gini',
           max_depth=None, max_features='auto', max_leaf_nodes=None,
           min_impurity_decrease=0.0, min_impurity_split=None,
           min_samples_leaf=1, min_samples_split=2,
           min_weight_fraction_leaf=0.0, n_estimators=10, n_jobs=4,
           oob_score=False, random_state=42, verbose=0, warm_start=False),
       fit_params=None, iid=True, n_jobs=4,
       param_grid={'max_depth': range(10, 70, 10), 'n_estimators': range(1000, 7000, 1000)},
       pre_dispatch='2*n_jobs', refit=True, return_train_score=True,
       scoring=None, verbose=0)
In [572]:
best_model.score(X_test, y_test)
Out[572]:
0.88141025641025639

Not bad! 88% accuracy on smoothed data.
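
To see where that accuracy comes from, the confusion-matrix helper from the Utility section can break the score down per class:

In [ ]:
# Per-class breakdown of the test-set predictions.
y_pred = best_model.predict(X_test)
cm = confusion_matrix(y_test, y_pred)
plot_confusion_matrix(cm, classes=[-1, 1])
plt.show()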

Backtest

Does our model generalise correctly? How will it behave on completely unseen data? Can it actually pull a profit once we include broker commissions and other frictions?

Here we test our model on roughly 2 months of unseen WTC/BTC data, with a reasonable 0.01% broker commission and no slippage.

We call an external program defined in backtrader.py. Our starting budget is set to 100 BTC. The trading strategy is very simple (a pseudocode sketch of it follows the list):

  • If we are not currently holding a position, buy with 50% of our money if the model predicts positive price movement
  • If we are holding a position, sell it if the model predicts negative price movement; do nothing otherwise
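
As a minimal sketch of that loop (hypothetical names throughout; the real implementation lives in backtrader.py, and commissions and slippage are omitted here for brevity):

In [ ]:
# Hypothetical strategy loop; `model` and `feature_rows` stand in for the
# objects that backtrader.py actually wires up.
def run_strategy(model, feature_rows, budget=100.0):
    position = 0.0                                     # units currently held
    for row in feature_rows:
        signal = model.predict([row["features"]])[0]   # 1 = up, -1 = down
        price = row["close"]
        if position == 0 and signal == 1:
            spend = budget * 0.5                       # buy with 50% of cash
            position = spend / price
            budget -= spend
        elif position > 0 and signal == -1:
            budget += position * price                 # close the position
            position = 0.0
    return budget, position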
In [ ]:
!backtrader.py --model '/home/lisper/fintech/models/random_forest_ethbtc.mdl' --pair "BTC/WTC" --output "/home/lisper/fintech/backtrader/results/wtc_btc_random_forest_test.png"
In [2]:
from IPython.display import Image
In [4]:
Image("/home/lisper/fintech/backtrader/results/ensemble_ethbtc.png")
Out[4]: