Время прочтения: 8 мин.

Аналитику или исследователю данных приходится разрабатывать множество алгоритмов по обработке и анализу различных данных. Большинство алгоритмов разрабатываются для многоразового использования, а значит, код либо запускается разработчиком с определенной периодичностью, либо код передается другим пользователям для обработки своих данных. При этом алгоритмы имеют множество параметров и зависимостей, которые необходимо индивидуально настраивать под определенные данные.

Для того чтобы сделать процесс развертывания, использования и доработки алгоритма интуитивно понятным воспользуемся инструментом Kedro. Основная концепция kedro заключается в модульной структуре, где весь цикл работы с данными формируется из отдельных блоков в единый рабочий процесс. Проект на kedro имеет следующую структуру:

Где директория conf содержит конфигурационные файлы для подключения к внешним источникам, учетные данные, настройки ml-алгоритма и прочие параметры. data необходима для хранения данных на различных этапах обработки, docs предназначена для документации проекта, а logs для хранения журнала событий работы алгоритмов. Notebooks – предназначен для хранения jupyter-блокнотов и src, в которой размещается основной код для формирования pipeline проекта.

Для примера, возьмем хорошо известную задачу: предсказание выживших при кораблекрушении Титаника. Полный код, представленный в статье, можно найти в репозитории.

В первую очередь инициализируем git, создадим виртуальное окружение и установим kedro. Далее создадим файл config.yml со следующим содержимым:

output_dir: .
project_name: titanic
repo_name: titanic
python_package: titanic

А затем выполним команду kedro new —config config.yml для формирования оболочки будущего проекта.

Чтобы использовать функционал kedro необходимо установить зависимости с помощью команды: kedro install. Следующий шаг – регистрация входных данных, которые будут анализироваться. Для этого поместим данные в titanic\data\01_raw. Все данные, которые будут загружаться kedro, необходимо указать в конфигурационном файле titanic\conf\base\catalog.yml следующим образом:

input_data:
    type: pandas.CSVDataSet
    filepath: data/01_raw/train.csv

Где,

input_data – имя объекта данных, по которому будем обращаться в kedro;

type – модуль kedro, с помощью которого будем загружать данные;

filepath – расположение исходных данных.

Kedro поддерживает множество типов данных: csv, xlsx, json, parquet, sql-query и т.д. Следующий этап – настройка pipeline, который будет выполнять весь цикл работы алгоритма, от предобработки данных до оценки качества модели. Pipeline в Kedro выполняется с помощью nodes – блока модулей, которые выполняются последовательно, а полученные данные передаются от одного модуля к другому. Для начала напишем модуль предобработки данных. В директории src\titanic\pipelines создадим папку с именем data_processing, в которую поместим файл nodes.py со следующим содержимым:

import pandas as pd

def cat_to_num(arr):
    return {i:c for c, i in enumerate(arr)}
    
def preprocess_dataset(df: pd.DataFrame) -> pd.DataFrame:
    df = df.dropna(subset=["Embarked"])
    d_sex = cat_to_num(df["Sex"].unique())
    d_embarked = cat_to_num(df["Embarked"].unique()) 
    df["Sex"] = df["Sex"].map(d_sex)
    df["Embarked"]= df["Embarked"].map(d_embarked)
    return df

Для примера преобразовали 2 столбца («Sex» и «Embarked») к числовому типу и удалили строки, с пропущенными значениями по столбцу Embarked. Далее для инициализации обработки данных в pipeline нам необходимо создать файл pipeline.py рядом с nodes.py в директории src\titanic\pipelines\data_processing со следующим содержимым:

from kedro.pipeline import Pipeline, node
from .nodes import preprocess_dataset

def create_pipeline(**kwargs):
    return Pipeline(
                    [node(
                          func=preprocess_dataset,
                          inputs="input_data",
                          outputs="preprocessed_input_data",
                          name="preprocessing_input_data_node"
                          )
                    ]
                   )

Где,

func – метод для обработки входных данных, который реализован в nodes.py;

inputs – входные данные, имя указано в conf\base\catalog.yml;

outputs – задаем имя для обработанных данных;

name – задаем имя для модуля в pipeline. А также инициализируем pipeline в проектном пространстве, для этого создадим файл __init__.py в директории src\titanic\pipelines\data_processing\, в котором пропишем метод create_pipeline:

