Время прочтения: 3 мин.

Apache Spark – фреймворк, предоставляющий API для выполнения распределенной обработки данных. Его также можно использовать для разработки моделей машинного обучения и искусственного интеллекта, что позволит распараллеливать процессы обработки по нодам вычислительного кластера, влияя при этом на скорость обучения.

Apache Spark поддерживает языки Scala, Python, Java и R. PySpark – это Python API для использования Apache Spark. В данном посте рассмотрю реализацию модели градиентного бустинга над решающими деревьями (GBT) на примере популярного датасета табличных данных – Titanic[1]. Реализованный пример поможет снизить пороги входа для использования Spark ML.

Spark ML является основной библиотекой для разработки моделей машинного обучения в Apache Spark. С помощью данного модуля можно решать множество задач: регрессия, классификация, кластеризация, снижение размерности. Можно также обрабатывать пропущенные значения, искать и устранять выбросы. Для борьбы с переобучением Spark ML поддерживает стандартную L1 и L2 регуляризацию. Для графовых структур Apache Spark имеет модуль GraphX, а для потоковой обработки данных Spark Streaming. Фреймворк активно развивается для достижения максимально быстрых результатов обучения моделей искусственного интеллекта.

Датасет Titanic содержит в себе характеристики человека, такие как пол, возраст, статус билета, цена билета, номер кабины и др. Необходимо предсказать выжил ли человек в результате крушения корабля или нет, основываясь на входных данных. Таким образом, реализую начало сессии PySpark и осуществлю просмотр первых десяти строк датасета. На данном этапе будет использоваться Spark DataFrame.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
df = spark.read.csv('/train.csv', header=True, inferSchema=True)
df.show(10)

Аналогичным образом можно выбрать только необходимые колонки для просмотра данных.

df1.select('Survived', 'Name', 'Parch').show(1)

Далее проведу обзорный взгляд на данные с помощью формирования дополнительных датафреймов. Таким образом, посчитаю сколько человек выжило и нет.

df.groupBy('Survived').count().show()
+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|

Сколько человек мужчин и сколько человек женщин.

df.groupBy('Survived').pivot('Sex').count().show()
+--------+------+----+
|Survived|female|male|
+--------+------+----+
|       1|   233| 109|
|       0|    81| 468|

Можно посмотреть сколько пропущенных значений наблюдается в датасете.

for col in df1.columns:
    print(col, df.filter(df[col].isNull()).count())

Теперь перейду непосредственно к части Spark построения модели. Использую StringIndexer для перевода текстовой фичи – пол, в категориальную переменную.

from pyspark.ml.feature import StringIndexer
new_index = StringIndexer(inputCols=['Sex'], 
                       outputCols=['SexNum'])

stringIndex_model = new_index.fit(df)
df_ = stringIndex_model.transform(df).drop('Sex')

Далее буду использовать инициализацию VectorAssembler для формирования матрицы объясняющих переменных и целевой переменой. VectorAssembler формирует один векторный столбец из заданного ему на вход списка столбцов (фичей + целевой переменной). Не может работать со строковыми данными.

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df_.columns[1:], 
                           outputCol='features')

df_ = assembler.transform(df_).select('features', 'Survived')

     После этого стандартным образом делю выборку на тренировочную и тестовую в пропорции 70% и 30%.

train_df, test_df = df_.randomSplit([0.7, 0.3])

            Теперь могу перейти к обучению модели и определению метрики качества построенной модели. Для этого сперва необходимо инициализировать экземпляр MulticlassClassificationEvaluator(), в который необходимо передать целевую переменную и метрику качества. Для оценки данной модели буду использовать метрику accuracy, так как она является распространенной и легко применимой.

evaluation = MulticlassClassificationEvaluator(labelCol='Survived', 
                                          metricName='accuracy')

Теперь обучаю модель градиентного бустинга.

gradient_boosting = GBTClassifier(labelCol='Survived')

model = gradient_boosting.fit(train_df)
pred = model.transform(test_df)
evaluation.evaluate(pred)

            В результате построения такой модели метрика получилась 0,85.

            Таким образом, в рамках данной публикации, были продемонстрированы первоначальная обработка данных датасета, а также обучение модели градиентного бустинга. Показаны базовые трансформации и действия, необходимые для получения результата обучения модели, что служит хорошим и быстрым стартом для понимания работы Spark ML.


[1] https://www.kaggle.com/competitions/titanic/code?competitionId=3136&searchQuery=pyspark