SQL

Обработка больших данных средствами PySpark SQL

Время прочтения: 3 мин.

Наиболее популярными инструментами для сбора и обработки больших данных являются Hadoop MapReduce и Apache Spark. Оба этих инструмента имеют свои плюсы и минусы, однако в данной статье я буду рассматривать Apache Spark, в частности библиотеку для python PySpark.

Любая программа на PySpark начинается с определения конфигурации запускаемой задачи. В PySpark конфигурация задается классом SparkConf. Основными конфигурационными параметрами данного класса являются методы setAppName(‘SparkJobName’) – задает наименование Spark job. И метод setMaster(connection_url) – задает ссылку на подключение к кластеру данных. Данный параметр принимает следующие параметры:

  • URI кластера
  • ‘yarn-client’ — указатель на использование yarn менеджера задач
  • ‘local/local[*]’ – указатель на использование локального компьютера в качестве хранилища данных.

Остальные конфигурационные параметры задаются методом set(‘name_conf_param’,’value’), принимающим на вход 2 параметра:

  • Параметр конфигурирования spark job;
  • Значение этого параметра.
from pyspark import SparkContext, SparkConf, HiveContext
 
conf = SparkConf().setAppName('SparkContextExampleNew')\
    .setMaster("yarn-client")\
    .set('spark.dynamicAllocation.enabled', 'false')\
    .set('spark.ui.showConsoleProgress','true')\

После того, как определили параметры конфигурации, необходимо вызвать метод SparkContext и передать в него параметры SparkConf:

sc = SparkContext.getOrCreate(conf=conf)

Для работы непосредственно с кластером данных удобно использовать не так давно добавленное API  SparkSql, позволяющее более просто, а главное, в большинстве случаев, более оптимально производить различные манипуляции с данными. Для работы с данным API на кластере необходимо инициализировать HiveContext:

sqlc = HiveContext(sc)

Ниже представлены простейшие операции:

  • Create
  • Select
  • Update
  • Truncate
  • Delete
#create
sqlc.sql(f'create table default.test_table (id bigint, name string)').show()

#insert
numbers = [
    1,
    2,
    3,
    4,
    5,
]
for num in numbers:
    sqlc.sql(f'insert into default.test_table (id, name) set ({num}, value_{str(num)})').show()

#select
df = sqlc.sql(f'select * from default.test_table').collect()
df_.coalesce(1).write.csv(f'some_file.csv', sep=';')

#преобразование RDD Dataframe в Pandas Dataframe
df_ = sqlc.sql(f'select * from default.test_table)').toPandas()
df_.head()

#update
sqlc.sql(f'update default.test_table set name = a.name + '_0' from default.test_table a').show()
sqlc.sql(f'select * from default.test_table').show()

#truncate
sqlc.sql(f'truncate table default.test_table').show()
sqlc.sql(f'select * from default.test_table').show()

#delete
sqlc.sql(f'drop table default.test_table').show()

Помимо вышеизложенных операций, pyspark позволяет выгружать данные в различные форматы 2 способами: используя встроенные методы загрузки RDD Dataframe, либо преобразуя RDD Dataframe в Pandas Dataframe методом Pandas:

#select
df = sqlc.sql(f'select * from default.test_table').collect()
df_.coalesce(1).write.csv(f'some_file.csv', sep=';')

#преобразование RDD Dataframe в Pandas Dataframe
df_ = sqlc.sql(f'select * from default.test_table').toPandas()
df_.to_csv(f'some_file.csv', sep=';')

После того, как наша задача полностью отработает, необходимо завершить spark job и освободить ресурсы кластера. В pyspark за это отвечает метод stop() класса SparkContext.

sc.stop()

Также стоит отметить, что если spark job по тем или иным причинам «упал», то его необходимо так же завершить методом stop.

По итогу мы имеем достаточно мощный и быстрый инструмент, позволяющий, не изучая специализированные методологии и среды разработки экосистемы Hadoop, эффективно и просто обрабатывать огромные объемы данных, чем он и привлек меня.

Спасибо за прочтение статьи!

Советуем почитать