Время прочтения: 7 мин.

Потребность в подобной разработке возникла в связи с необходимостью перемещения больших объемов данных из одной системы управления базами данных в другую. Из-за большого размера выгрузки её пришлось разбивать на множество мелких CSV. Загрузка каждого файла вручную заняла бы много времени. Это и стало причиной создания программы, о которой пойдет речь. Разработанный ноутбук Python будет сам определять типы данных внутри CSV и автоматически загружать их в таблицу БД. В каталоге с ноутбуком должны быть созданы две папки: in (куда нужно сложить загружаемые CSV) и out (куда будут перемещены уже загруженные файлы). После создания папок можно приступить к написанию кода. Первым делом нужно импортировать библиотеки:

import os
import shutil
from datetime import datetime
import pyodbc #связь с СУБД
import pandas as pd #чтение csv
from tqdm import tqdm #полоска прогресса

Перед тем, как загружать CSV с помощью ноутбука, нужно обязательно проверить, чтобы все файлы CSV имели одинаковые разделители и кодировку. После проверки их нужно выписать в отдельные переменные. Последняя обязательная переменная – название таблицы, куда будут загружаться данные. По необходимости можно добавить прочие параметры для чтения CSV через pandas.

encd = 'utf-8'
delim = ','
table_name = 'schema.sample_table'

Теперь о типах данных в СУБД. Чаще всего в таблицах используются числа, строки и даты. Поэтому данные будут приводиться к четырем типам: int (целые числа), float (вещественные числа), str (строки) и datetime (дата-время). Изначально значения в CSV будут читаться как строки, поэтому необходимо написать функции для преобразования в остальные три типа. Преобразование значения в число можно сделать с помощью прямого приведения значения к типу, но есть нюансы:

  • Если последовательность цифр начинается с нуля, то при преобразовании в целочисленный тип ноль потеряется, что нежелательно;
  • Если последовательность цифр, как число по модулю, превосходит 2 в степени 63 (максимальное значение типа bigint), то произойдет переполнение и мы потеряем данные;
  • Если в колонке с числами в датафрейме Pandas есть значения None, то все числа будут преобразованы в тип float, поэтому для типа int будем заполнять пустые значения нулями.

В итоге, если значение не проходит проверки, выдаётся такое же исключение, как если бы значение не подходило под тип.

ceiling = 2**63-1

def convert_int(x):
    if pd.isnull(x):
        return 0
    if x[0] == '0' and len(x)>1:
        raise ValueError
    if abs(int(x)) >= ceiling:
        raise ValueError
    return int(x)

def convert_float(x):
    if pd.isnull(x):
        return None
    if len(x) > 1:
        if x[0] == 0 and x[1] != '.': 
            #два условия стоят раздельно, чтобы не вызывать исключений
            raise ValueError
    return float(x)

Преобразование в дату будет производиться через стандартную библиотеку datetime. При необходимости можно установить свой формат даты.

def convert_date(x): 
    if pd.isnull(x):
        return None 
    return datetime.strptime(x,'%Y-%m-%d %H:%M:%S') 

Теперь переходим к механизму задания типов для значений в CSV. Суть состоит в том, чтобы взять первый файл в массиве как образец, по которому будут определяться типы данных для таблицы в БД. Для этого создается класс SampleTable:

class SampleTable:
    def __init__(self):
        self.df = None #DataFrame-образец
        self.dtypes = dict() #типы данных Python
        self.dt_cols = list() #список колонок с датами
        self.sql_types = dict() #типы данных SQL

В следующую очередь создается метод для взятия файла-образца:

def take_sample_df(self,file):
        self.df = pd.read_csv(f'in/{file}', encoding = encd, sep = delim, dtype = str)
        self.convert_columns() #метод для анализа типов Python
        self.compile_sql_types() #метод для анализа типов SQL
        self.col_names = self.df.columns.tolist()
        shutil.move(f'in/{file}',f'out/{file}') #после обработки файла-образца перемещаем его в папку out

После чтения файла-образца программа попытается найти подходящие типы Python и SQL. Метод convert_columns проверит все колонки на соответствие типам чисел или дат. Для колонок и типами Python float и datetime сразу прописываются соответствующие типы SQL.

def convert_columns(self):
    for column in self.df.columns:
        #если колонка пустая, то помечаем ее как строчный тип
        if not any(self.df[column]): 
            self.dtypes[column]=str
            continue
        #Сначала пробуем конвертировать значения в даты
        try:
            self.df[column] = self.df[column].apply(convert_date)
            self.dtypes[column]=datetime
            #Сразу прописываем SQL тип datetime
            self.sql_types[column]='datetime'
        except ValueError:
            pass
        else:
            continue
        #Если в колонке не даты, пробуем конвертировать в числа
        try:
            self.df[column] = self.df[column].apply(convert_int)
            self.dtypes[column]=int
            #Тип SQL пока что не указывается, 
            #так как есть два SQL типа для целых чисел
        except ValueError:
            pass
        else:
            continue
        try:
            self.df[column] = self.df[column].apply(convert_float)
            self.dtypes[column]=float
            self.sql_types[column]='float'
        except ValueError:
            #Если в колонке не даты и не числа, то оставляем значения в строках 
            self.dtypes[column]=str
            #Тип SQL пока что не указывается, так как длина строк неизвестна

Далее следует метод для анализа типов SQL для строк и целых чисел на основе длин значений в двоичном коде.

