Параллельная обработка и преобразование JSON-фалов в Pandas
Исходными данными являлись 10 тыс. json-файлов, в каждом из которых было около 3 тыс. конечных узлов. При последовательной загрузке всех файлов и преобразовании в pandas при помощи json_normalize время выполнения составило 10 минут, что довольно долго.
В связи с тем, что данную операцию необходимо было выполнить несколько раз на различных наборах данных, было принято решение ускорить процесс преобразования за счет реализации механизма распараллеливания и реализовать его как полноценный настраиваемый скрипт.
Для создания параллельных вычислений важны два условия: разделимость и независимость данных. Каждый из json-файлов представляет одну строку результирующей таблицы, что позволяет спокойно разделить их на партиции. Также для преобразования одного файла не нужен какой-либо другой, что удовлетворяет условию независимости данных. Условия соблюдены, можно приступать к коду.
Импорт библиотек:
Весь алгоритм программы разделен на следующие блоки:
- Парсинг аргументов
- Получение всех путей до файлов и разделение их на n групп (Для n подпроцессов)
- Запуск процессов, выполняющих выгрузку и преобразование в pandas
- Объединение результатов выполнения подпроцессов в единый фрейм и сохранение
1. Парсинг аргументов
Для того, чтобы скрипт можно было удобно переиспользовать с другими настройками, необходимо определить входные параметры скрипта с помощью библиотеки argparse. Отделить инициализацию парсера аргументов от основной логики скрипта, описав ее в функции get_arg_pareser:
Вызвать его в блоке __main__:
2. Пути до файлов
При помощи библиотеки Pathlib можно получить все пути до входных файлов:
Теперь разделить данные на n частей, пронумеровав каждую партицию (Для красивого отслеживания выполнения процесса)
3. Распараллеливание
В любой задаче, в которой необходимо создать распараллеливание, необходимо определить логику выполнения подпроцесса в отдельном блоке (функции). Создать функцию one_process_execution, определяющую обработку одной партиции данных.
Функция в рамках одного процесса выгружает все json файлы партиции, сохраняя два массива: названия файлов (indexes) и сами данные (res_array). После выгрузки всех данных, при помощи функции pd.json_normalize, преобразую список словарей в таблицу и выставлю имена файлов как индекс.
Важным замечанием является то, что функцию json_normalize следует выполнять именно на массиве словарей внутри подпроцессов, а не на каждом отдельном файле. Если же поставить преобразование в pandas каждого файла отдельно и итерировано добавлять в pd.DataFrame по строчке, то это замедлит выполнение в 3 раза. Главное правило преобразования чего-либо в pandas, делать саму трансформацию как можно позднее.
Теперь, когда у меня есть разделенные данные и есть описание функции подпроцесса – время приступать к созданию пула (контейнера) процессов. Для реализации параллельных вычислений в Python используется библиотека multiprocessing. При помощи класса Pool инициализирую контейнер задач, указав необходимое число подпроцессов и дополнительные параметры для работы отображения статуса выполнения. Далее необходимо заполнить этот пул описанной выше функции one_process_execution с входными данными, сформированными на шаге 1. Добавление задач происходит с помощью ключевого слова apply_async, определяющего поведение выполнения наших процессов.
Следует отметить, что метод apply_async не запускает сам процесс, а лишь добавляет его в список задач. Таким образом, в переменной jobs хранятся определения подпроцессов, готовых к началу выполнения.
Для запуска вычислений надо у каждого элемента массива вызвать метод get, который после выполнения вернет результат нашей функции, по мере выполнения. Вызов этого метода для каждого элемента массива задач (Рис. 1):
4. Объединение и сохранение
Теперь дело за малым — осталось объединить все датафреймы в один единый и сохранить:
Заключение
Таким образом, был создан скрипт, позволяющий преобразовывать большое количество json файлов в единый датафрейм, используя технологии распараллеливания вычислений. Данная реализация преобразовала те же 10 тысяч файлов в единый датафрейм за 2 минуты, тем самым ускорив процесс в 5 раз.
Полноценный скрипт можно посмотреть на моей личной странице github. Он был реализован таким образом, что может быть запущен в двух режимах:
- Как скрипт, сохраняющий результат как pickle файл
- Как подключаемый модуль, функция main которого возвращает сгенерированный pd.DataFrame.