from .pipeline import create_pipeline

Заключительный момент создания отдельного модуля pipeline – регистрация в файле src\titanic\ pipeline_registry.py:

from typing import Dict
from kedro.pipeline import Pipeline
from titanic.pipelines import data_processing as dp

def register_pipelines() -> Dict[str, Pipeline]:
    
    data_processing_pipeline = dp.create_pipeline()
    
    return {"__default__": data_processing_pipeline,
            "dp": data_processing_pipeline}

Запустим проверку, чтобы убедиться в правильном составлении модуля обработки входных данных, для этого в терминале запустим команду, что запустит весь pipeline:

kedro run

или запустим отдельный модуль, указав имя через аргумент –node:

kedro run --node=preprocessing_input_data_node

В результате должны получить подобный вывод:

Обработка выполнена, однако результат обработки хранится в памяти (MemoryDataSet) до завершения pipeline. Для того чтобы зафиксировать промежуточный результат необходимо дополнить файл titanic\conf\base\catalog.yml:

………………………….
preprocessed_input_data:
    type: pandas.CSVDataSet
    filepath: data/02_intermediate/preprocessed_input_data.csv

Таким образом, результат метода preprocess_dataset будет сохранен в csv файл, в директорию data/02_intermediate/, к которому можно будет обратиться в любой момент. Далее приступим к DS составляющей pipeline. Для начала необходимо установить пакеты по работе с ml-алгоритмами, для этого отредактируем файл titanic\src\requirements.in:

………
scikit-learn==0.23.1

И вызовем в терминале команду:

kedro install –build-reqs

По подобию с обработкой данных, создадим папку data_science в директории src\titanic\pipelines\ и добавим файл nodes.py со следующим содержимым:

import logging
from typing import Dict, Tuple

import pandas as pd
import numpy as np

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn import metrics

def get_score(name_metric: str, y_true: pd.Series, y_pred: (np.ndarray, np.array)) -> float:
    ## проверка на правильность названия метрики
    name_metrics = [i for i in dir(metrics) if callable(getattr(metrics, i)) and not i.startswith("__")]
    if name_metric not in name_metrics:
        print("Invalid metric name")
        return np.nan
    score = getattr(metrics, name_metric)(y_true, y_pred)
    return round(score, 2)

def split_data(df: pd.DataFrame, parameters: Dict) -> Tuple:
    X = df[parameters["features"]]
    y = df["Survived"]
    X_train, X_test, y_train, y_test = train_test_split(X, 
                                                        y,
                                                        test_size=parameters["test_size"],
                                                        random_state=parameters["random_state"])
    return X_train, X_test, y_train, y_test
    
def train_model(X_train: pd.DataFrame, y_train: pd.Series, parameters: Dict) -> RandomForestClassifier:
    rndm_forest = RandomForestClassifier(n_estimators=parameters["n_estimators"])
    rndm_forest.fit(X_train, y_train)
    return rndm_forest

