Время прочтения: 7 мин.

В конце 2020 г. мы приняли участие в соревновании Kaggle, цель которого — создание модели предсказания итогов сделок по покупке — продаже ценных бумаг. Для обучения модели Kaggle предоставил обезличенные данные с мировой финансовой площадки.

Разработка стратегии для торговли ценными бумагами является сложной задачей. Стратегия, прибыльная сегодня, может завтра принести убыток. Волатильность рынка затрудняет предсказание доходности сделок.

Мы решим эту задачу за 8 шагов.

Исходный пакет данных содержит обезличенный набор функций feature_{0…129}, представляющий реальные данные фондового рынка. Каждая строка в наборе данных это исход сделки, для которой будет предсказываться действие: 1 — совершить сделку, 0 — пройти мимо. Каждая сделка имеет соответствующий weight, который показывает доходность сделки. Столбец date — день сделки, ts_id — время сделки.

В обучающем наборе данных train.csv предоставляется значение resp, а также несколько других resp_{1,2,3,4}, которые являются результатами сделок за различные периоды.

Вывод head представлен ниже:

Теперь приступим к реализации.

  1. Используем алгоритмы Machine Learning — TensorFlow Keras.
from tensorflow.keras.layers import Input, Dense, BatchNormalization, Dropout, Concatenate, Lambda, GaussianNoise, Activation
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers.experimental.preprocessing import Normalization
import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupKFold
from tqdm import tqdm
from random import choices
import kerastuner as kt

2. Создадим кросс-валидатор временных рядов с непересекающимися группами. Это позволяет разделить массив на две части и избежать включение информации из тренировочных в тестовые данные (если модель имеет оконные или запаздывающие функции). Передаем кросс-валидатору индексы тренировочных и тестовых групп для разделения выборок данных временных рядов, которые наблюдаются через фиксированные интервалы. В каждой выборке (слое) тестовые индексы должны быть больше, чем в предыдущей, поэтому перетасовка в кросс-валидаторе недопустима. Этот объект класса перекрестной проверки является вариацией класса KFold. В k-м слое он возвращает первые k сгибов в качестве тренировочного набора и (k+1) — й сгиб в качестве тестового набора. Одна и та же группа не будет появляться в двух разных сгибах (число различных групп должно быть по крайней мере равно числу сгибов). Обратите внимание, что в отличие от стандартных методов перекрестной проверки, последовательные обучающие наборы являются супернаборами тех, которые предшествуют им.

Внутри функции split формируем следующие параметры:

  • X[n_samples][n_features] – обучающие данные, где n_samples — количество выборок, а n_features — количество функций.
  • Y[n_samples] – всегда игнорируется, оставляем для совместимости версий библиотек python.
  • groups[n_samples] – групповые метки для выборок, используем для разбиения набора данных на тренировочные/тестовые наборы.

Результатом работы функции будут следующие данные:

  • train – Обучающий набор индексов для этого разделения;
  • test – Тестовый набор индексов для этого разделения.
from sklearn.model_selection import KFold
from sklearn.model_selection._split import _BaseKFold, indexable, _num_samples
from sklearn.utils.validation import _deprecate_positional_args

