Время прочтения: 5 мин.

При написании запросов во многих базах данных в целом используется схожий синтаксис, могут отличаться названия функций, типов данных и т.п. Научившись работать, например, в MS SQL Server затем можно довольно быстро освоиться в Teradata или Greenplum. Особняком стоит PySpark – это API Apache Spark, применяемый для распределенной обработки больших данных. Его отличие в том, что при написании запросов могут использоваться разные техники, в том числе:

  1. Использование SQL запросов
  2. Использование DataFrame API.

Если первый способ будет понятен подавляющему большинству пользователей баз данных, то для применения второго требуется некоторое время, чтобы его освоить. В публикации приведу примеры аналогов основных конструкций, используемых в SQL, написанных на синтаксисе Spark DataFrame.

Перед работой в PySpark рекомендую выполнить импорт всех функций и типов данных:

from pyspark.sql.functions import *
from pyspark.sql.types import *

Пример выбора всех строк из таблицы, написанного на языке SQL в PySpark:

tbl = spark.sql("select * from tablename")

spark.sql – означает, что будет использован текст SQL запроса. Сам запрос передается в качестве аргумента.

Причем при запуске строки набор данных сгенерирован не будет, поскольку в Spark используются «ленивые» вычисления. Для выполнения запроса необходимо выполнить, так называемое, действие (action): .show(), .count(), .take(), .write, .toPandas() и т.д.

Например, отображение 10 строк из таблицы, поля «field_1», «field_2»:

tbl = spark.sql("select field_1, field_2 from tablename") 
tbl.show(10)

При применении PySpark DataFrame API текст запроса не используется, а вместо spark.sql берётся spark.table:

tbl = spark.table("tablename").select('field_1','field_2')
tbl.show(10)

В select() указываю названия полей, которые необходимо выгрузить.

Выборка по условию:

Из таблицы «tablename» беру строки, которые содержат значения = ‘Condition_1’ в поле «field_2» и значения = ‘Condition_2’

SQL:

tbl = spark.sql("select * from tablename where filed_1 = 'Condition_1' and field_2 = 'Condition_2'")

PySpark Dataframe API:

tbl = spark.table("tablename").filter("filed_1 = 'Condition_1' and field_2 = 'Condition_2'")

или

tbl = spark.table("tablename")\
        		.filter("filed_1 = 'Condition_1'")\
        		.filter("field_2 = 'Condition_2'")

Вместо filter() можно использовать where(). Синтаксис указания условий точно такой же, как в SQL.

Сортировка и удаление дубликатов:

Из таблицы «tablename» два поля «date_field» и «oper_amt», сортирую по убыванию дат («date_field»), убираю дублируемые строки.

SQL:

tbl = spark.sql("select distinct date_field, oper_amt from tablename order by date_field desc")

PySpark Dataframe API:

tbl = spark.table("tablename").select('date_field', 'oper_amt').orderBy(desc('date_field')).distinct()

orderBy() – сортировка.

desc() – по убыванию, если требуется сортировка по возрастанию, то ничего не указывается.

.distinct () – удаление дублирующихся строк.

Добавление колонки с одинаковым значением:

Добавлю столбец, который называется «new_field» и содержит одинаковое значение —  ‘Some_text’

SQL:

tbl = spark.sql("select date_field, 'Some_text' as new_field from tablename")

PySpark Dataframe API:

tbl = spark.table("tablename").withColumn("new_field", lit('Some_text'))

или

tbl = spark.table("tablename")
tbl = tbl.withColumn("new_field", lit('Some_text'))

lit() – функция для использования константы в качестве значений

Добавление колонки с case-when:

Добавлю поле «operation_name» по условию, если значение в поле «operation_id» равно 1, то ‘Category_1’, иначе ‘Category_2’

SQL:

tbl = spark.sql("select operation_id, case when operation_id = 1 then 'Category_1' else 'Category_2' end as operation_name from tablename")

PySpark Dataframe API:

tbl = spark.table("tablename")
   	tbl = tbl.withColumn("operation_name", when(col('operation_id') == 1, 'Category_1').otherwise('Category_2'))

