Время прочтения: 6 мин.

Импортируем все нужное:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

Создаем Спарк-сессию:

spark = SparkSession.builder \
        .master("local[*]") \
        .appName('testForNTA') \
        .getOrCreate()

Для теста нам хватит классического датасета про Титаник, возьмем от него только тренировочную часть:

df = spark.read.format('csv') \
               .option('header', True) \
               .load('titanic.csv')

Давайте посмотрим что получилось:

Проверим количество строк с датафрейме:

df.count()

Встроенную функцию для определения размерности нам не подвезли, поэтому давайте реализуем ее вручную:

def shape(df):
    return (df.count(), len(df.columns))
shape(df)

Так как наша задача сегодня поработать с оконными функциями — уберем большинство столбцов, оставив самые важные. Для этого создадим список нужный столбцов и передадим их в select. Посмотрим, что получилось в итоге:

cols = ['PassengerId', 'Pclass', 'Sex', 'Age', 'Embarked', 'Survived']
df = df.select(cols)
print(shape(df), '\n')
df.show(5)

Все хорошо. Двигаемся дальше. Допустим, у нас стоит задача в отдельный столбец записать среднюю выживаемость для мужчин и женщин. Давайте разберем решение данной задачи с помощью агрегатных функций. Для начала группируем наш датафрейм по полу, далее применяем агрегатную функцию для среднего по колонке выживаемости, после чего округляем до 2 знаков после запятой и даем имя новому столбцу через alias (аналогично SQLному AS).

mean = df.groupBy('Sex').agg(f.round(f.mean(f.col('Survived')), 2).alias('Mean'))
mean.orderBy(f.col('Mean').desc()).show()

Понимаем, что выгоднее в тот момент времени было быть женщиной… Ну да ладно, вторым действием произведем объединение первоначальной таблицы и нашего результата. Тут все аналогично pandas’овскому синтаксису (но merge -> join) — два датафрейма, столбцы по которым джойним и вид. Давайте взглянем на результат:

df.join(mean, on = ['Sex'], how = 'left').show(5)

Отлично, все на своем месте, но вернемся к основной теме — оконным функциям. Импортируем нужное:

from pyspark.sql.window import Window

Создаем окно, которое является аналогом group by, но при этом не схлопывает данные — количество строк будет неизменным. В данном случаем в одном окне будут данные только по мужчинам, в другой — по женщинам.

w = Window.partitionBy('Sex')

Давайте посчитаем среднее, аналогично задаче выше:

df = df.withColumn('Mean', f.mean(f.col('Survived')).over(w))
df = df.withColumn('Mean', f.round(f.col('Mean'), 2))

Кинем взор на результат, отсортировав по ИД пассажира:

df.orderBy('PassengerId').show(5)

Получаем странный результат: сортировка произошла, но очень криво, давайте посмотрим Schema:

df.printSchema()

При считывании с csv все столбцы получили тип string, хотя у нас есть числа. Ок, давайте исправлять.

Поменяем тип колонок на int с помощью cast, далее отсортируем, и проверим:

from pyspark.sql.types import IntegerType
df = df.withColumn('PassengerId', f.col('PassengerId').cast('int')) \
.withColumn('Pclass', f.col('Pclass').cast('int')) \
.withColumn('Age', f.col('Age').cast('int')) \
.withColumn('Survived', f.col('Survived').cast('int')) 
df = df.orderBy('PassengerId')
df.printSchema()
df.show(5)

Давайте двигаться дальше, создадим свой датафрейм с мифическим сотрудниками, которые работают в определенном отделе и имеют какую-никакую зарплату:

