Сложности при выгрузке 500 гб из базы данных и пример их решения

Хочу поделиться с вами забавным случаем, который произошел со мной. Была поставлена задача — в кратчайшие сроки выгрузить 500 ГБ информации из базы данных (БД). Но на тот момент места на жестком диске катастрофически не хватало, и не было возможности оперативно очистить или добавить новый. К счастью, в наличии был защищенный файловый информационный ресурс (ФИР). Казалось бы, вот и решение — сохранять данные сразу в файл на ФИР. Но доступной скорости передачи информации на сетевой ресурс оказалось недостаточно, чтобы все выгрузить в полном объеме. У нас так: если SQL-запрос не отработал за сутки, он выпадает в ошибку из-за обрыва сессии. В итоге комичность в том, что на жесткий диск можно выгрузить быстро, но мало, а на ФИР много, но долго.

При учете всего вышеописанного, требовался инструмент, который мог выгружать из базы на жесткий диск определенный объем данных и перекладывать на ФИР без участия человека. Под такое описание подходил Python с библиотеками «cx_Oracle» и «shutil». Решение разделено на два скрипта:

Первый «Выгрузка данных»

Скрипт настроен на порционную выгрузку в отдельные файлы. Фильтрация в SQL-запросе подобрана с учетом оставшегося места на жестком диске. Чтобы исключить переполнение, учтена скорость скрипта на перенос файлов.

За основу взят пример кода из предыдущей статьи. Ссылки на разные сайты:

Без изменений:

  1. Используемые библиотеки
  2. Проверка версии
  3. Дескриптор соединения с БД
  4. Указание логин/пароль
  5. Функция, в которой создается экземпляр класса connect
  6. Подключение к серверу

Внесенные изменения:

Функция, в которой создается курсор, выполняется запрос в БД и сохраняются данные в файл:

def dfFromOracle(connection, sql, file): us=0 outDF=pd.DataFrame() success = 'False' with connection.cursor() as cursor1: cursor1.execute(sql) #Отправка SQL-запроса в базу данных trn=10 while success == 'False' and trn>0: try: #Сохраняем колонки outheader=[desc[0] for desc in cursor1.description] header = ';'.join(_header)+'\n' myfile=open(file, 'w',encoding='UTF-8') myfile.writelines(header) myfile.close() #Цикл для выгрузки определенного количества строк While try: frame = cursor1.fetchmany(1000000) # Возвращаем определенное количество строк из результата запроса if not frame: #Проверяем наличие данных в массиве break outDF = pd.DataFrame(frame) #Формируем ДФ из массива outDF.to_csv(file, sep=';',encoding='UTF-8',mode='a',header=None,index=False) #Сохраняем сформированный ДФ success = 'True' us = 1 frame = '' #Очищаем массив except: trn=trn-1 print('Error') time.sleep(60) return us

Создаем массивы:

collection_id = ('31184','25597','***','68823') #Уникальное значение присвоенное массиву в БД year = ('2020','2019') #Год month = ('12','11','10','09','08','07','06','05','04','03','02','01') #Месяц npp = 0 #Номер по порядку выгруженных файлов для корректной сортировки new_file = '_op_.csv' #Неизменная часть имени файла

Цикл для формирования уникальных имен файлов, создания SQL-запросов и вызова функций:

print(str(time.ctime()) + ' : Старт') #Время старта for row_id in collection_id: for row_y in year: for row_m in month: npp += 1 #Счетчик выгруженных файлов #Собираем имя файла file = str(npp).rjust(5,'0') +'_'+ row_y +'_'+ row_m +'_'+ row_id + new_file #Собираем SQL-запрос sql = ("""select * from tabl where to_char(c_date, 'YYYY') = '""" + row_y + """' and to_char(c_date, 'MM') = '""" + row_m1 + """' and coll_id = '""" + row_tb + """'""") #Вызов функций with getConn(odsLogin, odsConnectStr) as con1: us = dfFromOracle(con1, sql, file) print(str(time.ctime()) + ' : Финиш') #Время финиша chek = open('Выгрузка_завершена_V.txt', 'a',encoding='UTF-8') #Создаем файл chek.close()

Второй «Перемещение выгруженных файлов»

Скрипт обеспечит постоянный контроль количества сохраняемых файлов. При превышении установленного лимита определит ранее выгруженные, скопирует в указанную папку и удалит из старой директории. Завершит свою работу после того как скрипт на выгрузку создаст файл «Выгрузка_завершена_V.txt».

Используем библиотеки:

import shutil import os import time

Создаем массивы:

path_file = r'\Users\ *** \ '[:-1] #Путь к выгруженным файлам на жестком диске save_file = r'\\ *** \ '[:-1] #Путь на ФИР к папке переноса chek = '' #Для проверки завершения цикла

Цикл мониторинга выгруженных файлов:

print(str(time.ctime()) + ' : Старт') #Время старта #Цикл поиска файлов while chek == '': count = 1 #Собираем в папке имена выгруженных файлов, которые необходимо перенести list_file = [] for file in os.listdir(path_file): if file.endswith(('op_.csv')): #Файл оканчивается на «op_.csv» list_file.append(file) list_file.sort() #Если файлов больше двух, переходим к циклу переноса двух файлов if len(list_file) > 2: for row_file in list_file: if count < 3: #Копируем файл, затем его удаляем из старой директории filename = row_file #Имя файла filedir = path_file #Его текущая директория move_to = save_file #Куда надо перенести shutil.move(os.path.join(filedir, filename), os.path.join(move_to, filename)) count += 1 list_file = [] #Проверяем завершилась выгрузка из БД, переносим оставшиеся файлы и завершаем работу скрипта chek_file = [] for file in os.listdir(path_file): if file.endswith(('_V.txt')): #Файл оканчивается на «_V.txt» chek_file.append(file) if len(chek_file) > 0: chek = 'V' time.sleep(15) for row_file in list_file: #Копирует файл, затем его удаляет из старой директории filename = row_file filedir = path_file move_to = save_file shutil.move(os.path.join(filedir, filename), os.path.join(move_to, filename)) count += 1 print(str(time.ctime()) + ' : Финиш') #Время финиша

В итоге, за два неполных дня автоматически выгружено и перенесено более 500 ГБ, а это 168 файлов в среднем по 3 ГБ (без учета одного обрыва сессии). Применен метод fetchmany() с возможностью установки приемлемого расхода памяти для комфортной работы с другими задачам без зависания системы и ошибок «memory error».

2 комментария

А чем не устраивал штатный sqlplus? В 12.2 есть корректная выгрузка csv.
После выгрузки таблицы запаковываете ее gz, 40-50 гб займет

1
Ответить
Автор

Для штатной выгрузки на жестком диске не хватало места, поэтому выгружали частями и переносили на ФИР.
Свободного места было около 20 ГБ или чуть больше.

Ответить