Python, SQL

Автоматизация загрузки данных из Oracle в MS SQL Server с помощью Python, без создания таблиц вручную и указания типа полей

Время прочтения: 7 мин.

Задачу по загрузке данных из Oracle и их записи в таблицу на MS SQL Server можно решить большим количеством способов. Как правило, даже для выполнения простого запроса приходится потратить большое количество времени, особенно, когда надо передать в Oracle перечень ID для поиска информации. Для решения задачи можно загрузить данные в файлы, затем написать код создания таблицы и в нее с помощью BCP загрузить информацию, либо использовать Мастер импорта и экспорта.

На самом деле, получить данные и создать для них таблицу на MS SQL Server можно используя Python, при этом весь процесс возможно заложить в одну функцию. В такую функцию необходимо заложить следующие алгоритмы:

  • загрузка данных с Oracle;
  • автоматическое определение типов столбцов для таблицы на MS SQL Server на основании типов столбцов Oracle;
  • создание таблицы и вставка в нее полученной с Oracle информации.

Назовем эту функцию oracle_download и определим для нее перечень входящих переменных:

  • table_name – наименование таблицы, в которой мы хотим видеть полученные из Oracle данные;
  • ids_sql – запрос для получения перечня ID, которые будут переданы в запросе к Oracle;
  • ids_column – наименование столбца, содержащего ID из запроса в переменной ids_sql;
  • sql_mask – запрос к Oracle.

В рассматриваемом примере в Oracle будет передаваться перечень ID, содержащий числовые значения. Ниже представлен код для импорта необходимых библиотек, определения строк подключения и указания входных переменных для функции.

import pandas as pd
import pyodbc as sql_db_connect
import cx_Oracle

#Установка подключения к MS SQL Server
link_server = 'Driver={SQL Server Native Client 11.0};Server=*наименование сервера*;Network Library=DBMSSOCN;Trusted_Connection=Yes'
ms_server = sql_db_connect.connect(link_server)

#Установка подключения к Oracle
oracle = cx_Oracle.connect('', '', '*наименование сервера*', encoding = 'UTF-8', nencoding = 'UTF-8')

#Переменная table_name содержит наименование целевой таблицы
table_name = '[DATABASE].[kay].[NTA_test_created_table]'

#Переменная ids_sql содержит запрос для получения перечня ID с MS SQL Server
ids_sql = """select
ID
from [DATABASE].[kay].[NTA_ids] with (nolock)
order by ID"""

#Переменная ids_column содержит наименование столбца, который содержит перечень ID
ids_column = 'ID'

#Переменная sql_mask содержит запрос, выполняемый на стороне Oracle
sql_mask = """SELECT
t1.column_1
,t1.column_2
,t1.column_3
,t1.column_4
,t2.column_ext
FROM oracle_table_1 t1
,oracle_table_2     t2
WHERE (1 = 1)
AND t1.ID in  ( %s )
and t1.EXT_ID = t2.ID"""

Сразу определим функцию для разделения перечня ID на части по несколько штук, так как передавать более 1000 в IN нельзя.

# Функция chunks разделяет выборку на части
def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

Теперь перейдем непосредственно к написанию кода функции.

# Функция oracle_download содержит код для загрузки по перечню ID информации из БД Oracle и создания таблицы на основе полученных данных
def oracle_download(table_name, ids_sql, ids_column, sql_mask):

Для загрузки данных из таблиц MS SQL Server используется функция read_sql и полученный перечень ID делится на несколько частей, например, по 100 ID (как в коде ниже) для подстановки в запрос к Oracle.

# Получение перечня ID с MS SQL Server
ids_df = pd.read_sql(ids_sql, ms_server)

# Все ID разделяются на несколько частей
val_list = list(ids_df[ids_column].sort_values())
parts = list(chunks(val_list, 100))  # По 100 ID за один запрос к Oracle
sql_lists = []
for part in parts:
    sql_list = ",".join(str(id_) for id_ in part)
    sql_lists.append(sql_list)

Чтобы было удобнее отслеживать процесс загрузки данных, определим значение переменной step, и для создания таблицы статус ее наличия на MS SQL Server.

# Для удобства введена переменная, позволяющая отслеживать прогресс загрузки (увеличивается с каждым успешным выполнением)
step = 1

# Признак наличия таблицы в БД
table_created = 0

Теперь необходимо загрузить данные с Oracle, создать для них таблицу на MS SQL Server и записать в нее данные. Ниже представлен полный код написанной функции (с кодом, представленным ранее):

