Время прочтения: 3 мин.
Apache Spark – фреймворк, предоставляющий API для выполнения распределенной обработки данных. Его также можно использовать для разработки моделей машинного обучения и искусственного интеллекта, что позволит распараллеливать процессы обработки по нодам вычислительного кластера, влияя при этом на скорость обучения.
Apache Spark поддерживает языки Scala, Python, Java и R. PySpark – это Python API для использования Apache Spark. В данном посте рассмотрю реализацию модели градиентного бустинга над решающими деревьями (GBT) на примере популярного датасета табличных данных – Titanic[1]. Реализованный пример поможет снизить пороги входа для использования Spark ML.
Spark ML является основной библиотекой для разработки моделей машинного обучения в Apache Spark. С помощью данного модуля можно решать множество задач: регрессия, классификация, кластеризация, снижение размерности. Можно также обрабатывать пропущенные значения, искать и устранять выбросы. Для борьбы с переобучением Spark ML поддерживает стандартную L1 и L2 регуляризацию. Для графовых структур Apache Spark имеет модуль GraphX, а для потоковой обработки данных Spark Streaming. Фреймворк активно развивается для достижения максимально быстрых результатов обучения моделей искусственного интеллекта.
Датасет Titanic содержит в себе характеристики человека, такие как пол, возраст, статус билета, цена билета, номер кабины и др. Необходимо предсказать выжил ли человек в результате крушения корабля или нет, основываясь на входных данных. Таким образом, реализую начало сессии PySpark и осуществлю просмотр первых десяти строк датасета. На данном этапе будет использоваться Spark DataFrame.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
df = spark.read.csv('/train.csv', header=True, inferSchema=True)
df.show(10)
Аналогичным образом можно выбрать только необходимые колонки для просмотра данных.
df1.select('Survived', 'Name', 'Parch').show(1)
Далее проведу обзорный взгляд на данные с помощью формирования дополнительных датафреймов. Таким образом, посчитаю сколько человек выжило и нет.
df.groupBy('Survived').count().show()
+--------+-----+
|Survived|count|
+--------+-----+
| 1| 342|
| 0| 549|
Сколько человек мужчин и сколько человек женщин.
df.groupBy('Survived').pivot('Sex').count().show()
+--------+------+----+
|Survived|female|male|
+--------+------+----+
| 1| 233| 109|
| 0| 81| 468|
Можно посмотреть сколько пропущенных значений наблюдается в датасете.
for col in df1.columns:
print(col, df.filter(df[col].isNull()).count())
Теперь перейду непосредственно к части Spark построения модели. Использую StringIndexer для перевода текстовой фичи – пол, в категориальную переменную.
from pyspark.ml.feature import StringIndexer
new_index = StringIndexer(inputCols=['Sex'],
outputCols=['SexNum'])
stringIndex_model = new_index.fit(df)
df_ = stringIndex_model.transform(df).drop('Sex')
Далее буду использовать инициализацию VectorAssembler для формирования матрицы объясняющих переменных и целевой переменой. VectorAssembler формирует один векторный столбец из заданного ему на вход списка столбцов (фичей + целевой переменной). Не может работать со строковыми данными.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df_.columns[1:],
outputCol='features')
df_ = assembler.transform(df_).select('features', 'Survived')
После этого стандартным образом делю выборку на тренировочную и тестовую в пропорции 70% и 30%.
train_df, test_df = df_.randomSplit([0.7, 0.3])
Теперь могу перейти к обучению модели и определению метрики качества построенной модели. Для этого сперва необходимо инициализировать экземпляр MulticlassClassificationEvaluator(), в который необходимо передать целевую переменную и метрику качества. Для оценки данной модели буду использовать метрику accuracy, так как она является распространенной и легко применимой.
evaluation = MulticlassClassificationEvaluator(labelCol='Survived',
metricName='accuracy')
Теперь обучаю модель градиентного бустинга.
gradient_boosting = GBTClassifier(labelCol='Survived')
model = gradient_boosting.fit(train_df)
pred = model.transform(test_df)
evaluation.evaluate(pred)
В результате построения такой модели метрика получилась 0,85.
Таким образом, в рамках данной публикации, были продемонстрированы первоначальная обработка данных датасета, а также обучение модели градиентного бустинга. Показаны базовые трансформации и действия, необходимые для получения результата обучения модели, что служит хорошим и быстрым стартом для понимания работы Spark ML.
[1] https://www.kaggle.com/competitions/titanic/code?competitionId=3136&searchQuery=pyspark