Время прочтения: 9 мин.

Всем привет! Сначала хотел назвать статью «Graph mining в 1 строчку кода», но это настолько неправдоподобно, что хочется дописать «без регистрации и СМС». Этого я делать, конечно же, не буду, поэтому решил немного переиначить смысл статьи, а заодно затронуть алгоритмы такого пугающего на первый взгляд слова как «графы». Итак, в настоящей статье хочу поделиться инструментом, помогающим выявлять связи между проверяемыми сущностями, но расскажу про него не с точки зрения графового анализа данных, а с точки зрения алгоритма действия и его реализации на Python без использования специализированных библиотек. Согласитесь, реализовывать алгоритмы самостоятельно, пусть даже несложные – полезная практика, и порой для глубокого понимания решения задачи недостаточно использовать питоновский import something и брать готовый метод – куда полезнее понимать на уровне кода, как это работает. Об этом и поговорим ниже. Сразу отмечу, что упора на код не делаю, здесь мы ставим задачу понимания сути решения. Поэтому там, где код неоптимален, скорее всего это сделано в угоду пониманию происходящего. Тем не менее, предложения по оптимизации приветствуются – нет предела совершенству. Также хочу остановиться на упомянутом выше слове «сущности» — я не нашел всеобъемлющего слова, куда бы попали связи между клиентами, организациями, сетевыми устройствами, ну и так далее.

Итак, дано: датасет операций между физическими лицами (ФЛ), выглядит так:

Фиолетовым цветом выделил пример связи, которую мы хотим выявить.

Задача: установить связи между ФЛ, то есть построить цепочки взаимодействия типа: ФЛ1 перечислял ФЛ2, ФЛ2 перечислял ФЛ3, ФЛ3 перечислял ФЛ4, и так далее. То есть на выходе надо получить такую картину:

Поясняю: здесь те же наши клиенты, что и на скрине исходных данных, но выстроенные в цепочку связей, то есть между Людмилой и Натальей был совершен платеж, в свою очередь Наталья финансово хоть раз была связана с Яной, а Яна переводила Светлане.

В качестве инструментов возьму классический Python и его модуль Pandas. Да, в блоке импорта у меня будет одна строчка: import pandas as pd. Перейдем непосредственно к коду:

Считаем данные и посмотрим на них:

df = pd.read_excel(r'D:\file_graph_mining.xlsx')

Нам из пула данных нужны только 2 столбца, причем в полученном датасете из двух столбцов не должно быть одинаковых строк:

df = df[['client_a', 'client_b']].drop_duplicates()

Приступаем к написанию алгоритма. Итак, в моей реализации это выглядит так: мы берем исходный датафрейм и циклично добавляем к нему самого себя, но по определенной связке: каждый раз ищем в атрибуте client_a_XX тех, кто есть в атрибуте client_b_XX, где ХХ – номер (итератор) цикла. Повторение – мать учения, поэтому на примере первой картинки, пока очищенной от лишних связей:

Мы нашли связь Людмила + Наталья, дальше мы ищем Наталью в столбце с Людмилой: вуаля – она нашлась в связке Наталья + Яна, дальше ищем и находим Яну. Ок, возвращаемся к коду. Для того чтобы данные соединять с этими же данными – создадим 2 одинаковых пула: исходный, на который будем накручивать его копии, и саму неизменяемую копию, на каждой итерации которую будем соединять с постоянно растущим исходным датафреймом.

Итак, вот создание копии:

merged_df = df #где merged_df, исходный, а df – копия, которую будем «мерджить»

Дальше создадим несколько вспомогательных переменных.

Первая – names_list — список названий столбцов. Его конечно можно получить, например, с помощью

[x for x in merged_df.columns.values]

Но нам нужны и будущие названия атрибутов, которые еще не существуют. Поэтому names_list у нас выглядит так:

names_list = ['client_a', 'client_b', 'client_a_1', 'client_b_1', 'client_a_2', 'client_b_2', 'client_a_3', 'client_b_3']

То есть для каждой пары client_a / client_b создаем их «дочерние» client_a_1 / client_b_1, client_a_2 / client_b_2 и тд.

Здесь, во-первых, для примера умышленно ограничимся 3 итерациями, а во-вторых, создание такого списка с названиями атрибутов легко автоматизируется, код приводить не буду.

На основе первой вспомогательной переменной создадим еще одну, она понадобится для условия выборки данных в дальнейшем:

names_list_index = [i for i in range(len(names_list)) if i%2]
names_list_index

Третья вспомогательная переменная – i:

i = 2

Это отправная точка для переходов между атрибутами, то есть:

for k in range(1,2):
    print(  names_list[i]
          , names_list[i+1]
          , names_list[i-1]
          , names_list[i]
          , sep='\n')

Вернет

И это нам позволит перемещаться между атрибутами на каждой итерации. Другими словами, i – это значение «индекса», меняя которое будем задавать нужные в конкретной итерации цикла атрибуты. Согласен, не объяснил, но дальше станет яснее.

Собственно, сама обработка и построение связей реализуется так:

for j in range(1,4):
    merged_df = merged_df\
                    .merge(df.set_axis([names_list[i], names_list[i+1]], axis=1), how='inner', left_on=names_list[i-1], right_on=names_list[i])\
                    .fillna('')\
                    .query(f'{names_list[i+1]} not in {[names_list[x-1] for x in names_list_index][:j]}')
    print(names_list[i+1], [names_list[x-1] for x in names_list_index][:j])
    i = i + 2
