Время прочтения: 3 мин.
Во время работы с таблицами в Spark возникают ситуации, когда для обработки данных набора встроенных функций оказывается недостаточно. В этом случае можно выгрузить таблицу в Pandas DataFrame и обрабатывать данные на Python привычными функциями. Однако, есть способ сделать это быстрее – UDF-функции в Spark.
UDF (User Defined Functions) – это функции, которые не содержатся во встроенных модулях Spark и определяются самим пользователем. UDF позволяют расширить возможности обработки данных и могут содержать в себе комбинацию встроенных функций.
Разберём работу UDF-функций на примере задачи парсинга поля таблицы. Например, есть таблица Hive, состоящая из колонок ID и Comment. В последней содержатся строки с ФИО, датой рождения и номером договора клиента определенной структуры (таблица 1).
Таблица 1 – Пример таблицы в Hive (идентификационные данные пользователей фиктивны, любые совпадения случайны).
ID | Comment |
1 | ФИО: Иванов Иван Иванович, ДР: 01.01.2000, Номер договора: 0101 |
2 | ФИО: Петров Петр Петрович, ДР: 01.01.2001, Номер договора: 0102 |
3 | ФИО: Иванова Юлия Сергеевна, ДР: 01.01.2002, Номер договора: 0103 |
4 | ФИО: Сидорова Любовь Ивановна, ДР: 01.01.2003, Номер договора: 0104 |
5 | ФИО: Сидоров Василий Петрович, ДР: 01.01.2004, Номер договора: 0105 |
Задача – разбить комментарий на отдельные столбцы (ФИО, дата рождения и номер договора).
Записываем таблицу из Hive в Spark DataFrame и выводим результат на экран. По умолчанию, при использовании метода show(), длинные строки отображаются не полностью. Чтобы этого не произошло, используем параметр «truncate» со значением «False».
data = spark.sql('''select * from schema.table''')
data.show(truncate=False)
+---+-------------------------------------------------------------------+
|ID |Comment |
+---+-------------------------------------------------------------------+
|4 |ФИО: Сидорова Любовь Ивановна, ДР: 01.01.2003, Номер договора: 0104|
|3 |ФИО: Иванова Юлия Сергеевна, ДР: 01.01.2002, Номер договора: 0103 |
|5 |ФИО: Сидоров Василий Петрович, ДР: 01.01.2004, Номер договора: 0105|
|1 |ФИО: Иванов Иван Иванович, ДР: 01.01.2000, Номер договора: 0101 |
|2 |ФИО: Петров Петр Петрович, ДР: 01.01.2001, Номер договора: 0102 |
+---+-------------------------------------------------------------------+
Переходим, непосредственно, к объявлению UDF. Импортируем библиотеки с регулярными выражениями и модуль с функциями pyspark.
import re
import pyspark.sql.functions as f
Для каждого результирующего поля (ФИО, дата рождения, номер договора) будем применять свою функцию. Прежде всего, объявляем их с помощью «def» и описываем внутри выполняемые действия. Здесь для извлечения информации в каждой строке находим нужную подстроку и удаляем сами сущности, указывающие на структуру («ФИО:», «ДР:», «Номер договора:»).
def pars_name(text):
result = re.findall(r'ФИО:\s[^,]*', text)
result = re.sub(r'ФИО:\s','', result[0])
return result
def pars_birthday(text):
result = re.findall(r'ДР:\s[^,]*', text)
result = re.sub(r'ДР:\s','', result[0])
return result
def pars_contract(text):
result = re.findall(r'Номер договора:\s[^,]*', text)
result = re.sub(r'Номер договора:\s','', result[0])
return result
Далее, необходимо провести регистрацию функций, используя метод udf() модуля pyspark.sql.functions. В параметрах указываем саму функцию и тип её выходных данных. Для описанных выше процедур парсинга результатами будут строки (StringType()). Чтобы функция применилась к каждому элементу столбца, а не к одной строке, используем Лямбда-функцию Python.
U_name = f.udf(lambda text :pars_name(text), StringType())
U_birthday = f.udf(lambda text :pars_birthday(text), StringType())
U_contract = f.udf(lambda text :pars_contract(text), StringType())
Применение UDF-функций происходит путём добавления новых столбцов в DataFrame.
data = data.withColumn('Name',U_name('Comment')).withColumn('Birthday',
U_birthday('Comment')).withColumn('Contract',U_contract('Comment'))
data.show(truncate=False)
Результат – DataFrame Spark с новыми отдельными полями, который, при необходимости, можно легко сохранить как таблицу в Hive. Кроме того, появляется возможность продолжить обработку данных внутри Spark, соединяя результат с другими таблицами по полученным столбцам.
UDF-функции позволяют проводить дополнительную обработку данных в Spark, выходящую за рамки возможностей встроенных функций. В данной статье UDF рассматриваются на примере парсинга столбца со строками определённой структуры, однако, они могут находить применение в самых различных задачах обработки данных.