Анализ процессов, Графы

Простой Graph Mining на PySpark

Время прочтения: 6 мин.

В практике внутреннего аудита встречаются задачи, в которых необходимо выявить сомнительные операции по перечислению денежных средств между счетами. В данной статье поэтапно рассмотрим, как происходит оборот средств на синтетических данных.

Применение Spark для анализа данных подразумевает большой объем данных,  но для наглядности будем использовать небольшой набор данных, который состоит из 4 столбцов и 712 строк. Полный код и вспомогательные  материалы можно найти на github странице.

Анализ данных будем проводить в два этапа, а именно, получение статистик и визуализация. В качестве статистик извлечем временной интервал (медианное время между транзакциями для каждого отправителя), а также частота и сумма между отправителем и получателем перевода. Визуализацию будем осуществлять с помощью graphviz, т.к. на данный момент для PySpark нет полноценного инструмента (opensource)  для визуализации графов.

В первую очередь запустим спарк сессию и импортируем необходимые модули:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("local").setAppName("Graph")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)

from pyspark.sql.types import StructType, StringType, FloatType
import pyspark.sql.functions as f
from pyspark.sql import Window
from graphviz import Digraph
import pandas as pd

Далее загрузим данные для анализа. Для корректной загрузки опишем схему данных с помощью модуля types, в котором с помощью метода StructType укажем структуру полей (имя столбца, тип данных в столбце):

schema = StructType([
        StructField('Payer', StringType()),
        StructField('Recipient', StringType()),
        StructField('Amount', FloatType()),
        StructField('Date', StringType()),
                    ])

Загружаем данные с помощью модуля  spark.read.csv, в котором указываем имя файла, схему и вид разделителя столбцов. В данном случае данные будут представлены в виде объекта DataFrame:

df = spark.read.csv('data.csv',schema,';')

Для работы со столбцами используется модуль

pyspark.sql.functions

в котором реализованы различные методы для преобразования данных. С помощью метода to_timestamp преобразуем строковый столбец ‘Date’ для  корректного представления временной метки:

df = df.withColumn('Date', f.to_timestamp(f.col('Date'), format="yyyy-MM-dd HH:mm:ss"))

Для корректного парсинга временной метки необходимо указывать параметр ‘format’, в котором указывается последовательность даты и времени, а также разделители между ними. Преобразование осуществляется с помощью метода withColumn объекта DataFrame, который первым параметром принимает имя для преобразованного столбца. В данном примере указываем идентичное имя, таким образом, перезаписывается существующий столбец ‘Date’, иначе, создается новый столбец,  с преобразованными данными. Вторым параметром метода withColumn является выражение, т.е. действие над столбцом, в данном примере это to_timestamp. В результате получаем таблицу следующего вида:

Приступим к подсчету статистик. Первым делом необходимо найти разность времени между транзакциями для каждой пары отправитель-получатель. Для этого создадим новый столбец  даты со сдвигом на одну временную метку вниз. Данную операцию необходимо проводить отдельно для каждого отправителя – получателя, в этом нам поможет метод Window:

w = Window.partitionBy(["Payer", "Recipient"]).orderBy("Date")
lag_expr = f.lag(df["Date"]).over(w)
df = df.withColumn("prev_time", lag_expr)

Метод Window позволяет проводить обработку данных относительно заданного окна данных, в данном примере этим окном является группировка данных по столбцам  «Payer» и «Recipient», с последующей сортировкой по столбцу «Date». Далее зададим выражение lag_expr , которое будет применяться для каждого окна, после чего создаем новый столбец «prev_time», с заданным выражением:

Null значения в столбце «prev_time» говорят о том, что данная транзакция для пары отправитель-получатель является первой или единственной. Далее найдем разность между столбцами «Date» и «prev_time»:

ddiff_expr = f.datediff(f.col("Date"), f.col("prev_time"))
w_o_expr = f.when(f.isnull(ddiff_expr), 0).otherwise(ddiff_expr)
df = df.withColumn("diff_in_day", w_o_expr)