print(merged_df.shape)

Что мы здесь делаем:

Создаем цикл из 2 итераций, как говорили ранее (range(1,3)). Здесь можно позволить задавать число итераций автоматически, то есть что-то типа «пока последний атрибут датафрейма merged_df непустой» — через конструкцию while;

Соединяем исходный датафрейм (merged_df) с копией исходного (df) таким образом, что:

  1. Первое: меняем названия атрибутов присоединяемой копии (df) через конструкцию df.set_axis([names_list[i], names_list[i+1]], axis=1);
  2. Второе: присоединяем копию с уже переименованными атрибутами в ней, вот так:

Где красной стрелкой выделена связь, по которой происходит соединение фреймов данных. Обратите внимание, что в качестве ключей для связи у меня указаны не константы, а значения, которые меняются в зависимости от итерации цикла (это я про left_on и right_on).

Вернемся к коду: .fillna(‘’) убережет нас от NaN при типе соединения “left join”.

Отдельно остановимся на строке условия .query(…). Есть одно важное замечание. Без этого условия код в некоторых случаях возвращает вот такие повторения:

То есть Людмила переводила Наталье, Наталья переводила Людмиле, а Людмила снова Наталье. И это при том, что связки у нас уникальные, а значит ошибочно будет предполагать, что это 3 реальных перевода. Избавляемся от этого изъяна. Логика следующая – на каждой итерации присоединения поля, которые не являются ключами в объединении, не должны быть равны. То есть (простите за мой SQL):

На 1 итерации: client_b_1 not in (‘client_a’)

На 2 итерации: client_b_2 not in (‘client_a’, ‘client_a_1’)

На 3 итерации: client_b_3 not in (‘client_a’, ‘client_a_1’, ‘client_a_2’)

На 3 итерации: client_b_4 not in (‘client_a’, ‘client_a_1’, ‘client_a_2’, ‘client_a_3’)

На первый взгляд кажется, что эти условия легко поддаются автоматизации на каждой итерации. Да – поддаются, нет – не легко. Посмотрите на предыдущую картинку, но с указанием индексов атрибутов:

А теперь попробуем по ним пройтись. Для начала просто переберем значения нашего списка names_list:

[x for x in names_list]

Хорошо, теперь переберем индексы нашего names_list:

[names_list.index(x) for x in names_list]

Ок, теперь переберем индексы списка, но вернем значения: [names_list[x] for x in range(8)]

Неплохо. Возвращаемся к требуемому условию – нам надо получить такой динамический список, который на каждой следующей итерации добавлял один атрибут в условие. Что-то типа:

for i in range(0, len(names_list)+1): print(i, names_list[:i])

Но возвращать не все значения, а согласно sql-подобному условию выше, то есть:

Здесь я убрал лишние итерации, красным и синим прямоугольником выделил атрибуты, участвующие в условии на каждой итерации. То есть значение из красного не должно равняться значениям в синих. Также здесь можно сделать вывод, что на каждой итерации нас интересуют те атрибуты, индексы которых делятся нацело на 2. Для понимания сути ниже я привел вывод результата в соответствии с sql-подобным условием:

names_list_index = [i for i in range(len(names_list)) if i%2]
for k in range(len(names_list_index)):
print([names_list[x] for x in names_list_index][:k+1][-1:], 'NOT IN (' + str([names_list[x-1] for x in names_list_index][:k]) + ')')

Как раз то что нам надо!

И последняя строчка цикла i = i + 1. Это сделано для динамической адресации к атрибутам ячейки в рамках цикла. То есть, на примере куска кода:

left_on=names_list[i-1], right_on=names_list[i]

На первой итерации цикла при i = 2 имеем (внимание, итерация по индексам списка — с нуля):

  1. Ключ исходного датасете names_list[i-1], или names_list[2-1], или атрибут «client_b», то есть первый индекс списка names_list;
  2. Ключ присоединяемого датасета names_list[i], или names_list[2], или атрибут «client_a_1», то есть второй индекс списка names_list.

Таким образом получили итоговый датафрейм merged_df, который, если убрать пары атрибутов на каждой итерации, а оставить только связующие сущности (участники цепочки), будет выглядеть так:

merged_df[['client_a', 'client_b_1', 'client_b_2', 'client_b_3']]

Наконец, алгоритм поиска связей реализован. Дальше можно выгружать его, допустим в Excel, и накручивать дополнительные другие атрибуты, требуемые для анализа. Дополнительно отмечу, что данный подход (получение графа в виде таблицы, а не его визуализация) многократно снижает время обработки исходного датасета, а в некоторых случаях таблица – чуть ли не единственный выход для, например, анализа такого графа:

Это еще не все. Для тех, кто хочет сохранить последовательность транзакций для выстраивания, например, пути перемещения денежных средств, а не просто связей, можно добавить атрибут с датой совершения транзакции и немного изменить присоединяемый на каждой итерации датафрейм с помощью слайсов (slices) и некоторого условия по дате. Кому нужна частота транзакций – можно добавить атрибут с количеством переводов, но тогда не нужно избавляться от дубликатов связей, что мы делали на этапе подготовки данных, а сгруппировать данные. Также отмечу, что данная логика прекрасно реализуется и на SQL – вы можете использовать либо несколько конструкций JOIN, либо конструкцию с циклом, либо вовсе написать рекурсивный запрос. Это полезно в случае, если исходный датасет при загрузке через pd.read_excel вызовет memory error.