Время прочтения: 3 мин.

При работе с распределенными базами данных чаще всего используют Spark и его собственные DataFrame.

В данном посте разберём различные способы создания столбцов путем преобразования, вычислений, применения регулярных выражений и т.д. Также мы сравним длительность каждого способа и какой лучше где применить.

Итак, будем работать с одним DataFrame и двумя столбцами (ID и ФИО, для наших целей их вполне достаточно), прочитаем его с hdfs:

df_spark = spark.read\
    .format('csv')\
    .option('sep' , '~')\
    .option('header' , 'true')\
    .load("hdfs://arnsdpsbx/user/team/team_sva_oarb/sib/Sizov_AA/EPK/12345.csv")

df_spark.select('ID' , 'C_NAME').show(3, False, True)

Для примера возьмем срез для ФИО, первые 7 символов. Рассмотрим несколько вариантов решения данной задачи:

  • RDD
  • UDF
  • Трансформер

Работа с RDD

RDD – простая неизменяемая распределённая коллекция объектов. Она делится на множество частей, обрабатывающихся на разных частях кластера. Это объект более низкого уровня и к нему можно применить функцию MAP.

Преобразуем DataFrame в RDD:

start = dt.datetime.now()
rdd = df_spark.rdd.map(lambda x: (x[0], x[1] , x[1][0:7]))
df = spark.createDataFrame(rdd)
df.show(2)
end = dt.datetime.now()
print(str(end-start))

Через RDD обрабатывается практически мгновенно.

Работа с UDF функциями

В этом способе  применим UDF — функцию ко всей колонке нашего DataFrame.

start = dt.datetime.now()
import pyspark.sql.functions  as f
@f.pandas_udf(StringType())


def fam(s):
    return s.apply(lambda x:x[0:7])


df_spark.select('ID' ,'C_NAME', fam('C_NAME').alias('pyat')).show(3, False, True)
end = dt.datetime.now()
print(str(end-start))

Как можем видеть, данный вариант чуть медленнее предыдущего, но он намного удобнее, так как позволяет использовать все возможности библиотеки Pandas.

Работа с собственными трасформерами

Этот способ самый интересный и для простых преобразований он вряд ли будет использоваться, в основном он предназначен для DS задач.

Сначала импортируем необходимые библиотеки:

from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark import keyword_onl

Далее опишем класс нашего трансформера:

class RegexTransformer(Transformer, HasInputCol, HasOutputCol):
        regex = Param(Params._dummy(), "regex",
                  "Python regular expression to match tokens",
                  typeConverter=TypeConverters.toString)
     @keyword_only
    def __init__(self, inputCol=None, outputCol=None, regex=""):
        super(RegexTransformer, self).__init__()
        if inputCol is not None:
            self.setInputCol(inputCol)
        if outputCol is not None:
            self.setOutputCol(outputCol)
        self._set(regex=regex)           
    def setRegex(self):
        self._set(regex=regex)           
    def _transform(self, dataset):
        pattern = re.compile(self.getOrDefault("regex"))
        tokenize_udf = F.pandas_udf(partial(tokenize, regex=pattern), returnType=ArrayType(StringType()))
        return dataset.withColumn(self.getOutputCol(), tokenize_udf(self.getInputCol()))

Определим наш трансформер:

trasformer = RegexTransformer(inputCol = 'C_NAME', outputCol = 'tokens' , regex="^.{6}" ,)

И применим его к нашему DataFrame:

Transformer.tranform(df)

Результат, будет точно такой же, как и в первых двух вариантах. Стоит отметить, что в различных версиях спарка, синтаксис может значительно отличатся.

Подводя итог, можно сказать, что map в RDD, хоть и самый быстрый способ, он не такой удобный как udf_pandas функции, где можно использовать все возможности библиотеки pandas. У трансформера практически безграничные возможности, но он самый сложный с точки зрения синтаксиса.