class PurgedGroupTimeSeriesSplit(_BaseKFold):
    
    @_deprecate_positional_args
    def __init__(self,
                 n_splits=5,
                 *,
                 max_train_group_size=np.inf,
                 max_test_group_size=np.inf,
                 group_gap=None,
                 verbose=False
                 ):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.max_train_group_size = max_train_group_size
        self.group_gap = group_gap
        self.max_test_group_size = max_test_group_size
        self.verbose = verbose

    def split(self, X, y=None, groups=None):
        if groups is None:
            raise ValueError(
                "The 'groups' parameter should not be None")
        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)
        n_splits = self.n_splits
        group_gap = self.group_gap
        max_test_group_size = self.max_test_group_size
        max_train_group_size = self.max_train_group_size
        n_folds = n_splits + 1
        group_dict = {}
        u, ind = np.unique(groups, return_index=True)
        unique_groups = u[np.argsort(ind)]
        n_samples = _num_samples(X)
        n_groups = _num_samples(unique_groups)
        for idx in np.arange(n_samples):
            if (groups[idx] in group_dict):
                group_dict[groups[idx]].append(idx)
            else:
                group_dict[groups[idx]] = [idx]
        if n_folds > n_groups:
            raise ValueError(
                ("Cannot have number of folds={0} greater than"
                 " the number of groups={1}").format(n_folds,
                                                     n_groups))

        group_test_size = min(n_groups // n_folds, max_test_group_size)
        group_test_starts = range(n_groups - n_splits * group_test_size,
                                  n_groups, group_test_size)
        for group_test_start in group_test_starts:
            train_array = []
            test_array = []

            group_st = max(0, group_test_start - group_gap - max_train_group_size)
            for train_group_idx in unique_groups[group_st:(group_test_start - group_gap)]:
                train_array_tmp = group_dict[train_group_idx]
                
                train_array = np.sort(np.unique(
                                      np.concatenate((train_array,
                                                      train_array_tmp)),
                                      axis=None), axis=None)

            train_end = train_array.size
 
            for test_group_idx in unique_groups[group_test_start:
                                                group_test_start +
                                                group_test_size]:
                test_array_tmp = group_dict[test_group_idx]
                test_array = np.sort(np.unique(
                                              np.concatenate((test_array,
                                                              test_array_tmp)),
                                     axis=None), axis=None)

            test_array  = test_array[group_gap:]
            
            
            if self.verbose > 0:
                    pass
                    
            yield [int(i) for i in train_array], [int(i) for i in test_array]

3. Создадим класс CVTuner для формирования гиперпараметров обучаемой модели и выявления наилучшего результата подбора. Количество моделей мы определим сами, исходя из доступных мощностей, времени и необходимой точности прогноза.

Код класса:

class CVTuner(kt.engine.tuner.Tuner):
    def run_trial(self, trial, X, y, splits, batch_size=32, epochs=1,callbacks=None):
        val_losses = []
        for train_indices, test_indices in splits:
            X_train, X_test = [x[train_indices] for x in X], [x[test_indices] for x in X]
            y_train, y_test = [a[train_indices] for a in y], [a[test_indices] for a in y]
            if len(X_train) < 2:
                X_train = X_train[0]
                X_test = X_test[0]
            if len(y_train) < 2:
                y_train = y_train[0]
                y_test = y_test[0]
            
            model = self.hypermodel.build(trial.hyperparameters)
            hist = model.fit(X_train,y_train,
                      validation_data=(X_test,y_test),
                      epochs=epochs,
                        batch_size=batch_size,
                      callbacks=callbacks)
            
            val_losses.append([hist.history[k][-1] for k in hist.history])
        val_losses = np.asarray(val_losses)
        self.oracle.update_trial(trial.trial_id, {k:np.mean(val_losses[:,i]) for i,k in enumerate(hist.history.keys())})
        self.save_model(trial.trial_id, model)
def create_autoencoder(input_dim,output_dim,noise=0.05):
    i = Input(input_dim)
    encoded = BatchNormalization()(i)
    encoded = GaussianNoise(noise)(encoded)
    encoded = Dense(64,activation='relu')(encoded)
    decoded = Dropout(0.2)(encoded)
    decoded = Dense(input_dim,name='decoded')(decoded)
    x = Dense(32,activation='relu')(decoded)
    x = BatchNormalization()(x)
    x = Dropout(0.2)(x)
    x = Dense(output_dim,activation='sigmoid',name='label_output')(x)
    
    encoder = Model(inputs=i,outputs=encoded)
    autoencoder = Model(inputs=i,outputs=[decoded,x])
    
    autoencoder.compile(optimizer=Adam(0.001),loss={'decoded':'mse','label_output':'binary_crossentropy'})
    return autoencoder, encoder

4. Описываем функцию создания модели для обучения с подбором на основе байесовской оптимизации нормализированных данных:

def create_model(hp,input_dim,output_dim,encoder):
    inputs = Input(input_dim)
    
    x = encoder(inputs)
    x = Concatenate()([x,inputs]) #use both raw and encoded features
    x = BatchNormalization()(x)
    x = Dropout(hp.Float('init_dropout',0.0,0.5))(x)
    
    for i in range(hp.Int('num_layers',1,3)):
        x = Dense(hp.Int('num_units_{i}',64,256))(x)
        x = BatchNormalization()(x)
        x = Lambda(tf.keras.activations.swish)(x)
        x = Dropout(hp.Float(f'dropout_{i}',0.0,0.5))(x)
    x = Dense(output_dim,activation='sigmoid')(x)
    model = Model(inputs=inputs,outputs=x)
    model.compile(optimizer=Adam(hp.Float('lr',0.00001,0.1,default=0.001)),loss=BinaryCrossentropy(label_smoothing=hp.Float('label_smoothing',0.0,0.1)),metrics=[tf.keras.metrics.AUC(name = 'auc')])
    return model

5. Подготовим набор данных для обучения модели из файла train.csv.

Так как в набор данных были намеренно включены данные с weight = 0, удалим их, т.к. они включены для полноты наблюдения и не влияют на результат.

Загружаем данные для тренировки модели:

TRAINING = TRUE
USE_FINETUNE = False 
FOLDS = 5
SEED = 42

train = pd.read_csv('../input/jane-street-market-prediction/train.csv') 
train = train.astype({c: np.float32 for c in train.select_dtypes(include='float64').columns}) #limit memory use
train.fillna(train.mean(),inplace=True)
train = train.query('weight > 0').reset_index(drop = True)
train['action'] = (train['resp'] > 0).astype('int')
features = [c for c in train.columns if 'feature' in c]

resp_cols = ['resp_1', 'resp_2', 'resp_3', 'resp', 'resp_4']

X = train[features].values
y = np.stack([(train[c] > 0).astype('int') for c in resp_cols]).T #Multitarget

f_mean = np.mean(train[features[1:]].values,axis=0)

6. После подготовки данных, если модель еще не обучена, запускаем создание кодировщика, иначе – используем ранее созданный. 

autoencoder, encoder = create_autoencoder(X.shape[-1],y.shape[-1],noise=0.1)
if TRAINING:
    autoencoder.fit(X,(X,y),
                    epochs=1000,
                    batch_size=4096, 
                    validation_split=0.1,
                    callbacks=[EarlyStopping('val_loss',patience=10,restore_best_weights=True)])
    encoder.save_weights('/kaggle/working/encoder.hdf5')
else:
    encoder.load_weights('/kaggle/working/encoder.hdf5')
encoder.trainable = False

7. Создаем модель и отправляем её в CVTuner для определения гиперпараметров.

В обучении модели обеспечивается разбиение данных на части и группировка по значениям date. После запускаем поиск лучшего гиперпараметра, сохраняем его и сериализируем этот результат. Перебираем каждый полученный набор данных и сохраняем обученные модели.

Запускаем тренировку моделей и используем ее лучшую реализацию:

model_fn = lambda hp: create_model(hp,X.shape[-1],y.shape[-1],encoder)

tuner = CVTuner(
        hypermodel=model_fn,
        oracle=kt.oracles.BayesianOptimization(
        objective= kt.Objective('val_auc', direction='max'),
        num_initial_points=4,
        max_trials=20))

FOLDS = 5
SEED = 42

if TRAINING:
    gkf = PurgedGroupTimeSeriesSplit(n_splits = FOLDS, group_gap=20)
    splits = list(gkf.split(y, groups=train['date'].values))
    tuner.search((X,),(y,),splits=splits,batch_size=4096,epochs=100,callbacks=[EarlyStopping('val_auc', mode='max',patience=3)])
tuner.results_summary()
    hp  = tuner.get_best_hyperparameters(1)[0]
    pd.to_pickle(hp,f'./best_hp_{SEED}.pkl')
    for fold, (train_indices, test_indices) in enumerate(splits):
        model = model_fn(hp)
        X_train, X_test = X[train_indices], X[test_indices]
        y_train, y_test = y[train_indices], y[test_indices]
        model.fit(X_train,y_train,validation_data=(X_test,y_test),epochs=100,batch_size=4096,callbacks=[EarlyStopping('val_auc',mode='max',patience=10,restore_best_weights=True)])
        model.save_weights(f'./model_{SEED}_{fold}.hdf5')
        model.compile(Adam(hp.get('lr')/100),loss='binary_crossentropy')
        model.fit(X_test,y_test,epochs=3,batch_size=4096)
        model.save_weights(f'./model_{SEED}_{fold}_finetune.hdf5')
    tuner.results_summary()
else:
    models = []
    hp = pd.read_pickle(f'/kaggle/working/best_hp_{SEED}.pkl')
    for f in range(FOLDS):
        model = model_fn(hp)
        if USE_FINETUNE:
            
            model.load_weights(f'/kaggle/working/model_{SEED}_{f}_finetune.hdf5')
        else:
            model.load_weights(f'/kaggle/working/model_{SEED}_{f}.hdf5')
        models.append(model)

8. Для создания файла решения используем API временных рядов python. Проверяем модель:

if not TRAINING:
    f = np.median
    models = models[-3:]
    import janestreet
    env = janestreet.make_env()
    th = 0.5
    for (test_df, pred_df) in tqdm(env.iter_test()):
        if test_df['weight'].item() > 0:
            x_tt = test_df.loc[:, features].values
            if np.isnan(x_tt[:, 1:].sum()):
                x_tt[:, 1:] = np.nan_to_num(x_tt[:, 1:]) + np.isnan(x_tt[:, 1:]) * f_mean
            pred = np.mean([model(x_tt, training = False).numpy() for model in models],axis=0)
            pred = f(pred)
            pred_df.action = np.where(pred >= th, 1, 0).astype(int)
        else:
            pred_df.action = 0
        env.predict(pred_df)

Полученные результаты прогнозирования сохранены в файл submission.csv.

Таким образом с помощью построенной модели машинного обучения можно увеличить количество удачных сделок на финансовых рынках.

Финансового благополучия!