Время прочтения: 7 мин.
Потребность в подобной разработке возникла в связи с необходимостью перемещения больших объемов данных из одной системы управления базами данных в другую. Из-за большого размера выгрузки её пришлось разбивать на множество мелких CSV. Загрузка каждого файла вручную заняла бы много времени. Это и стало причиной создания программы, о которой пойдет речь. Разработанный ноутбук Python будет сам определять типы данных внутри CSV и автоматически загружать их в таблицу БД. В каталоге с ноутбуком должны быть созданы две папки: in (куда нужно сложить загружаемые CSV) и out (куда будут перемещены уже загруженные файлы). После создания папок можно приступить к написанию кода. Первым делом нужно импортировать библиотеки:
import os
import shutil
from datetime import datetime
import pyodbc #связь с СУБД
import pandas as pd #чтение csv
from tqdm import tqdm #полоска прогресса
Перед тем, как загружать CSV с помощью ноутбука, нужно обязательно проверить, чтобы все файлы CSV имели одинаковые разделители и кодировку. После проверки их нужно выписать в отдельные переменные. Последняя обязательная переменная – название таблицы, куда будут загружаться данные. По необходимости можно добавить прочие параметры для чтения CSV через pandas.
encd = 'utf-8'
delim = ','
table_name = 'schema.sample_table'
Теперь о типах данных в СУБД. Чаще всего в таблицах используются числа, строки и даты. Поэтому данные будут приводиться к четырем типам: int (целые числа), float (вещественные числа), str (строки) и datetime (дата-время). Изначально значения в CSV будут читаться как строки, поэтому необходимо написать функции для преобразования в остальные три типа. Преобразование значения в число можно сделать с помощью прямого приведения значения к типу, но есть нюансы:
- Если последовательность цифр начинается с нуля, то при преобразовании в целочисленный тип ноль потеряется, что нежелательно;
- Если последовательность цифр, как число по модулю, превосходит 2 в степени 63 (максимальное значение типа bigint), то произойдет переполнение и мы потеряем данные;
- Если в колонке с числами в датафрейме Pandas есть значения None, то все числа будут преобразованы в тип float, поэтому для типа int будем заполнять пустые значения нулями.
В итоге, если значение не проходит проверки, выдаётся такое же исключение, как если бы значение не подходило под тип.
ceiling = 2**63-1
def convert_int(x):
if pd.isnull(x):
return 0
if x[0] == '0' and len(x)>1:
raise ValueError
if abs(int(x)) >= ceiling:
raise ValueError
return int(x)
def convert_float(x):
if pd.isnull(x):
return None
if len(x) > 1:
if x[0] == 0 and x[1] != '.':
#два условия стоят раздельно, чтобы не вызывать исключений
raise ValueError
return float(x)
Преобразование в дату будет производиться через стандартную библиотеку datetime. При необходимости можно установить свой формат даты.
def convert_date(x):
if pd.isnull(x):
return None
return datetime.strptime(x,'%Y-%m-%d %H:%M:%S')
Теперь переходим к механизму задания типов для значений в CSV. Суть состоит в том, чтобы взять первый файл в массиве как образец, по которому будут определяться типы данных для таблицы в БД. Для этого создается класс SampleTable:
class SampleTable:
def __init__(self):
self.df = None #DataFrame-образец
self.dtypes = dict() #типы данных Python
self.dt_cols = list() #список колонок с датами
self.sql_types = dict() #типы данных SQL
В следующую очередь создается метод для взятия файла-образца:
def take_sample_df(self,file):
self.df = pd.read_csv(f'in/{file}', encoding = encd, sep = delim, dtype = str)
self.convert_columns() #метод для анализа типов Python
self.compile_sql_types() #метод для анализа типов SQL
self.col_names = self.df.columns.tolist()
shutil.move(f'in/{file}',f'out/{file}') #после обработки файла-образца перемещаем его в папку out
После чтения файла-образца программа попытается найти подходящие типы Python и SQL. Метод convert_columns проверит все колонки на соответствие типам чисел или дат. Для колонок и типами Python float и datetime сразу прописываются соответствующие типы SQL.
def convert_columns(self):
for column in self.df.columns:
#если колонка пустая, то помечаем ее как строчный тип
if not any(self.df[column]):
self.dtypes[column]=str
continue
#Сначала пробуем конвертировать значения в даты
try:
self.df[column] = self.df[column].apply(convert_date)
self.dtypes[column]=datetime
#Сразу прописываем SQL тип datetime
self.sql_types[column]='datetime'
except ValueError:
pass
else:
continue
#Если в колонке не даты, пробуем конвертировать в числа
try:
self.df[column] = self.df[column].apply(convert_int)
self.dtypes[column]=int
#Тип SQL пока что не указывается,
#так как есть два SQL типа для целых чисел
except ValueError:
pass
else:
continue
try:
self.df[column] = self.df[column].apply(convert_float)
self.dtypes[column]=float
self.sql_types[column]='float'
except ValueError:
#Если в колонке не даты и не числа, то оставляем значения в строках
self.dtypes[column]=str
#Тип SQL пока что не указывается, так как длина строк неизвестна
Далее следует метод для анализа типов SQL для строк и целых чисел на основе длин значений в двоичном коде.
def compile_sql_types(self):
for column in self.df.columns:
#Если колонка содержит целые числа
if self.dtypes[column] == int:
#Находим длину самого большого числа в битах
max_num = max(self.df[column])
if max_num.bit_length() <=32:
self.sql_types[column]='int'
else:
self.sql_types[column]='bigint'
continue
#Если колонка содержит строки и не содержит дат
if self.dtypes[column] == str and column not in self.dt_cols:
#Находим самую большую длину среди строк в байтах
lens = self.df[column].apply(lambda x: 0 if pd.isnull(x) else len(x.encode('utf-8'))).tolist()
max_len = max(lens)
if max_len <= 255:
self.sql_types[column] = f'varchar(255)'
else:
self.sql_types[column] = f'varchar(max)'
Теперь для класса SampleTable пишется метод для создания таблицы БД на основе выбранных типов SQL при условии, что она не существует:
def sql_create_table(self, conn):
cols = []
#Собираем пары вида "название_колонки тип"
for column in self.df.columns:
sql_col_type = self.sql_types[column]
cols.append(f'{column} {sql_col_type}')
#Объединяем пары через запятую для построения запроса
cols = ','.join(cols)
query = f"if not exists (select * from sys.tables where name='{table_name}') create table {table_name} ({cols})"
try:
cursor = conn.cursor()
cursor.execute(query)
cursor.commit()
except Exception as err:
#При неудаче откатываем операцию
print(err)
cursor.rollback()
finally:
cursor.close()
Следующий метод загрузит содержимое таблицы-образца в созданную таблицу БД:
def sql_load_sample(self, conn):
#Шаблон запроса для pyodbc имеет вид "insert into table values (?,?,?,?,...)"
#Количество вопросительных знаков должно соответствовать числу колонок в таблице
query = 'insert into {} values ({})'.format(table_name,','.join(['?']*len(self.df.columns)))
try:
cursor = conn.cursor()
#Опция cursor.fast_executemany доступна в pyodbc версии 4.0.19 и выше
cursor.fast_executemany = True
cursor.executemany(query, self.df.values.tolist())
cursor.commit()
except Exception as e:
#При неудаче откатываем операцию
print(e)
cursor.rollback()
finally:
cursor.close()
Последний метод класса SampleTable – sql_load_file, в который передается имя файла для его загрузки в таблицу БД. Колонки в загружаемом файле будут преобразованы в типы, соответствующие типам колонок в файле-образце.
def sql_load_file(self, file, conn):
df = pd.read_csv(f'in/{file}', encoding = encd, sep = delim, dtype = str)
#Преобразовываем колонки
for column in df.columns:
if self.dtypes[column]==datetime:
df[column] = df[column].apply(convert_date)
elif self.dtypes[column]==int:
df[column] = df[column].apply(convert_int)
elif self.dtypes[column]==float:
df[column] = df[column].apply(convert_float)
query = 'insert into {} values ({})'.format(table_name,','.join(['?']*len(df.columns)))
try:
cursor = conn.cursor()
cursor.fast_executemany = True
cursor.executemany(query, df.values.tolist())
cursor.commit()
cursor.close()
#После загрузки CSV в БД отправляем его в папку с отработанными файлами
shutil.move(f'in/{file}',f'out/{file}')
except Exception as e:
print(e)
print(file)
cursor.rollback()
finally:
cursor.close()
Теперь, когда класс SampleTable готов, можно приступать к процедуре загрузки CSV в БД. Как говорилось ранее, в качестве образца будет браться первый файл по списку из директории in. После чтения файла программа определит подходящие типы для колонок в таблице.
sample_table = SampleTable()
sample_table.take_sample_df(os.listdir('in')[0])
Далее создается соединение с СУБД. В примере показано соединение с Microsoft SQL Server.
conn = pyodbc.connect("Driver={SQL Server native Client 11.0};"
"Server=data_warehouse;"
"Port=0000;"
"Database=sample_db;"
"Trusted_Connection=yes", autocommit=False)
При получении соединения можно попытаться загрузить таблицу-образец:
sample_table.sql_create_table(conn)
sample_table.sql_load_sample(conn)
Если таблица-образец успешно загрузилась, то можно загружать остальные CSV.
for file in tqdm(os.listdir('in')):
sample_table.sql_load_file(file,conn)
При возникновении исключения при загрузке метод SampleTable.sql_load_file выведет текст ошибки и имя файла, который программа не смогла загрузить. Файлы, которые были успешно загружены, перемещаются в директорию out. Таким образом, если возникнет необходимость прервать работу программы, не придётся загружать все файлы заново, и можно будет продолжить работу с того же места, где прекратили загрузку. По окончанию загрузки нужно обязательно закрыть соединение:
conn.close()
Каковы итоги? Ноутбук позволит значительно сократить ручной труд при загрузке массива CSV. Однако, ошибки не исключены. В таком случае всё же придётся заниматься отладкой вручную и, возможно, искать изъяны в самих файлах. Что можно улучшить? Один из вариантов – использование библиотеки sqlalchemy вместо pyodbc.
- Во-первых, исходя из собственной практики, могу сказать, что pyodbc не всегда надёжно работает с пустыми значениями из датафреймов.
- Во вторых, датафреймы Pandas имеют метод to_sql, которые принимают соединения типа sqlalchemy.engine для загрузки своего содержимого в БД.
Если у вас есть иные идеи для улучшения программы, можете поделиться мнением в комментариях.
Вы можете просмотреть ноутбук по ссылке на GitHub.