Время прочтения: 7 мин.
Задачу по загрузке данных из Oracle и их записи в таблицу на MS SQL Server можно решить большим количеством способов. Как правило, даже для выполнения простого запроса приходится потратить большое количество времени, особенно, когда надо передать в Oracle перечень ID для поиска информации. Для решения задачи можно загрузить данные в файлы, затем написать код создания таблицы и в нее с помощью BCP загрузить информацию, либо использовать Мастер импорта и экспорта.
На самом деле, получить данные и создать для них таблицу на MS SQL Server можно используя Python, при этом весь процесс возможно заложить в одну функцию. В такую функцию необходимо заложить следующие алгоритмы:
- загрузка данных с Oracle;
- автоматическое определение типов столбцов для таблицы на MS SQL Server на основании типов столбцов Oracle;
- создание таблицы и вставка в нее полученной с Oracle информации.
Назовем эту функцию oracle_download и определим для нее перечень входящих переменных:
- table_name – наименование таблицы, в которой мы хотим видеть полученные из Oracle данные;
- ids_sql – запрос для получения перечня ID, которые будут переданы в запросе к Oracle;
- ids_column – наименование столбца, содержащего ID из запроса в переменной ids_sql;
- sql_mask – запрос к Oracle.
В рассматриваемом примере в Oracle будет передаваться перечень ID, содержащий числовые значения. Ниже представлен код для импорта необходимых библиотек, определения строк подключения и указания входных переменных для функции.
import pandas as pd
import pyodbc as sql_db_connect
import cx_Oracle
#Установка подключения к MS SQL Server
link_server = 'Driver={SQL Server Native Client 11.0};Server=*наименование сервера*;Network Library=DBMSSOCN;Trusted_Connection=Yes'
ms_server = sql_db_connect.connect(link_server)
#Установка подключения к Oracle
oracle = cx_Oracle.connect('', '', '*наименование сервера*', encoding = 'UTF-8', nencoding = 'UTF-8')
#Переменная table_name содержит наименование целевой таблицы
table_name = '[DATABASE].[kay].[NTA_test_created_table]'
#Переменная ids_sql содержит запрос для получения перечня ID с MS SQL Server
ids_sql = """select
ID
from [DATABASE].[kay].[NTA_ids] with (nolock)
order by ID"""
#Переменная ids_column содержит наименование столбца, который содержит перечень ID
ids_column = 'ID'
#Переменная sql_mask содержит запрос, выполняемый на стороне Oracle
sql_mask = """SELECT
t1.column_1
,t1.column_2
,t1.column_3
,t1.column_4
,t2.column_ext
FROM oracle_table_1 t1
,oracle_table_2 t2
WHERE (1 = 1)
AND t1.ID in ( %s )
and t1.EXT_ID = t2.ID"""
Сразу определим функцию для разделения перечня ID на части по несколько штук, так как передавать более 1000 в IN нельзя.
# Функция chunks разделяет выборку на части
def chunks(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i + n]
Теперь перейдем непосредственно к написанию кода функции.
# Функция oracle_download содержит код для загрузки по перечню ID информации из БД Oracle и создания таблицы на основе полученных данных
def oracle_download(table_name, ids_sql, ids_column, sql_mask):
Для загрузки данных из таблиц MS SQL Server используется функция read_sql и полученный перечень ID делится на несколько частей, например, по 100 ID (как в коде ниже) для подстановки в запрос к Oracle.
# Получение перечня ID с MS SQL Server
ids_df = pd.read_sql(ids_sql, ms_server)
# Все ID разделяются на несколько частей
val_list = list(ids_df[ids_column].sort_values())
parts = list(chunks(val_list, 100)) # По 100 ID за один запрос к Oracle
sql_lists = []
for part in parts:
sql_list = ",".join(str(id_) for id_ in part)
sql_lists.append(sql_list)
Чтобы было удобнее отслеживать процесс загрузки данных, определим значение переменной step, и для создания таблицы статус ее наличия на MS SQL Server.
# Для удобства введена переменная, позволяющая отслеживать прогресс загрузки (увеличивается с каждым успешным выполнением)
step = 1
# Признак наличия таблицы в БД
table_created = 0
Теперь необходимо загрузить данные с Oracle, создать для них таблицу на MS SQL Server и записать в нее данные. Ниже представлен полный код написанной функции (с кодом, представленным ранее):
# Функция oracle_download содержит код для загрузки по перечню ID информации из БД Oracle и создания таблицы на основе полученных данных и з
def oracle_download(table_name, ids_sql, ids_column, sql_mask):
# Получение перечня ID с MS SQL Server
ids_df = pd.read_sql(ids_sql, ms_server)
# Все ID разделяются на несколько частей
val_list = list(ids_df[ids_column].sort_values())
parts = list(chunks(val_list, 100)) # По 100 ID за один запрос к Oracle
sql_lists = []
for part in parts:
sql_list = ",".join(str(id_) for id_ in part)
sql_lists.append(sql_list)
# Для удобства введена переменная, позволяющая отслеживать прогресс загрузки (увеличивается с каждым успешным выполнением)
step = 1
# Признак наличия таблицы в БД
table_created = 0
for sql_list in sql_lists:
# Подстановка перечня ID в запрос
sql_query = sql_mask % sql_list
# Выполнение запроса на Oracle
part_df = pd.read_sql(sql_query, oracle)
# Необходимо проверить, существует ли таблица для вставки данных из Oracle на MS SQL Server
if table_created == 0:
# Если таблица существует, выполнится оповещение пользователя, вставка новых строк будет производиться в эту таблицу
table_exists = pd.read_sql("if OBJECT_ID ('%s') is not null select 1 else select 0" % table_name,
ms_server).values
# Если таблица для загружаемых данных не создана, тогда выполняется код для ее создания
if table_exists == 0:
print('Создание таблицы')
# Определяется перечень столбцов в запросе на получение данных из Oracle и формируется запрос для определения типов столбцов
columns_list = list(part_df.columns)
table_sql_query = """with cols_ident as (
""" + sql_query + """
)
select
""" + ','.join(['DUMP(max("' + col + '"))' for col in columns_list]) + """
from cols_ident"""
table_sql = table_sql_query
# Загрузка перечня типов входных значений
col_types = pd.read_sql(table_sql, oracle)
# Если в выборке весь столбец содержит значения NUll, данный столбец определяется как текст
types_for_tsql = {'Typ=1': 'nvarchar(255)'
, 'Typ=2': 'numeric(38, 2)'
, 'Typ=12': 'datetime'
, 'Typ=13': 'datetime'
, 'Typ=112': 'nvarchar(500)'
, 'Typ=180': 'datetime2'
, 'Typ=181': 'datetime2'
, 'Null': 'nvarchar(255)'} # Словарь типов значений
types_list = [types_for_tsql[i.split(' ')[0]] for i in col_types.values[0]]
# Так как столбцы добавились в обратном порядке, необходимо вернуть прежнюю последовательность
columns_list.reverse()
types_list.reverse()
# Далее формируется скрипт для создания таблицы на MS SQL Server
create_table = "create table %s ( \n" % table_name
while len(columns_list) > 0:
col_name = columns_list.pop()
col_type = types_list.pop()
create_table += " [%s]" % col_name + ' ' + col_type + ',\n'
create_table += ') with (data_compression = PAGE)'
# Создается таблица
cur = ms_server.cursor()
cur.execute(create_table)
cur.commit()
cur.close()
print('Таблица создана')
# Устанавливается признак наличия таблицы, чтобы данный блок кода больше не использовался
table_created = 1
else:
table_created = 1
print('Таблица существует')
# Запись результата запроса в уже созданную ранее таблицу на MS SQL Server
print('\rЧасть %s из %s - Вставка на сервер %s строк' % (str(step), str(len(sql_lists)), str(len(part_df))),
end='')
# В первую строку для вставки подставляется наименование таблицы
first_row = "insert into %s values " % table_name
# Записываются все полученные строки в виде конструкции insert into ... values (...),(...),(...)
all_inserted_rows = []
for row in part_df.values:
# Обратите внимание! Для апострофа в TSQL необходимо добавить еще один такой-же символ, чтобы вставка прошла успешно и не поменялся исходный текст
row = '( ' + ','.join(["'" + str(val).replace("'", "''") + "'" for val in row]) + ' )'
all_inserted_rows.append(row)
# Для ускорения вставки значение можно увеличить до 1000
inserted_parts = list(chunks(all_inserted_rows, 20))
# Вставка в таблицу на сервере выполняется частями
inserted_part = 1
for inserted_rows in inserted_parts:
sql_insert = first_row + '\n,'.join(inserted_rows)
sql_insert = sql_insert.replace('nan', 'Null').replace('NaT', 'Null').replace('None', 'Null').replace(
"'Null'", 'Null')
cur = ms_server.cursor()
cur.execute(sql_insert)
cur.commit()
cur.close()
inserted_part += 1
step += 1
print('\nЗагрузка выполнена')
Разберемся, что же происходит, после получения перечня ID с MS SQL Server. После загрузки первого результата с Oracle проверяется наличие таблицы в MS SQL Server с помощью команды OBJECT_ID. Результат проверки записывается в переменную table_exists. Если таблицы нет, она создается с помощью кода далее.
На основе первой полученной выборки по перечню ID, для определения типов столбцов используется встроенная функция DUMP, указывающая числовое значение типа столбца в Oracle. Для основных типов в словаре types_for_tsql определены подходящие типы столбцов в MS SQL Server. Например, если команда DUMP(max(t1.column_1)) показала результат Typ=2 Len=6: 197,7,100,72,33,81 по типу можно сделать вывод, что в столбце хранятся числа, а для столбца DUMP(max(t1.column_2)) результат Typ=1 Len=6: 4,47,4,61,4,48 указывает, что значение строка. Если столбец не содержит никаких значений (т.е. все Null), можно в словаре указать значение по умолчанию, например nvarchar(255) или nvarchar(max). Подробно о функции DUMP написано в документации: https://docs.oracle.com/cd/E11882_01/server.112/e41085/sqlqr06002.htm#SQLQR959
Получив типы всех столбцов создается таблица для загруженных данных командой create table. Для вставки используется конструкция insert into [table] values (…), (…), (…), при этом вставка данных выполняется частями автоматически, как и загрузка с Oracle.
В результате, с помощью одной небольшой функции можно загрузить данные с Oracle, создать для них таблицу на MS SQL Server и записать их в нее:
oracle_download(table_name, ids_sql, ids_column, sql_mask)