Выражение ddiff_expr находит разность между двумя датами в днях. Выражение w_o_expr заменяет null значения на 0. Осталось посчитать необходимую статистику (минимальное, среднее, медианное или максимальное время) между переводами для каждой пары отправитель-получатель. Можно воспользоваться готовым выражением

f.min(),f.avg(), f.max()

Но для расчета медианного значения необходимо написать выражение в виде sql-запроса:

magic_percentile = f.expr('percentile_approx(diff_in_day, 0.5)').alias('med_in_day')
med_val =  df.groupby(["Payer", "Recipient"]).agg(magic_percentile)

В методе expr вызываем ‘percentile_approx’ запрос, который рассчитывает заданный перцентиль, в нашем случае 0.5, что соответствует медиане. Таким образом, получаем таблицу med_val с медианным временем между переводами для каждой пары отправитель-получатель, где 0 говорит о том, что медианное время менее 1 дня:

Далее рассчитаем количество и сумму переводов для каждой пары отправитель-получатель:

count_tr_expr = f.count("Amount").alias("freq")
sum_tr_expr = f.round(f.sum("Amount"),0).alias("summa")
df_t = df.groupby(["Payer", "Recipient"]).agg(count_tr_expr, sum_tr_expr)

Также рассчитаем процент суммы перевода получателю от общей суммы, которую отправитель перевел за всю историю данных:

ttl_sum_expr = f.sum("summa").alias("payer_total_sum")
df_total_sum = df_t.groupby("Payer").agg(ttl_sum_expr)
df_t = df_t.join(df_total_sum, on="Payer")
pcnt_ttl_sum_expr = f.round((f.col("summa")/f.col("payer_total_sum"))*100, 2)
df_t = df_t.withColumn("%_of_total_sum", pcnt_ttl_sum_expr)

Код, описанный выше, реализован в модуле. Для предобработки данных нужно лишь вызвать класс  Graph_miner:

init_graph = Graph_miner(df, "Payer", "Recipient", "Amount", "Date")

В методе указываем DataFrame, в котором уже преобразовали столбец с датой, а также имена столбцов с отправителем, получателем, суммой перевода и датой.

После вызова

init_graph. result. head (8)

получим следующий результат:

Осталось отобразить полученный результат. Также в модуле реализован класс Painter, с помощью которого отобразим наши данные:

paint = Painter(init_graph)
paint.draw(filename = "start_graph")

Результатом выполнения будет pdf-файл, в котором отображены все обработанные данные. Далее можно детализировать выборку и убрать шумовые данные.  Например, убрать те вершины, которые имеют только одного отправителя, сделать это можно с помощью параметра r_count_tresh  в методе  filtering_df:

paint.filtering_df(r_count_tresh=1)
paint.draw(filename = "r_count_tresh_1")

Картина стала проще. Далее отфильтруем те ребра, в которых общая сумма переводов меньше 3000:

paint.filtering_df(sum_tresh = 3000, r_count_tresh=1)
paint.draw(filename = "sum_tresh_3000_r_count_tresh_1")

В результирующем графе остались наиболее весомые ребра, которые можно рассмотреть детальнее. Информация на ребрах приобрела читабельный вид, рассмотрим подробнее. Для примера возьмем ребро

account_155àaccount_7 (f-5 d-3 s-11700 p-18.28)

Граф является ориентированным. В данном случае отправитель — account_155, получатель — account_7. Всего было осуществлено 5 переводов (f-5). Медианное время между переводами 3 дня (d-3). Общая сумма переводов за 5 раз составляет 11700 рублей (s-11700).  11700 составляет 18.28 % от общей суммы, которую отправил account_155 всем получателям (p-18.28).

Напоследок отрисуем отдельный связный граф, в который входит конкретная вершина, например account_96:

paint.filtering_df(sum_tresh = 3000, r_count_tresh=1, acc_name="account_96")
paint.draw(filename = "sum_tresh_3000_r_count_tresh_1_name_account_96")

В отдельный файл сохранился граф, в котором все вершины имеют связь с вершиной account_96.

Таким образом, провели анализ данных на предмет выявления закономерностей движения средств между счетами. Полный пример с кодом можно найти в тетрадке.

Советуем почитать