# Функция oracle_download содержит код для загрузки по перечню ID информации из БД Oracle и создания таблицы на основе полученных данных и з
def oracle_download(table_name, ids_sql, ids_column, sql_mask):
    # Получение перечня ID с MS SQL Server
    ids_df = pd.read_sql(ids_sql, ms_server)

    # Все ID разделяются на несколько частей 
    val_list = list(ids_df[ids_column].sort_values())
    parts = list(chunks(val_list, 100))  # По 100 ID за один запрос к Oracle
    sql_lists = []
    for part in parts:
        sql_list = ",".join(str(id_) for id_ in part)
        sql_lists.append(sql_list)

    # Для удобства введена переменная, позволяющая отслеживать прогресс загрузки (увеличивается с каждым успешным выполнением)
    step = 1

    # Признак наличия таблицы в БД
    table_created = 0

    for sql_list in sql_lists:

        # Подстановка перечня ID в запрос
        sql_query = sql_mask % sql_list

        # Выполнение запроса на Oracle       
        part_df = pd.read_sql(sql_query, oracle)

        # Необходимо проверить, существует ли таблица для вставки данных из Oracle на MS SQL Server
        if table_created == 0:

            # Если таблица существует, выполнится оповещение пользователя, вставка новых строк будет производиться в эту таблицу
            table_exists = pd.read_sql("if OBJECT_ID ('%s') is not null select 1 else select 0" % table_name,
                                       ms_server).values

            # Если таблица для загружаемых данных не создана, тогда выполняется код для ее создания
            if table_exists == 0:
                print('Создание таблицы')

                # Определяется перечень столбцов в запросе на получение данных из Oracle и формируется запрос для определения типов столбцов
                columns_list = list(part_df.columns)
                table_sql_query = """with cols_ident as ( 
                """ + sql_query + """
                )  
                select 
                """ + ','.join(['DUMP(max("' + col + '"))' for col in columns_list]) + """ 
                from cols_ident"""
                table_sql = table_sql_query

                # Загрузка перечня типов входных значений
                col_types = pd.read_sql(table_sql, oracle)

                # Если в выборке весь столбец содержит значения NUll, данный столбец определяется как текст
                types_for_tsql = {'Typ=1': 'nvarchar(255)'
                    , 'Typ=2': 'numeric(38, 2)'
                    , 'Typ=12': 'datetime'
                    , 'Typ=13': 'datetime'
                    , 'Typ=112': 'nvarchar(500)'
                    , 'Typ=180': 'datetime2'
                    , 'Typ=181': 'datetime2'
                    , 'Null': 'nvarchar(255)'}  # Словарь типов значений
                types_list = [types_for_tsql[i.split(' ')[0]] for i in col_types.values[0]]

                # Так как столбцы добавились в обратном порядке, необходимо вернуть прежнюю последовательность
                columns_list.reverse()
                types_list.reverse()

                # Далее формируется скрипт для создания таблицы на MS SQL Server
                create_table = "create table %s ( \n" % table_name
                while len(columns_list) > 0:
                    col_name = columns_list.pop()
                    col_type = types_list.pop()
                    create_table += " [%s]" % col_name + ' ' + col_type + ',\n'
                create_table += ') with (data_compression = PAGE)'

                # Создается таблица       
                cur = ms_server.cursor()
                cur.execute(create_table)
                cur.commit()
                cur.close()
                print('Таблица создана')

                # Устанавливается признак наличия таблицы, чтобы данный блок кода больше не использовался
                table_created = 1

            else:
                table_created = 1
                print('Таблица существует')

        # Запись результата запроса в уже созданную ранее таблицу на MS SQL Server
        print('\rЧасть %s из %s - Вставка на сервер %s строк' % (str(step), str(len(sql_lists)), str(len(part_df))),
              end='')

        # В первую строку для вставки подставляется наименование таблицы
        first_row = "insert into %s values " % table_name

        # Записываются все полученные строки в виде конструкции insert into ... values (...),(...),(...)
        all_inserted_rows = []
        for row in part_df.values:
            # Обратите внимание! Для апострофа в TSQL необходимо добавить еще один такой-же символ, чтобы вставка прошла успешно и не поменялся исходный текст
            row = '( ' + ','.join(["'" + str(val).replace("'", "''") + "'" for val in row]) + ' )'
            all_inserted_rows.append(row)
            # Для ускорения вставки значение можно увеличить до 1000
            inserted_parts = list(chunks(all_inserted_rows, 20))

            # Вставка в таблицу на сервере выполняется частями
        inserted_part = 1
        for inserted_rows in inserted_parts:
            sql_insert = first_row + '\n,'.join(inserted_rows)
            sql_insert = sql_insert.replace('nan', 'Null').replace('NaT', 'Null').replace('None', 'Null').replace(
                "'Null'", 'Null')
            cur = ms_server.cursor()
            cur.execute(sql_insert)
            cur.commit()
            cur.close()
            inserted_part += 1

        step += 1

    print('\nЗагрузка выполнена')

Разберемся, что же происходит, после получения перечня ID с MS SQL Server. После загрузки первого результата с Oracle проверяется наличие таблицы в MS SQL Server с помощью команды OBJECT_ID. Результат проверки записывается в переменную table_exists. Если таблицы нет, она создается с помощью кода далее.

На основе первой полученной выборки по перечню ID, для определения типов столбцов используется встроенная функция DUMP, указывающая числовое значение типа столбца в Oracle. Для основных типов в словаре types_for_tsql определены подходящие типы столбцов в MS SQL Server. Например, если команда DUMP(max(t1.column_1)) показала результат Typ=2 Len=6: 197,7,100,72,33,81 по типу можно сделать вывод, что в столбце хранятся числа, а для столбца DUMP(max(t1.column_2)) результат Typ=1 Len=6: 4,47,4,61,4,48 указывает, что значение строка. Если столбец не содержит никаких значений (т.е. все Null), можно в словаре указать значение по умолчанию, например nvarchar(255) или nvarchar(max). Подробно о функции DUMP написано в документации: https://docs.oracle.com/cd/E11882_01/server.112/e41085/sqlqr06002.htm#SQLQR959

Получив типы всех столбцов создается таблица для загруженных данных командой create table. Для вставки используется конструкция insert into [table] values (…), (…), (…), при этом вставка данных выполняется частями автоматически, как и загрузка с Oracle.

В результате, с помощью одной небольшой функции можно загрузить данные с Oracle, создать для них таблицу на MS SQL Server и записать их в нее:

oracle_download(table_name, ids_sql, ids_column, sql_mask)

Советуем почитать