listData = [('Евгений А.', 'Отдел А', 50000), 
            ('Алексей Б.','Отдел А', 100000), 
            (АлександрВ.','Отдел А', 50000), 
            ('Дмитрий Г.','Отдел А', 35000), 
            ('АнтонД.','Отдел А', 35000)]
data = spark.createDataFrame(listData, ['Сотрудник', 'Отдел', 'Зарплата'])

Создадим окно по отделам, которое будет сортировать по колонке зарплаты. И посчитаем ранг каждого сотрудника в «зарплатной иерархии»:

w = Window.partitionBy('Отдел').orderBy('Зарплата')
data = data.withColumn('rank', f.rank().over(w))
data.show()

Давайте разбираться, что же делает rank в нашем случае. Как мы видим, отдел у нас один — отдел А, соответственно наше окно — вся таблица. Сортировка по умолчанию — по возрастанию. У двух сотрудников с зарплатой 35 тысяч условных единиц одинаковый ранг — 1, у двух других ранг уже 3, у последнего — 5. Rank выдает значение первому и всем остальным с одним и тем же значением одинаковый ранг, при смене зарплаты ранг меняется на номер строчки в данном окне (соответственно, если бы было 3 человека с зарплатой в 35 000, то следующий ранг после 1 был бы 4). Ух, получилось немного запутано, но, надеюсь, все осилили. Давайте возьмем модификацию, которая проще воспринимается «на глаз» — dense.

data = data.withColumn('dense_rank', f.dense_rank().over(w))
data.show()

Dense_rank делает тоже самое, но по порядку — ранги 1, 2, 3…и т.д. Давайте добавим новых работников в данное подразделение, но из другого отдела и создадим новый датафрейм.

listData += [('Иван И.', 'Отдел Б', 100000), ('Работник М.', 'Отдел Б', 30000), ('Петр П.', 'Отдел Б', 100000)]
data = spark.createDataFrame(listData, ['Сотрудник', 'Отдел', 'Зарплата'])

Проверим работоспособность нашего окна, когда отделов больше одного.

data = data.withColumn('rank', f.rank().over(w)).withColumn('dense_rank', f.dense_rank().over(w))
data.show()

Получаем то, что хотели. Но возникает вопрос, как идти от сотрудников с топовыми цифрами к обычным. Достаточно в сортировке выбрать столбец и указать desc для сортировки по убыванию.

w = Window.partitionBy('Отдел').orderBy(f.col('Зарплата').desc())
data = data.withColumn('rank', f.rank().over(w)).withColumn('dense_rank', f.dense_rank().over(w))
data.show()

Все, отлично. Давайте теперь разберемся со сдвиговыми функциями lead и lag. Часто встречаем задачи, когда нужно сдвинуть столбец и посчитать разницу (допустим, при работе с данными о покупках нужно будем найти время между покупками у одного клиента). Мы мудрить не будем и немного подвигаем зарплаты сотрудников разных отделов. Создадим новый столбец lag, в который запишем сдвиг столбца зарплаты на 1 вниз и посчитаем разницу соседей:

data = data.orderBy('Сотрудник')
data = data.withColumn('lag', f.lag(f.col('Зарплата'), 1).over(w))
data.show()

data = data.withColumn('diffLag', f.col('Зарплата') - f.col('lag'))
data.show()

Прошу заметить, что сдвиг выполнился для каждого отдела отдельно (благодаря окнам по отделам). Аналогично сделаем с lead — также двигаем на 1 строку.

data = data.withColumn('lead', f.lead(f.col('Зарплата'), 1).over(w))
data = data.withColumn('diffLead', f.col('Зарплата') - f.col('lead'))
data.show()

Порой нам нужно поделить окно поровну и расставить соответственно метки, к какой части относится там или иная строчка. На помощь приходит ntile. Она делит окно на несколько частей (у нас 2), давайте попробуем.

w = Window.partitionBy('Отдел').orderBy('Зарплата')
data = data.withColumn('group', f.ntile(2).over(w)).orderBy('group')
data.show()

На этом сегодня все. В конце хотелось бы сказать, что это только малая часть функционала PySpark, которая помогает с анализом данных. Мы рассмотрели только несколько примеров оконных функций. В следующих постах рассмотрим более сложную математику «под капотом» оконных функций. Стоит отметить, что любителям SQL этот инструмент будет довольно приятен и прост в освоении. Не забудьте остановить спарк-сессию, удачи!

spark.stop()