Параллельная обработка и преобразование JSON-фалов в Pandas

Исходными данными являлись 10 тыс. json-файлов, в каждом из которых было около 3 тыс. конечных узлов. При последовательной загрузке всех файлов и преобразовании в pandas при помощи json_normalize время выполнения составило 10 минут, что довольно долго.

В связи с тем, что данную операцию необходимо было выполнить несколько раз на различных наборах данных, было принято решение ускорить процесс преобразования за счет реализации механизма распараллеливания и реализовать его как полноценный настраиваемый скрипт.

Для создания параллельных вычислений важны два условия: разделимость и независимость данных. Каждый из json-файлов представляет одну строку результирующей таблицы, что позволяет спокойно разделить их на партиции. Также для преобразования одного файла не нужен какой-либо другой, что удовлетворяет условию независимости данных. Условия соблюдены, можно приступать к коду.

Импорт библиотек:

from tqdm import tqdm from pathlib import Path import json from multiprocessing import Pool, RLock import pandas as pd import pickle import argparse

Весь алгоритм программы разделен на следующие блоки:

  • Парсинг аргументов
  • Получение всех путей до файлов и разделение их на n групп (Для n подпроцессов)
  • Запуск процессов, выполняющих выгрузку и преобразование в pandas
  • Объединение результатов выполнения подпроцессов в единый фрейм и сохранение

1. Парсинг аргументов

Для того, чтобы скрипт можно было удобно переиспользовать с другими настройками, необходимо определить входные параметры скрипта с помощью библиотеки argparse. Отделить инициализацию парсера аргументов от основной логики скрипта, описав ее в функции get_arg_pareser:

def get_arg_pareser(): parser = argparse.ArgumentParser(description= 'From json files creates pd.DataFrame') parser.add_argument('-i', '--input-folder', type=str, help='input data folder', required=True) parser.add_argument('-o', '--output-file', type=str, default=r'output.pickle', help='output.pickle') parser.add_argument('-e', '--n-executors', type=int, default=8, help='number of subprocesses (default: 8)') return parser

Вызвать его в блоке __main__:

args = get_arg_pareser().parse_args() N_GROUPS = args.n_executors jsons_folder_path = args.input_folder output_file = args.output_file

2. Пути до файлов

При помощи библиотеки Pathlib можно получить все пути до входных файлов:

f_paths = list(Path(jsons_folder_path).glob('*.json'))

Теперь разделить данные на n частей, пронумеровав каждую партицию (Для красивого отслеживания выполнения процесса)

in_group = len(f_paths) // N_GROUPS + 1 inp_args = [f_paths[i:i + in_group] for i in range(0, len(f_paths), in_group)] inp_args = list(enumerate(inp_args))

3. Распараллеливание

В любой задаче, в которой необходимо создать распараллеливание, необходимо определить логику выполнения подпроцесса в отдельном блоке (функции). Создать функцию one_process_execution, определяющую обработку одной партиции данных.

Функция в рамках одного процесса выгружает все json файлы партиции, сохраняя два массива: названия файлов (indexes) и сами данные (res_array). После выгрузки всех данных, при помощи функции pd.json_normalize, преобразую список словарей в таблицу и выставлю имена файлов как индекс.

Важным замечанием является то, что функцию json_normalize следует выполнять именно на массиве словарей внутри подпроцессов, а не на каждом отдельном файле. Если же поставить преобразование в pandas каждого файла отдельно и итерировано добавлять в pd.DataFrame по строчке, то это замедлит выполнение в 3 раза. Главное правило преобразования чего-либо в pandas, делать саму трансформацию как можно позднее.

def one_process_execution(pid, f_paths): res_arr = [] indexes = [] tqdm_text = '#' + f'{pid}'.zfill(3) with tqdm(total=len(f_paths), position=pid+1, desc=tqdm_text) as pbar: for path in f_paths: with open(str(path), 'r') as f: d = json.load(f) indexes.append(path.stem) res_arr.append(d) pbar.update(1) df = pd.json_normalize(res_arr).assign(index=indexes).set_index('index') print(f'Subproc {pid} done') return df

Теперь, когда у меня есть разделенные данные и есть описание функции подпроцесса – время приступать к созданию пула (контейнера) процессов. Для реализации параллельных вычислений в Python используется библиотека multiprocessing. При помощи класса Pool инициализирую контейнер задач, указав необходимое число подпроцессов и дополнительные параметры для работы отображения статуса выполнения. Далее необходимо заполнить этот пул описанной выше функции one_process_execution с входными данными, сформированными на шаге 1. Добавление задач происходит с помощью ключевого слова apply_async, определяющего поведение выполнения наших процессов.

pool = Pool(processes=N_GROUPS, initializer=tqdm.set_lock, initargs=(RLock(),)) jobs = [pool.apply_async(one_process_execution, args=x) for x in inp_args]

Следует отметить, что метод apply_async не запускает сам процесс, а лишь добавляет его в список задач. Таким образом, в переменной jobs хранятся определения подпроцессов, готовых к началу выполнения.

Для запуска вычислений надо у каждого элемента массива вызвать метод get, который после выполнения вернет результат нашей функции, по мере выполнения. Вызов этого метода для каждого элемента массива задач (Рис. 1):

# run pool df_lists = list(map(lambda x: x.get(), jobs))
Рисунок 1 - Статус выполнения программы
Рисунок 1 - Статус выполнения программы

4. Объединение и сохранение

Теперь дело за малым — осталось объединить все датафреймы в один единый и сохранить:

res_df = pd.concat(df_lists) # save with open(output_file, 'wb') as f: pickle.dump(res_df, f)

Заключение

Таким образом, был создан скрипт, позволяющий преобразовывать большое количество json файлов в единый датафрейм, используя технологии распараллеливания вычислений. Данная реализация преобразовала те же 10 тысяч файлов в единый датафрейм за 2 минуты, тем самым ускорив процесс в 5 раз.

Полноценный скрипт можно посмотреть на моей личной странице github. Он был реализован таким образом, что может быть запущен в двух режимах:

  1. Как скрипт, сохраняющий результат как pickle файл
  2. Как подключаемый модуль, функция main которого возвращает сгенерированный pd.DataFrame.
5
4 комментария