def evaluate_model(rndm_forest: RandomForestClassifier, X_test: pd.DataFrame, y_test: pd.Series, parameters: Dict):
    y_pred = rndm_forest.predict(X_test)
    score = get_score(parameters["metric"], y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info(f"Model has a value {score} for {parameters['metric']}")

Метод split_data разбивает обработанные данные на обучающую и тестовую выборку, train_model – обучение модели, evaluate_model -проверяет качество модели, метод get_score возвращает значение по заданной метрике.

Как вы уже заметили, на вход методов передается аргумент parameters, который мы не импортируем и не извлекаем из других методов. Данный объект будет подтягиваться из конфигурационного файла titanic\conf\base\parameters.yml, из которого будем брать необходимые значения для настройки гиперпараметров модели, размера тестовой выборки и т.д. Метод get_score реализован для примера, чтобы показать возможность настройки любых параметров, которые будут полезны при настройке вашего проекта. Таким образом вам не нужно искать параметры в исходном коде, нужно лишь подправить файл parameters.yml и вызвать команду kedro run, для обучения модели с новыми параметрами. Внесем необходимые значения в файл parameters.yml:

test_size: 0.2
random_state: 17
n_estimators: 100
metric: "accuracy_score"
features:
    - Pclass
    - Sex
    - SibSp
    - Parch
    - Embarked

Параметр features (используем в методе split_data) необходим для отбора только тех признаков, которые будем использовать для обучения и предсказания, также можно указать и целевой признак, если данные берутся из различных источников. Далее повторяем операции сохранения данных, и инициализации pipeline. Для начала дополним файл titanic\conf\base\catalog.yml для сохранения обученной модели:

………….
classificator:
    type: pickle.PickleDataSet
    filepath: data/06_models/classificator.pickle
    versioned: true

Отличием от предыдущих способов сохранения данных является аргумент versioned, если укажем значение true, модель будет сохраняться каждый раз при запуске pipeline. Далее создадим data_science модуль для pipeline, для этого поместим в директорию src\titanic\pipelines\data_science файл pipeline.py со следующим содержимым:

from kedro.pipeline import Pipeline, node
from .nodes import split_data, train_model, evaluate_model

def create_pipeline(**kwargs):
    return Pipeline([
                      node(
                            func=split_data,
                            inputs=["preprocessed_input_data", "parameters"],
                            outputs=["X_train", "X_test", "y_train", "y_test"]
                            name="split_data_node"
                           ),
                      node(
                            func=train_model,
                            inputs=["X_train", "y_train", "parameters"],
                            outputs="classificator",
                            name="train_model_node",
                          ),
                      node(
                            func=evaluate_model,
                            inputs=["classificator", "X_test", "y_test", "parameters"],
                            outputs=None,
                            name="evaluate_model_node"
                           )
                    ])

Данный модуль состоит из 3-х функций, где использование входных данных зависит от выполнения предыдущих функций. Не забываем подавать на вход функций объект parameters, для доступа к параметрам, влияющих на обучающую способность модели.

Обратите внимание, каким образом функции обращаются к данным. В kedro указывается не физическое расположение данных, а используется имя объекта, которое хранится либо в памяти (MemoryDataSet — пока выполняется pipeline), либо ссылается на уже существующий объект, указанный в titanic\conf\base\catalog.yml. Также инициализируем data_science модуль в проектном пространстве, для этого создадим файл __init__.py в директории src\titanic\pipelines\data_science, в котором укажем следующую строку:

from .pipeline import create_pipeline

Также зарегистрируем data_science модуль в общий pipeline проекта, для этого редактируем файл \src\titanic\ pipeline_registry.py:

from typing import Dict
from kedro.pipeline import Pipeline
from titanic.pipelines import data_processing as dp
from titanic.pipelines import data_science as ds

def register_pipelines() -> Dict[str, Pipeline]:

    data_processing_pipeline = dp.create_pipeline()
    data_science_pipeline = ds.create_pipeline()
    
    return {"__default__": data_processing_pipeline + data_science_pipeline,
            "dp": data_processing_pipeline,
            "ds": data_science_pipeline
           }

В данном случае знак «+» для двух pipelines используется для того, чтобы kedro понимал очередность выполнения модулей. Т.к. предобработку данных уже запускали раннее, можно запустить только ds часть с помощью аргумента –pipeline:

kedro run –pipeline=ds

В результате должны получить подобный вывод:

В консоли logger выводит сообщение о том, что точность модели на валидационных данных составляет 0.78 для метрики accuracy_score. По завершению pipeline, обученная модель появится в директории data/06_models/.

На этом закончим формирование pipeline. Создание модуля предсказания на тестовых данных, с лучшей моделью оставим для домашнего задания)))

Код в статье может показаться немного запутанным. Однако у kedro подробная и последовательная документация, которая ответит на любые вопросы по формированию проекта.

Подведем итоги:

Kedro – очень удобен для командной разработки, т.к. имеет определенную структуру, и не требуется поиск определенного параметра для настройки алгоритма в папке «temp» с названием файла «my_functions.py».

Для того чтобы прогнать новые данные, требуется лишь поместить данные в директорию data\01_raw и запустить команду в терминале — kedro run. Но главное – это наличие множества интересных плагинов для kedro, которые упрощают работу, как разработчика, так и исследователя данных. В следующей части статьи уделим внимание возможностям kedro, для визуального представления работы Вашего проекта.