Python, SQL, Анализ данных

Сложности при выгрузке 500 ГБ из базы данных и пример их решения

Время прочтения: 4 мин.

Хочу поделиться с вами забавным случаем, который произошел со мной. Была поставлена задача — в кратчайшие сроки выгрузить 500 ГБ информации из базы данных (БД). Но на тот момент места на жестком диске катастрофически не хватало, и не было возможности оперативно очистить или добавить новый. К счастью, в наличии был защищенный файловый информационный ресурс (ФИР). Казалось бы, вот и решение — сохранять данные сразу в файл на ФИР. Но доступной скорости передачи информации на сетевой ресурс оказалось недостаточно, чтобы все выгрузить в полном объеме. У нас так: если SQL-запрос не отработал за сутки, он выпадает в ошибку из-за обрыва сессии. В итоге комичность в том, что на жесткий диск можно выгрузить быстро, но мало, а на ФИР много, но долго.

При учете всего вышеописанного, требовался инструмент, который мог выгружать из базы на жесткий диск определенный объем данных и перекладывать на ФИР без участия человека. Под такое описание подходил Python с библиотеками «cx_Oracle» и «shutil». Решение разделено на два скрипта:

Первый «Выгрузка данных»

Скрипт настроен на порционную выгрузку в отдельные файлы. Фильтрация в SQL-запросе подобрана с учетом оставшегося места на жестком диске. Чтобы исключить переполнение, учтена скорость скрипта на перенос файлов.

За основу взят пример кода из предыдущей статьи. Ссылки на разные сайты:

ссылка 1

ссылка 2

Без изменений:

  • Используемые библиотеки;
  • Проверка версии;
  • Дескриптор соединения с БД;
  • Указание логин/пароль;
  • Функция, в которой создается экземпляр класса connect;
  • Подключение к серверу.

Внесенные изменения:

Функция, в которой создается курсор, выполняется запрос в БД и сохраняются данные в файл:

def dfFromOracle(connection, sql, file):
            us=0
            outDF=pd.DataFrame() 
            success = 'False'
            with connection.cursor() as cursor1:
                cursor1.execute(sql) #Отправка SQL-запроса в базу данных
                trn=10
                while success == 'False' and trn>0:
                    try:
                        #Сохраняем колонки
                        outheader=[desc[0] for desc in cursor1.description]
                        header = ';'.join(_header)+'\n'
                        myfile=open(file, 'w',encoding='UTF-8')
                        myfile.writelines(header)
                        myfile.close()
                        #Цикл для выгрузки определенного количества строк
                        While try:
                            frame = cursor1.fetchmany(1000000) # Возвращаем определенное количество строк из результата запроса
                                if not frame: #Проверяем наличие данных в массиве 
                                    break
                            outDF = pd.DataFrame(frame) #Формируем ДФ из массива
                            outDF.to_csv(file, sep=';',encoding='UTF-8',mode='a',header=None,index=False) #Сохраняем сформированный ДФ
                            success = 'True'
                            us = 1
                            frame = '' #Очищаем массив
                    except:
                        trn=trn-1
                        print('Error')
                        time.sleep(60)
            return us

Создаем массивы:

collection_id = ('31184','25597','***','68823') #Уникальное значение присвоенное массиву в БД
year = ('2020','2019') #Год
month = ('12','11','10','09','08','07','06','05','04','03','02','01') #Месяц
npp = 0 #Номер по порядку выгруженных файлов для корректной сортировки
new_file = '_op_.csv' #Неизменная часть имени файла

Цикл для формирования уникальных имен файлов, создания SQL-запросов и вызова функций:

print(str(time.ctime()) + ' : Старт') #Время старта
for row_id in collection_id:
    for row_y in year:
        for row_m in month:
            npp += 1 #Счетчик выгруженных файлов
            #Собираем имя файла
            file = str(npp).rjust(5,'0') +'_'+ row_y +'_'+ row_m +'_'+ row_id + new_file
            #Собираем SQL-запрос
            sql = ("""select * from tabl where
                        to_char(c_date, 'YYYY') = '""" + row_y + """'
                        and to_char(c_date, 'MM') = '""" + row_m1 + """'
                        and coll_id = '""" + row_tb + """'""")
            #Вызов функций
            with getConn(odsLogin, odsConnectStr) as con1:
                us = dfFromOracle(con1, sql, file)
print(str(time.ctime()) + ' : Финиш') #Время финиша
chek = open('Выгрузка_завершена_V.txt', 'a',encoding='UTF-8') #Создаем файл
chek.close()

Второй «Перемещение выгруженных файлов»

Скрипт обеспечит постоянный контроль количества сохраняемых файлов. При превышении установленного лимита определит ранее выгруженные, скопирует в указанную папку и удалит из старой директории. Завершит свою работу после того как скрипт на выгрузку создаст файл «Выгрузка_завершена_V.txt».

Используем библиотеки:

import shutil
import os
import time

Создаем массивы:

path_file = r'\Users\ *** \ '[:-1] #Путь к выгруженным файлам на жестком диске
save_file = r'\\ *** \ '[:-1] #Путь на ФИР к папке переноса
chek = '' #Для проверки завершения цикла

Цикл мониторинга выгруженных файлов:

print(str(time.ctime()) + ' : Старт') #Время старта
#Цикл поиска файлов
while chek == '':
    count = 1
    #Собираем в папке имена выгруженных файлов, которые необходимо перенести
    list_file = []
    for file in os.listdir(path_file):
        if file.endswith(('op_.csv')): #Файл оканчивается на «op_.csv»
            list_file.append(file)
    list_file.sort()    
    #Если файлов больше двух, переходим к циклу переноса двух файлов
    if len(list_file) > 2:
        for row_file in list_file:
            if count < 3:
                #Копируем файл, затем его удаляем из старой директории
                filename = row_file #Имя файла
                filedir = path_file #Его текущая директория
                move_to = save_file #Куда надо перенести
                shutil.move(os.path.join(filedir, filename), os.path.join(move_to, filename))
                count += 1
                list_file = []                
    #Проверяем завершилась выгрузка из БД, переносим оставшиеся файлы и завершаем работу скрипта
    chek_file = []
    for file in os.listdir(path_file):
        if file.endswith(('_V.txt')): #Файл оканчивается на «_V.txt»
            chek_file.append(file)
    if len(chek_file) > 0:
        chek = 'V'
        time.sleep(15)
        for row_file in list_file:
            #Копирует файл, затем его удаляет из старой директории
            filename = row_file 
            filedir = path_file 
            move_to = save_file 
            shutil.move(os.path.join(filedir, filename), os.path.join(move_to, filename))
            count += 1
print(str(time.ctime()) + ' : Финиш') #Время финиша

В итоге, за два неполных дня автоматически выгружено и перенесено более 500 ГБ, а это 168 файлов в среднем по 3 ГБ (без учета одного обрыва сессии). Применен метод fetchmany() с возможностью установки приемлемого расхода памяти для комфортной работы с другими задачам без зависания системы и ошибок «memory error».

Советуем почитать