def compile_sql_types(self):
    for column in self.df.columns:
        #Если колонка содержит целые числа
        if self.dtypes[column] == int:
            #Находим длину самого большого числа в битах
            max_num = max(self.df[column])
            if max_num.bit_length() <=32:
                self.sql_types[column]='int'
            else:
                self.sql_types[column]='bigint'
            continue
        #Если колонка содержит строки и не содержит дат
        if self.dtypes[column] == str and column not in self.dt_cols:
            #Находим самую большую длину среди строк в байтах
            lens = self.df[column].apply(lambda x: 0 if pd.isnull(x) else len(x.encode('utf-8'))).tolist()
            max_len = max(lens)
            if max_len <= 255:
                self.sql_types[column] = f'varchar(255)'
            else:
                self.sql_types[column] = f'varchar(max)'

Теперь для класса SampleTable пишется метод для создания таблицы БД на основе выбранных типов SQL при условии, что она не существует:

def sql_create_table(self, conn):
    cols = []
    #Собираем пары вида "название_колонки тип"
    for column in self.df.columns:
        sql_col_type = self.sql_types[column]
        cols.append(f'{column} {sql_col_type}')
    #Объединяем пары через запятую для построения запроса
    cols = ','.join(cols)
    query = f"if not exists (select * from sys.tables where name='{table_name}') create table {table_name} ({cols})"
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        cursor.commit()
    except Exception as err:
        #При неудаче откатываем операцию
        print(err)
        cursor.rollback()
    finally:
        cursor.close()

Следующий метод загрузит содержимое таблицы-образца в созданную таблицу БД:

def sql_load_sample(self, conn):
    #Шаблон запроса для pyodbc имеет вид "insert into table values (?,?,?,?,...)"
    #Количество вопросительных знаков должно соответствовать числу колонок в таблице
    query = 'insert into {} values ({})'.format(table_name,','.join(['?']*len(self.df.columns)))
    try:
        cursor = conn.cursor()
        #Опция cursor.fast_executemany доступна в pyodbc версии 4.0.19 и выше
        cursor.fast_executemany = True
        cursor.executemany(query, self.df.values.tolist())
        cursor.commit()
    except Exception as e:
        #При неудаче откатываем операцию
        print(e)
        cursor.rollback()
    finally:
        cursor.close()

Последний метод класса SampleTable – sql_load_file, в который передается имя файла для его загрузки в таблицу БД. Колонки в загружаемом файле будут преобразованы в типы, соответствующие типам колонок в файле-образце.

def sql_load_file(self, file, conn):
    df = pd.read_csv(f'in/{file}', encoding = encd, sep = delim, dtype = str)
    #Преобразовываем колонки
    for column in df.columns:
        if self.dtypes[column]==datetime:
            df[column] = df[column].apply(convert_date)
        elif self.dtypes[column]==int:
            df[column] = df[column].apply(convert_int)
        elif self.dtypes[column]==float:
            df[column] = df[column].apply(convert_float)
    query = 'insert into {} values ({})'.format(table_name,','.join(['?']*len(df.columns)))
    try:
        cursor = conn.cursor()
        cursor.fast_executemany = True
        cursor.executemany(query, df.values.tolist())
        cursor.commit()
        cursor.close()
        #После загрузки CSV в БД отправляем его в папку с отработанными файлами
        shutil.move(f'in/{file}',f'out/{file}')
    except Exception as e:
        print(e)
        print(file)
        cursor.rollback()
    finally:
        cursor.close()

Теперь, когда класс SampleTable готов, можно приступать к процедуре загрузки CSV в БД. Как говорилось ранее, в качестве образца будет браться первый файл по списку из директории in. После чтения файла программа определит подходящие типы для колонок в таблице.

sample_table = SampleTable()
sample_table.take_sample_df(os.listdir('in')[0])
Далее создается соединение с СУБД. В примере показано соединение с Microsoft SQL Server.
conn = pyodbc.connect("Driver={SQL Server native Client 11.0};"
                      "Server=data_warehouse;"
                      "Port=0000;"
                      "Database=sample_db;"
                      "Trusted_Connection=yes", autocommit=False)

При получении соединения можно попытаться загрузить таблицу-образец:

sample_table.sql_create_table(conn)
sample_table.sql_load_sample(conn)

Если таблица-образец успешно загрузилась, то можно загружать остальные CSV.

for file in tqdm(os.listdir('in')):
    sample_table.sql_load_file(file,conn)

При возникновении исключения при загрузке метод SampleTable.sql_load_file выведет текст ошибки и имя файла, который программа не смогла загрузить. Файлы, которые были успешно загружены, перемещаются в директорию out. Таким образом, если возникнет необходимость прервать работу программы, не придётся загружать все файлы заново, и можно будет продолжить работу с того же места, где прекратили загрузку. По окончанию загрузки нужно обязательно закрыть соединение:

conn.close()

Каковы итоги? Ноутбук позволит значительно сократить ручной труд при загрузке массива CSV. Однако, ошибки не исключены. В таком случае всё же придётся заниматься отладкой вручную и, возможно, искать изъяны в самих файлах. Что можно улучшить? Один из вариантов – использование библиотеки sqlalchemy вместо pyodbc.

  • Во-первых, исходя из собственной практики, могу сказать, что pyodbc не всегда надёжно работает с пустыми значениями из датафреймов.
  • Во вторых, датафреймы Pandas имеют метод to_sql, которые принимают соединения типа sqlalchemy.engine для загрузки своего содержимого в БД.

Если у вас есть иные идеи для улучшения программы, можете поделиться мнением в комментариях.

Вы можете просмотреть ноутбук по ссылке на GitHub.