Время прочтения: 4 мин.
Хочу поделиться с вами забавным случаем, который произошел со мной. Была поставлена задача — в кратчайшие сроки выгрузить 500 ГБ информации из базы данных (БД). Но на тот момент места на жестком диске катастрофически не хватало, и не было возможности оперативно очистить или добавить новый. К счастью, в наличии был защищенный файловый информационный ресурс (ФИР). Казалось бы, вот и решение — сохранять данные сразу в файл на ФИР. Но доступной скорости передачи информации на сетевой ресурс оказалось недостаточно, чтобы все выгрузить в полном объеме. У нас так: если SQL-запрос не отработал за сутки, он выпадает в ошибку из-за обрыва сессии. В итоге комичность в том, что на жесткий диск можно выгрузить быстро, но мало, а на ФИР много, но долго.
При учете всего вышеописанного, требовался инструмент, который мог выгружать из базы на жесткий диск определенный объем данных и перекладывать на ФИР без участия человека. Под такое описание подходил Python с библиотеками «cx_Oracle» и «shutil». Решение разделено на два скрипта:
Первый «Выгрузка данных»
Скрипт настроен на порционную выгрузку в отдельные файлы. Фильтрация в SQL-запросе подобрана с учетом оставшегося места на жестком диске. Чтобы исключить переполнение, учтена скорость скрипта на перенос файлов.
За основу взят пример кода из предыдущей статьи. Ссылки на разные сайты:
Без изменений:
- Используемые библиотеки;
- Проверка версии;
- Дескриптор соединения с БД;
- Указание логин/пароль;
- Функция, в которой создается экземпляр класса connect;
- Подключение к серверу.
Внесенные изменения:
Функция, в которой создается курсор, выполняется запрос в БД и сохраняются данные в файл:
def dfFromOracle(connection, sql, file):
us=0
outDF=pd.DataFrame()
success = 'False'
with connection.cursor() as cursor1:
cursor1.execute(sql) #Отправка SQL-запроса в базу данных
trn=10
while success == 'False' and trn>0:
try:
#Сохраняем колонки
outheader=[desc[0] for desc in cursor1.description]
header = ';'.join(_header)+'\n'
myfile=open(file, 'w',encoding='UTF-8')
myfile.writelines(header)
myfile.close()
#Цикл для выгрузки определенного количества строк
While try:
frame = cursor1.fetchmany(1000000) # Возвращаем определенное количество строк из результата запроса
if not frame: #Проверяем наличие данных в массиве
break
outDF = pd.DataFrame(frame) #Формируем ДФ из массива
outDF.to_csv(file, sep=';',encoding='UTF-8',mode='a',header=None,index=False) #Сохраняем сформированный ДФ
success = 'True'
us = 1
frame = '' #Очищаем массив
except:
trn=trn-1
print('Error')
time.sleep(60)
return us
Создаем массивы:
collection_id = ('31184','25597','***','68823') #Уникальное значение присвоенное массиву в БД
year = ('2020','2019') #Год
month = ('12','11','10','09','08','07','06','05','04','03','02','01') #Месяц
npp = 0 #Номер по порядку выгруженных файлов для корректной сортировки
new_file = '_op_.csv' #Неизменная часть имени файла
Цикл для формирования уникальных имен файлов, создания SQL-запросов и вызова функций:
print(str(time.ctime()) + ' : Старт') #Время старта
for row_id in collection_id:
for row_y in year:
for row_m in month:
npp += 1 #Счетчик выгруженных файлов
#Собираем имя файла
file = str(npp).rjust(5,'0') +'_'+ row_y +'_'+ row_m +'_'+ row_id + new_file
#Собираем SQL-запрос
sql = ("""select * from tabl where
to_char(c_date, 'YYYY') = '""" + row_y + """'
and to_char(c_date, 'MM') = '""" + row_m1 + """'
and coll_id = '""" + row_tb + """'""")
#Вызов функций
with getConn(odsLogin, odsConnectStr) as con1:
us = dfFromOracle(con1, sql, file)
print(str(time.ctime()) + ' : Финиш') #Время финиша
chek = open('Выгрузка_завершена_V.txt', 'a',encoding='UTF-8') #Создаем файл
chek.close()
Второй «Перемещение выгруженных файлов»
Скрипт обеспечит постоянный контроль количества сохраняемых файлов. При превышении установленного лимита определит ранее выгруженные, скопирует в указанную папку и удалит из старой директории. Завершит свою работу после того как скрипт на выгрузку создаст файл «Выгрузка_завершена_V.txt».
Используем библиотеки:
import shutil
import os
import time
Создаем массивы:
path_file = r'\Users\ *** \ '[:-1] #Путь к выгруженным файлам на жестком диске
save_file = r'\\ *** \ '[:-1] #Путь на ФИР к папке переноса
chek = '' #Для проверки завершения цикла
Цикл мониторинга выгруженных файлов:
print(str(time.ctime()) + ' : Старт') #Время старта
#Цикл поиска файлов
while chek == '':
count = 1
#Собираем в папке имена выгруженных файлов, которые необходимо перенести
list_file = []
for file in os.listdir(path_file):
if file.endswith(('op_.csv')): #Файл оканчивается на «op_.csv»
list_file.append(file)
list_file.sort()
#Если файлов больше двух, переходим к циклу переноса двух файлов
if len(list_file) > 2:
for row_file in list_file:
if count < 3:
#Копируем файл, затем его удаляем из старой директории
filename = row_file #Имя файла
filedir = path_file #Его текущая директория
move_to = save_file #Куда надо перенести
shutil.move(os.path.join(filedir, filename), os.path.join(move_to, filename))
count += 1
list_file = []
#Проверяем завершилась выгрузка из БД, переносим оставшиеся файлы и завершаем работу скрипта
chek_file = []
for file in os.listdir(path_file):
if file.endswith(('_V.txt')): #Файл оканчивается на «_V.txt»
chek_file.append(file)
if len(chek_file) > 0:
chek = 'V'
time.sleep(15)
for row_file in list_file:
#Копирует файл, затем его удаляет из старой директории
filename = row_file
filedir = path_file
move_to = save_file
shutil.move(os.path.join(filedir, filename), os.path.join(move_to, filename))
count += 1
print(str(time.ctime()) + ' : Финиш') #Время финиша
В итоге, за два неполных дня автоматически выгружено и перенесено более 500 ГБ, а это 168 файлов в среднем по 3 ГБ (без учета одного обрыва сессии). Применен метод fetchmany() с возможностью установки приемлемого расхода памяти для комфортной работы с другими задачам без зависания системы и ошибок «memory error».