Время прочтения: 4 мин.

Исходными данными являлись 10 тыс. json-файлов, в каждом из которых было около 3 тыс. конечных узлов. При последовательной загрузке всех файлов и преобразовании в pandas при помощи json_normalize время выполнения составило 10 минут, что довольно долго. В связи с тем, что данную операцию необходимо было выполнить несколько раз на различных наборах данных, было принято решение ускорить процесс преобразования за счет реализации механизма распараллеливания и реализовать его как полноценный настраиваемый скрипт.

Для создания параллельных вычислений важны два условия: разделимость и независимость данных. Каждый из json-файлов представляет одну строку результирующей таблицы, что позволяет спокойно разделить их на партиции. Также для преобразования одного файла не нужен какой-либо другой, что удовлетворяет условию независимости данных. Условия соблюдены, можно приступать к коду.

Импорт библиотек:

from tqdm import tqdm
from pathlib import Path
import json
from multiprocessing import Pool, RLock
import pandas as pd
import pickle
import argparse

Весь алгоритм программы разделен на следующие блоки:

  1. Парсинг аргументов
  2. Получение всех путей до файлов и разделение их на n групп (Для n подпроцессов)
  3. Запуск процессов, выполняющих выгрузку и преобразование в pandas
  4. Объединение результатов выполнения подпроцессов в единый фрейм и сохранение

1.     Парсинг аргументов

Для того, чтобы скрипт можно было удобно переиспользовать с другими настройками, необходимо определить входные параметры скрипта с помощью библиотеки argparse. Отделить инициализацию парсера аргументов от основной логики скрипта, описав ее в функции get_arg_pareser:

def get_arg_pareser():

    parser = argparse.ArgumentParser(description=
            'From json files creates pd.DataFrame')
    parser.add_argument('-i', '--input-folder', type=str,
                        help='input data folder', required=True)
    parser.add_argument('-o', '--output-file', type=str, default=r'output.pickle', 
                        help='output.pickle')
    parser.add_argument('-e', '--n-executors', type=int, default=8, 
                        help='number of subprocesses (default: 8)')
    return parser

Вызвать его в блоке __main__:

    args = get_arg_pareser().parse_args()

    N_GROUPS = args.n_executors
    jsons_folder_path = args.input_folder
    output_file = args.output_file

2. Пути до файлов

При помощи библиотеки Pathlib можно получить все пути до входных файлов:

f_paths = list(Path(jsons_folder_path).glob('*.json'))

Теперь разделить данные на n частей, пронумеровав каждую партицию (Для красивого отслеживания выполнения процесса)

    in_group = len(f_paths) // N_GROUPS + 1
    inp_args = [f_paths[i:i + in_group] for i in range(0, len(f_paths), in_group)]
    inp_args = list(enumerate(inp_args))

3. Распараллеливание

В любой задаче, в которой необходимо создать распараллеливание, необходимо определить логику выполнения подпроцесса в отдельном блоке (функции). Создать функцию one_process_execution, определяющую обработку одной партиции данных.

Функция в рамках одного процесса выгружает все json файлы партиции, сохраняя два массива: названия файлов (indexes) и сами данные (res_array). После выгрузки всех данных, при помощи функции pd.json_normalize, преобразую список словарей в таблицу и выставлю имена файлов как индекс.

Важным замечанием является то, что функцию json_normalize следует выполнять именно на массиве словарей внутри подпроцессов, а не на каждом отдельном файле. Если же поставить преобразование в pandas каждого файла отдельно и итерировано добавлять в pd.DataFrame по строчке, то это замедлит выполнение в 3 раза. Главное правило преобразования чего-либо в pandas, делать саму трансформацию как можно позднее.

def one_process_execution(pid, f_paths):

    res_arr = []
    indexes = []

    tqdm_text = '#' + f'{pid}'.zfill(3)
    with tqdm(total=len(f_paths), position=pid+1, desc=tqdm_text) as pbar:
        for path in f_paths:
            with open(str(path), 'r') as f:
                d = json.load(f)

                indexes.append(path.stem)
                res_arr.append(d)

                pbar.update(1)
    df = pd.json_normalize(res_arr).assign(index=indexes).set_index('index')
    print(f'Subproc {pid} done')
    return df

Теперь, когда у меня есть разделенные данные и есть описание функции подпроцесса – время приступать к созданию пула (контейнера) процессов. Для реализации параллельных вычислений в Python используется библиотека multiprocessing. При помощи класса Pool инициализирую контейнер задач, указав необходимое число подпроцессов и дополнительные параметры для работы отображения статуса выполнения. Далее необходимо заполнить этот пул описанной выше функции one_process_execution с входными данными, сформированными на шаге 1. Добавление задач происходит с помощью ключевого слова apply_async, определяющего поведение выполнения наших процессов.

    pool = Pool(processes=N_GROUPS, initializer=tqdm.set_lock, initargs=(RLock(),))
    jobs = [pool.apply_async(one_process_execution, args=x) for x in inp_args]

Следует отметить, что метод apply_async не запускает сам процесс, а лишь добавляет его в список задач. Таким образом, в переменной jobs хранятся определения подпроцессов, готовых к началу выполнения.

Для запуска вычислений надо у каждого элемента массива вызвать метод get, который после выполнения вернет результат нашей функции, по мере выполнения. Вызов этого метода для каждого элемента массива задач (Рис. 1):

    # run pool
    df_lists = list(map(lambda x: x.get(), jobs))
Рисунок 1 – Статус выполнения программы

4. Объединение и сохранение

Теперь дело за малым — осталось объединить все датафреймы в один единый и сохранить:

    res_df = pd.concat(df_lists)

    # save
    with open(output_file, 'wb') as f:
        pickle.dump(res_df, f)

Заключение

Таким образом, был создан скрипт, позволяющий преобразовывать большое количество json файлов в единый датафрейм, используя технологии распараллеливания вычислений. Данная реализация преобразовала те же 10 тысяч файлов в единый датафрейм за 2 минуты, тем самым ускорив процесс в 5 раз.

Полноценный скрипт можно посмотреть на моей личной странице github. Он был реализован таким образом, что может быть запущен в двух режимах:

  1. Как скрипт, сохраняющий результат как pickle файл
  2. Как подключаемый модуль, функция main которого возвращает сгенерированный pd.DataFrame.