Время прочтения: 3 мин.

В рамках аудиторских проверок возникают ситуации, когда необходимо выгрузить определенную информацию из всех таблиц базы данных. Например, информацию о рейтингах заемщиков для отслеживания ее изменения.

Однако, в различных таблицах поля, содержащие ИНН и рейтинги, могут называться по-разному. Да, можно перечислить их все, но если таблиц не одна и не две, а, например, несколько десятков? Для решения такой задачи необходимо выгрузить все поля таблиц, названия которых содержат подстроку «rating» по значению из полей, содержащих ИНН («inn»).

Информацию будем выгружать из внешнего хранилища Hive с помощью Spark. Для этого, прежде всего, создадим текстовый файл «tn.txt» с именами таблиц базы данных, информацию из которых необходимо выгрузить. При формировании файла нужно учесть, чтобы в его структуре не было пустых строк. Пример файла с названиями таблиц изображен на рисунке 1.

Рисунок 1 – Структура текстового файла с названиями таблиц

Далее запускаем скрипт на Python. Подключаемся к Hive. Код для подключения в JupyterHub к PySpark приведен в статье

Затем создаем точку входа для создания DataFrame и использования функций SQL.

import pandas as pd
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nnl:9083")
sparkSession=(SparkSession
             .builder
             .enableHiveSupport()
             .getOrCreate())
sqlContext = SQLContext(sc)

После отработки предыдущего кода передаем имя базы данных в строковую переменную, а также список ИНН заемщиков, по которым необходимо выгрузить информацию о рейтингах

bd = 'my_bd'
inn=('**********','**********')

И, наконец, выгружаем информацию о рейтингах из всех таблиц, указанных ранее. При этом данные выгружаются в файлы с расширением .xlsx, которые именуются по шаблону «bd_tn.xlsx», где bd – название базы данных, tn –название таблицы, из которой выгружены данные.

#файл с названиями таблиц для выгрузки
fn = 'tn.txt'
f = open(fn,'r',encoding='UTF-8') 
#цикл по таблицам из базы данных, указанным в файле
for dt in f:
    #имя файла,в который выгружаем
    new_file = bd +'_'+str(dt.replace('\n','')) 
#таблица с колонками 
    sql = """describe """ + bd + """.""" + str(dt.replace('\n',''))
    columns = sparkSession.sql(sql)
    #колонки из таблицы, содержащие подстроки "inn" и "rating"
    columns_inn = columns.toPandas().loc[columns.toPandas()['col_name'].str.contains('inn')] 
    columns_rating = columns.toPandas().loc[columns.toPandas()['col_name'].str.contains('rating')]
    if len(columns_rating)>0 and len(columns_inn)>0:
        cols_need = str(list(columns_rating.col_name)).replace('[', '').replace(']', '').replace("'", '')  
        cols_inn = str(list(columns_inn.col_name)).replace('[', '').replace(']', '').replace("'", '')      
        #формирование части SQL запроса с ИНН
        inn_sql = ''
        flag=0
        for item in list(columns_inn.col_name):
            if flag==0:
                inn_sql = inn_sql + item+' in '+str(inn) 
            else:
                inn_sql = inn_sql + ' or ' + item+' in '+str(inn) 
            flag=flag+1
 
        sql = """SELECT DISTINCT """ + cols_need +""", """+ cols_inn + """ FROM """ + bd + """."""  + str(dt.replace('\n','')) + """ where """+ inn_sql
        print(sql)

        df = sparkSession.sql(sql)
        print('Результат получен из базы')
        
        pddf = df.toPandas()
        print('Результат переведен в Pandas')
        
        if len(pddf)>0:            
            pddf.to_excel(new_file + '.xlsx')
            print(str(l) + ': Выгружено from '+str(new_file) )
        
f.close()

После отработки скрипта файлы с данными будут находиться в текущей директории. По окончании работы необходимо выполнить следующую команду, чтобы освободить ресурсы кластера.

sc.stop()

Таким образом, мы автоматизировали выгрузку данных для случая, когда необходимо по ИНН выгрузить определенную информацию из списка таблиц, например, рейтинги заемщика. С помощью скрипта на Python можно узнать все значения рейтинга из таблиц базы данных, не прибегая к перечислению точных названий колонок.