when() – указывается условие, а после запятой – значение, если условие соблюдается.

otherwise() – значение, которое будет записано. Если условие не выполняется.

Также внутри when() можно указывать несколько условий, для их соединения используются побитовые операторы (& — и, | — или), а каждое условие заключается в скобки:

tbl = tbl.withColumn("operation_name", when((col('operation_id') == 1) | (col('date_field') >= '2022-01-01'), 'Category_1').otherwise('Category_2'))

Внутрь otherwise можно добавлять ещё один when():

tbl = tbl.withColumn("operation_name", when(col('operation_id') == 1, 'Category_1').otherwise(when(col('operation_id') == 2, 'Category_2').otherwise('Category_3')))

Функции агрегации:

Посчитаю общую сумму транзакций за каждый день.

SQL:

tbl = spark.sql("select date_field, sum(oper_amt) as sum_oper from tablename group by date_field")

PySpark Dataframe API:

tbl = spark.table("tablename").groupBy("date_field").sum("oper_amt")

.groupBy() – в скобках указываются поля агрегации.

.sum() – функция агрегации.

Пример указан для функции sum, аналогично применяются и другие функции: count(), min(), max() и т.п.

Оконные функции:

Выведу строку, которая содержит максимальную сумму операции по каждой «operation_id».

SQL:

tbl = spark.sql('''select 
                        operation_id,  
                        date_field,
                        oper_amt
                 from
                 (
                   select 
                      operation_id,  
                      date_field,
                      oper_amt, 
                      row_number() over(partition by operation_id order by oper_amt desc) as rn
                   from 
                       tablename 
                    group by 
                       date_field
                    ) tbl
                    where rn = 1''')

PySpark Dataframe API:

from pyspark.sql.window import Window
	tbl = spark.table('table_name')\         
            	.withColumn('rn',row_number().over(Window.partitionBy('operation_id').orderBy(desc('oper_amt'))))\
            	.select('operation_id','date_field','oper_amt','rn')\
            	.filter("rn = 1")\
            	.drop("rn")

Join-ы:

Join-ы:

К таблице «tablename_1» добавим название операции, которое найдено в таблице «tablename_2» по полю «operation_id».

SQL:

tbl = spark.sql('''select  
                       		t1.operation_id,  
                        		t1.date_field,
                        		t2.operation_name
                 	   from
                       		tablename_1 t1 
                 	   join
                       		tablename_2 t2
                  	   on
      				t1.operation_id = t2.operation_id
                         ''')

PySpark Dataframe API:

t1 = spark.table('tablename_1')
t2 = spark.table('tablename_2')

result = t1.join(t2,
     		 t1.operation_id == t2.operation_id,
      		'inner')\
   	  .select(t1.operation_id, t1.date_field, t2.operation_name)

inner– способ соединения таблиц, соответствует SQL ‘inner join’ или просто ‘join’, остаются строки из обеих таблиц, в которых найдено совпадение.

Так же есть следующие способы join-ов:

leftouter– соответствует SQL ‘left join’ (‘left outer join’), остаются все строки из первой (левой) таблицы, добавляются строки из второй (правой) таблицы, где найдено совпадение. Если совпадение не найдено, то ставится Null.

rightouter– соответствует SQL ‘right join’ (‘right outer join’), остаются все строки из правой таблицы, добавляются строки из левой таблицы, где найдено совпадение. Если совпадение не найдено, то ставится Null.

fullouter– соответствует SQL ‘full join’ (‘full outer join’), остаются все строки из левой и правой таблиц, где найдено совпадение. Если совпадение не найдено, то ставится Null.

leftsemi– остаются строки из левой таблицы, где найдены совпадения, из правой таблицы поля не берутся.

leftanti– остаются строки из левой таблицы, где НЕ найдены совпадения, из правой таблицы поля не берутся.

Главное преимущество использования DataFrame API в том, что это унифицированный формат работы с базами данных, который одинаково хорошо отрабатывается и в PySpark, и в Java, а так же ряде других актуальных движков, в которых так или иначе, приходится подключать вычисления BigData. В минусы можно записать только необходимость выучить новый синтаксис.