Время прочтения: 3 мин.
При работе с распределенными базами данных чаще всего используют Spark и его собственные DataFrame.
В данном посте разберём различные способы создания столбцов путем преобразования, вычислений, применения регулярных выражений и т.д. Также мы сравним длительность каждого способа и какой лучше где применить.
Итак, будем работать с одним DataFrame и двумя столбцами (ID и ФИО, для наших целей их вполне достаточно), прочитаем его с hdfs:
df_spark = spark.read\
.format('csv')\
.option('sep' , '~')\
.option('header' , 'true')\
.load("hdfs://arnsdpsbx/user/team/team_sva_oarb/sib/Sizov_AA/EPK/12345.csv")
df_spark.select('ID' , 'C_NAME').show(3, False, True)
Для примера возьмем срез для ФИО, первые 7 символов. Рассмотрим несколько вариантов решения данной задачи:
- RDD
- UDF
- Трансформер
Работа с RDD
RDD – простая неизменяемая распределённая коллекция объектов. Она делится на множество частей, обрабатывающихся на разных частях кластера. Это объект более низкого уровня и к нему можно применить функцию MAP.
Преобразуем DataFrame в RDD:
start = dt.datetime.now()
rdd = df_spark.rdd.map(lambda x: (x[0], x[1] , x[1][0:7]))
df = spark.createDataFrame(rdd)
df.show(2)
end = dt.datetime.now()
print(str(end-start))
Через RDD обрабатывается практически мгновенно.
Работа с UDF функциями
В этом способе применим UDF — функцию ко всей колонке нашего DataFrame.
start = dt.datetime.now()
import pyspark.sql.functions as f
@f.pandas_udf(StringType())
def fam(s):
return s.apply(lambda x:x[0:7])
df_spark.select('ID' ,'C_NAME', fam('C_NAME').alias('pyat')).show(3, False, True)
end = dt.datetime.now()
print(str(end-start))
Как можем видеть, данный вариант чуть медленнее предыдущего, но он намного удобнее, так как позволяет использовать все возможности библиотеки Pandas.
Работа с собственными трасформерами
Этот способ самый интересный и для простых преобразований он вряд ли будет использоваться, в основном он предназначен для DS задач.
Сначала импортируем необходимые библиотеки:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark import keyword_onl
Далее опишем класс нашего трансформера:
class RegexTransformer(Transformer, HasInputCol, HasOutputCol):
regex = Param(Params._dummy(), "regex",
"Python regular expression to match tokens",
typeConverter=TypeConverters.toString)
@keyword_only
def __init__(self, inputCol=None, outputCol=None, regex=""):
super(RegexTransformer, self).__init__()
if inputCol is not None:
self.setInputCol(inputCol)
if outputCol is not None:
self.setOutputCol(outputCol)
self._set(regex=regex)
def setRegex(self):
self._set(regex=regex)
def _transform(self, dataset):
pattern = re.compile(self.getOrDefault("regex"))
tokenize_udf = F.pandas_udf(partial(tokenize, regex=pattern), returnType=ArrayType(StringType()))
return dataset.withColumn(self.getOutputCol(), tokenize_udf(self.getInputCol()))
Определим наш трансформер:
trasformer = RegexTransformer(inputCol = 'C_NAME', outputCol = 'tokens' , regex="^.{6}" ,)
И применим его к нашему DataFrame:
Transformer.tranform(df)
Результат, будет точно такой же, как и в первых двух вариантах. Стоит отметить, что в различных версиях спарка, синтаксис может значительно отличатся.
Подводя итог, можно сказать, что map в RDD, хоть и самый быстрый способ, он не такой удобный как udf_pandas
функции, где можно использовать все возможности библиотеки pandas. У трансформера практически безграничные возможности, но он самый сложный с точки зрения синтаксиса.