Время прочтения: 6 мин.
Импортируем все нужное:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
Создаем Спарк-сессию:
spark = SparkSession.builder \
.master("local[*]") \
.appName('testForNTA') \
.getOrCreate()
Для теста нам хватит классического датасета про Титаник, возьмем от него только тренировочную часть:
df = spark.read.format('csv') \
.option('header', True) \
.load('titanic.csv')
Давайте посмотрим что получилось:
Проверим количество строк с датафрейме:
df.count()
Встроенную функцию для определения размерности нам не подвезли, поэтому давайте реализуем ее вручную:
def shape(df):
return (df.count(), len(df.columns))
shape(df)
Так как наша задача сегодня поработать с оконными функциями — уберем большинство столбцов, оставив самые важные. Для этого создадим список нужный столбцов и передадим их в select. Посмотрим, что получилось в итоге:
cols = ['PassengerId', 'Pclass', 'Sex', 'Age', 'Embarked', 'Survived']
df = df.select(cols)
print(shape(df), '\n')
df.show(5)
Все хорошо. Двигаемся дальше. Допустим, у нас стоит задача в отдельный столбец записать среднюю выживаемость для мужчин и женщин. Давайте разберем решение данной задачи с помощью агрегатных функций. Для начала группируем наш датафрейм по полу, далее применяем агрегатную функцию для среднего по колонке выживаемости, после чего округляем до 2 знаков после запятой и даем имя новому столбцу через alias (аналогично SQLному AS).
mean = df.groupBy('Sex').agg(f.round(f.mean(f.col('Survived')), 2).alias('Mean'))
mean.orderBy(f.col('Mean').desc()).show()
Понимаем, что выгоднее в тот момент времени было быть женщиной… Ну да ладно, вторым действием произведем объединение первоначальной таблицы и нашего результата. Тут все аналогично pandas’овскому синтаксису (но merge -> join) — два датафрейма, столбцы по которым джойним и вид. Давайте взглянем на результат:
df.join(mean, on = ['Sex'], how = 'left').show(5)
Отлично, все на своем месте, но вернемся к основной теме — оконным функциям. Импортируем нужное:
from pyspark.sql.window import Window
Создаем окно, которое является аналогом group by, но при этом не схлопывает данные — количество строк будет неизменным. В данном случаем в одном окне будут данные только по мужчинам, в другой — по женщинам.
w = Window.partitionBy('Sex')
Давайте посчитаем среднее, аналогично задаче выше:
df = df.withColumn('Mean', f.mean(f.col('Survived')).over(w))
df = df.withColumn('Mean', f.round(f.col('Mean'), 2))
Кинем взор на результат, отсортировав по ИД пассажира:
df.orderBy('PassengerId').show(5)
Получаем странный результат: сортировка произошла, но очень криво, давайте посмотрим Schema:
df.printSchema()
При считывании с csv все столбцы получили тип string, хотя у нас есть числа. Ок, давайте исправлять.
Поменяем тип колонок на int с помощью cast, далее отсортируем, и проверим:
from pyspark.sql.types import IntegerType
df = df.withColumn('PassengerId', f.col('PassengerId').cast('int')) \
.withColumn('Pclass', f.col('Pclass').cast('int')) \
.withColumn('Age', f.col('Age').cast('int')) \
.withColumn('Survived', f.col('Survived').cast('int'))
df = df.orderBy('PassengerId')
df.printSchema()
df.show(5)
Давайте двигаться дальше, создадим свой датафрейм с мифическим сотрудниками, которые работают в определенном отделе и имеют какую-никакую зарплату:
listData = [('Евгений А.', 'Отдел А', 50000),
('Алексей Б.','Отдел А', 100000),
(АлександрВ.','Отдел А', 50000),
('Дмитрий Г.','Отдел А', 35000),
('АнтонД.','Отдел А', 35000)]
data = spark.createDataFrame(listData, ['Сотрудник', 'Отдел', 'Зарплата'])
Создадим окно по отделам, которое будет сортировать по колонке зарплаты. И посчитаем ранг каждого сотрудника в «зарплатной иерархии»:
w = Window.partitionBy('Отдел').orderBy('Зарплата')
data = data.withColumn('rank', f.rank().over(w))
data.show()
Давайте разбираться, что же делает rank в нашем случае. Как мы видим, отдел у нас один — отдел А, соответственно наше окно — вся таблица. Сортировка по умолчанию — по возрастанию. У двух сотрудников с зарплатой 35 тысяч условных единиц одинаковый ранг — 1, у двух других ранг уже 3, у последнего — 5. Rank выдает значение первому и всем остальным с одним и тем же значением одинаковый ранг, при смене зарплаты ранг меняется на номер строчки в данном окне (соответственно, если бы было 3 человека с зарплатой в 35 000, то следующий ранг после 1 был бы 4). Ух, получилось немного запутано, но, надеюсь, все осилили. Давайте возьмем модификацию, которая проще воспринимается «на глаз» — dense.
data = data.withColumn('dense_rank', f.dense_rank().over(w))
data.show()
Dense_rank делает тоже самое, но по порядку — ранги 1, 2, 3…и т.д. Давайте добавим новых работников в данное подразделение, но из другого отдела и создадим новый датафрейм.
listData += [('Иван И.', 'Отдел Б', 100000), ('Работник М.', 'Отдел Б', 30000), ('Петр П.', 'Отдел Б', 100000)]
data = spark.createDataFrame(listData, ['Сотрудник', 'Отдел', 'Зарплата'])
Проверим работоспособность нашего окна, когда отделов больше одного.
data = data.withColumn('rank', f.rank().over(w)).withColumn('dense_rank', f.dense_rank().over(w))
data.show()
Получаем то, что хотели. Но возникает вопрос, как идти от сотрудников с топовыми цифрами к обычным. Достаточно в сортировке выбрать столбец и указать desc для сортировки по убыванию.
w = Window.partitionBy('Отдел').orderBy(f.col('Зарплата').desc())
data = data.withColumn('rank', f.rank().over(w)).withColumn('dense_rank', f.dense_rank().over(w))
data.show()
Все, отлично. Давайте теперь разберемся со сдвиговыми функциями lead и lag. Часто встречаем задачи, когда нужно сдвинуть столбец и посчитать разницу (допустим, при работе с данными о покупках нужно будем найти время между покупками у одного клиента). Мы мудрить не будем и немного подвигаем зарплаты сотрудников разных отделов. Создадим новый столбец lag, в который запишем сдвиг столбца зарплаты на 1 вниз и посчитаем разницу соседей:
data = data.orderBy('Сотрудник')
data = data.withColumn('lag', f.lag(f.col('Зарплата'), 1).over(w))
data.show()
data = data.withColumn('diffLag', f.col('Зарплата') - f.col('lag'))
data.show()
Прошу заметить, что сдвиг выполнился для каждого отдела отдельно (благодаря окнам по отделам). Аналогично сделаем с lead — также двигаем на 1 строку.
data = data.withColumn('lead', f.lead(f.col('Зарплата'), 1).over(w))
data = data.withColumn('diffLead', f.col('Зарплата') - f.col('lead'))
data.show()
Порой нам нужно поделить окно поровну и расставить соответственно метки, к какой части относится там или иная строчка. На помощь приходит ntile. Она делит окно на несколько частей (у нас 2), давайте попробуем.
w = Window.partitionBy('Отдел').orderBy('Зарплата')
data = data.withColumn('group', f.ntile(2).over(w)).orderBy('group')
data.show()
На этом сегодня все. В конце хотелось бы сказать, что это только малая часть функционала PySpark, которая помогает с анализом данных. Мы рассмотрели только несколько примеров оконных функций. В следующих постах рассмотрим более сложную математику «под капотом» оконных функций. Стоит отметить, что любителям SQL этот инструмент будет довольно приятен и прост в освоении. Не забудьте остановить спарк-сессию, удачи!
spark.stop()