Время прочтения: 4 мин.
Инструменты process mining набрали большую популярность как в среде разработчиков, так и в бизнес-среде. При анализе бизнес-процесса можно воспользоваться готовыми инструментами, такими как Disco, ProM, однако их функционала не всегда хватает для полноценной и разносторонней аналитики, например, для поиска узких мест в процессе. В такой ситуации на помощь аналитикам приходит язык программирования python и библиотека pm4py.
В библиотеке pm4py не предусмотрено инструментов для поиска узких мест в логах, однако она нам понадобится для импортирования логов процесса, который мы хотим проанализировать.
Файлы логов обычно представлены в трех форматах: «.xes», «.csv», «.log». Для повышения универсальности нашего инструмента предусмотрим возможность работы с любым из широко применяемых форматов.
Поиск узких мест в логе будет осуществлять внутри DataFrame – табличного формата данных. Чтобы преобразовывать данные из формата, который не предполагает прямой импорт в DataFrame, сделаем класс-обработчик входных данных:
from pm4py.algo.discovery.dfg import algorithm as dfg_fac
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.objects.log.util import dataframe_utils
import pandas as pd
import numpy as np
class Transformer:
def __init__(self, path_to_file):
self.path = path_to_file
self.log = None
def get_from_xes(self):
self.log = xes_importer.apply(self.path)
self.dataframe = log_converter.apply(self.log, variant=log_converter.Variants.TO_DATA_FRAME)
return self.dataframe
def get_from_csv(self):
self.dataframe = pd.read_csv(self.path)
self.dataframe = dataframe_utils.convert_timestamp_columns_in_df(self.dataframe)
self.log = log_converter.apply(self.dataframe)
return self.dataframe
def get_PD_from_log(self):
df = pd.read_fwf(self.path)
cols = df.columns.tolist()
cols = cols[-1:] + cols[:-1]
self.dataframe = df[cols]
self.log = log_converter.apply(self.log, variant=log_converter.Variants.TO_EVENT_LOG)
return self.dataframe
Теперь мы можем загружать в нашу программу файлы всех известных расширений для файлов логов процессов.
При поиске узких мест в процессах существует несколько подходов. Одним из них является следующий:
1) Выделяем в нашем процессе самые продолжительные экземпляры (кол-во таких процессов выбирается произвольно, обычно это 5-10% от общего числа кейсов).
2) Определяем переходы, средние длительности которых занимают больше всего времени
Для того, чтобы реализовать данный подход, необходимо создать пары id кейса — продолжительность кейса.
t = Transformer('/content/drive/MyDrive/tmp_df.csv')
t.get_from_csv()
cases = np.unique(t.dataframe['case:concept:name'])
avarage_duraion = []
for case in cases:
tmp_df = t.dataframe[t.dataframe['case:concept:name']==case]
avarage_duraion.append((case, max(tmp_df['time:timestamp']-min(tmp_df['time:timestamp']))))
avarage_duraion = sorted(avarage_duraion, key=lambda tup: tup[1], reverse=True)
bottle_cases = avarage_duraion[:round(len(avarage_duraion)/10)]
После того, как мы отфильтровали кейсы по продолжительности, необходимо посчитать медианные длительности для всех переходов в отобранных кейсах. Для этого отфильтруем наш исходный DataFrame.
cases = list(map(lambda x: x[0], bottle_cases))
tmp_df = t.dataframe[t.dataframe['case:concept:name'].isin(cases)]
tmp_df.index = [i for i in range(tmp_df.shape[0])]
Затем создадим словарь, ключами для которого будут переходы, а значениями суммарное время всех экземпляров таких переходов. Потом это время заменится на среднее время перехода:
duration_dict = dict()
for i in range(tmp_df.shape[0]-1):
edge = (tmp_df['concept:name'][i], tmp_df['concept:name'][i+1])
duration = tmp_df['time:timestamp'][i+1] - tmp_df['time:timestamp'][i]
if edge not in duration_dict:
duration_dict[edge] = [duration]
else:
duration_dict[edge].append(duration)
for key in duration_dict.keys():
duration_dict[key] = np.mean(duration_dict[key])
Теперь можем выбрать наиболее продолжительные переходы:
edge_tuple = []
for key in duration_dict.keys():
edge_tuple.append((key, duration_dict[key]))
edge_tuple = sorted(edge_tuple, key=lambda tup: tup[1], reverse=True)
edge_tuple[:round(len(edge_tuple)/10)]
Вот что получилось в итоге:

Второй способ заключается в оценке медианного времени, которое занимают переходы с учетом их количества, совершенных по данной ветке. Для реализации этого подхода создадим словарь, где ключ — это переход, а значением является массив времени переходов.
duration_dict = dict()
for i in range(t.dataframe.shape[0]-1):
edge = (t.dataframe['concept:name'][i], t.dataframe['concept:name'][i+1])
duration = t.dataframe['time:timestamp'][i+1] - t.dataframe['time:timestamp'][i]
if edge not in duration_dict:
duration_dict[edge] = [duration]
else:
duration_dict[edge].append(duration)
for key in duration_dict.keys():
duration_dict[key] = (np.mean(duration_dict[key]).days*24*3600+np.mean(duration_dict[key]).seconds)*len(duration_dict[key])
edge_tuple = []
for key in duration_dict.keys():
edge_tuple.append((key, duration_dict[key]))
edge_tuple = sorted(edge_tuple, key=lambda tup: tup[1], reverse=True)
edge_tuple[:round(len(edge_tuple)/10)]
В результате получим вот такой список

Он отличается от списка, который получился в первом случае, но тем не менее есть пересечения, на основе которых можно сделать выводы.
В итоге мы получили два различных инструмента для поиска узких мест в процессах. Инструменты являются гибкими и настраиваемыми, что позволяет применять их с различными логами процессов.
Еще больше информации о Process Mining можно найти и обсудить в канале #process_mining сообщества ODS.AI в slack