Время прочтения: 4 мин.

В данной статье будет рассказано, как можно на Hadoop проанализировать и обработать лог, рассчитать граф и отрисовать его на локальном компьютере, используя Graphviz.

Для начала нужно загрузить данные на HDFS. Самый простой способ — копирование файлов через веб-интерфейс файлового менеджера в Hue. Веб-интерфейс интуитивно понятен и расположен по адресу http://[Hue_node]:8888/filebrowser/. Однако, если размер файла от нескольких гигабайт, то таким способом данные скорее всего могут не загрузиться. В таком случае можно рекомендовать использовать WinSCP.

Когда файлы будут загружены с помощью WinSCP, необходимо выполнить команду для их перемещения в HDFS:

hadoop fs -put /путь_куда_загружен_файл /путь_куда_переместить_файл

Если планируется загружать данные в таблицы и работать с HiveQL запросами, то каждый файл при перемещении на HDFS нужно размещать в отдельной директории.

Прежде, чем рассчитывать и отрисовывать граф, нужно проанализировать лог. По моему опыту удобнее проводить анализ лога в Hive. С помощью HiveQL запросов можно ознакомиться с данными: узнать какие столбцы есть в логе, найти и удалить технические события. Итоговый лог лучше загрузить в новую таблицу и затем сохранить в файл.

Примеры HiveQL-запросов:

create schema bdp location 'hdfs://pathdirectory';
use bdp; 
create external table if not exists bdp.logfull
(TRANSACTIONLOGNO bigint, USERNAME string, TRANSACTIONDATE timestamp, SESSIONNO bigint, BANKACCOUNTNO bigint, AUDITMESSAGE string, CUSTOMERNO bigint, APPLICATIONNO bigint, BANKPRODUCTNO bigint, MODBANKACCOUNTNO bigint, BANKACCOUNTROLENO bigint, AUTHENTICATIONNO bigint)
row format delimited
fields terminated by ';'
stored as textfile
location 'hdfs://pathdirectory'
tblproperties("serialization.encoding"="windows-1251", "skip.header.line.count"="1", "transactional"="true");
 select * from bdp.logfull limit 1000;
 select count(*) from bdp.logfull;
 create external table if not exists bdp.logfileclear
(TRANSACTIONLOGNO bigint, USERNAME string, TRANSACTIONDATE timestamp, AUDITMESSAGE string, CUSTOMERNO bigint, APPLICATIONNO bigint, BANKPRODUCTNO bigint)
insert into bdp.logfileclear
select TRANSACTIONLOGNO, USERNAME, TRANSACTIONDATE, AUDITMESSAGE, CUSTOMERNO, APPLICATIONNO, BANKPRODUCTNO from bdp.logfull
where (username!='' and username not like '%admin%' and username not like '%importAdmin%')
and (auditmessage!='');
 
insert overwright local directory '/pathdirectory' row format delimited fields terminated by ';' select * from bdp.logfileclear

Когда лог проанализирован и очищен, можно приступать к расчету графа. На данном этапе можно использовать PySpark. Код для подключения в JupyterHub к PySpark приведен ниже:

import os
import sys
spark_home='/opt/cloudera/parcels/SPARK2/lib/spark2'
os.environ['SPARK_HOME']=spark_home
os.environ['PYSPARK_DRIVER_PYTHON']='/opt/venvs/anaconda/bin/python'
os.environ['PYSPARK_PYTHON']='/opt/venv/anaconda/bin/python'
sys.path.insert(0, os.path.join (spark_home, 'python'))
sys.path.insert(0, os.path.join (spark_home, 'python/lib/py4j-0.10.7-src.zip'))
from pyspark import SparkContext,SparkConf, HiveContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import * 
conf=SparkConf().setAppName('test').\
    setMaster("yarn-client").\
	set('spark.local.dir', 'sparktmp').\
	set('spark.executor.memory', '3g').\
    set('spark.driver.maxResultSize', '5g').\
    set('spark.executor.instances', '6').\
	set('spark.port.maxRetries', '150').\
	set('spark.executor.cores', '10').\
    set('spark.dynamicAllocation.enabled', 'false')
sc=SparkContext.getOrCreate(conf=conf)
sc

Чтобы узнать актуальные директории для подключения, нужно выполнить следующие команды:

!ls -la /opt/cloudera/parcels/
!ls -la /opt/cloudera/parcels/SPARK2/lib/spark2/python/lib 

Данные из файла загружаем в RDD.

RDD — основная концепция Spark

Ключ к пониманию Spark — это RDD: Resilient Distributed Dataset. По сути это надежная распределенная таблица (на самом деле RDD содержит произвольную коллекцию, но удобнее всего работать с кортежами, как в реляционной таблице). RDD может быть полностью виртуальной и просто знать, как она породилась, чтобы, например, в случае сбоя узла, восстановиться. А может быть и материализована — распределенно, в памяти или на диске (или в памяти с вытеснением на диск). Также, внутри, RDD разбита на партиции — это минимальный объем RDD, который будет обработан каждым рабочим узлом.

Перед расчетом графа нужно определить какие столбцы являются time:timestamp, case:concept:name, case:concept,  с помощью PySpark дату нужно преобразовать к необходимому формату YYYY-MM-DD HH:MI:SS, отсортировать лог по времени. Затем лог передать в PM4PY и произвести расчет с помощью выбранного майнера.

И в завершении статьи небольшой лайфхак 🙂 Предположим, на Hadoop отсутствует возможность установить Graphviz и отрисовать рассчитанный граф можно только на локальном компьютере. Есть простое решение! Рассчитанный граф сохраняется в файл pickle, скачивается на локальный компьютер и затем отрисовывается. Расчет графа и сохранение в pickle-файл:

from pm4py.visualization.heuristics_net import factory as hn_vis_factory
from pm4py.objects.heuristics_net.net import HeuristicsNet
import pickle
def save_object(obj, filename):
	with open(filename, 'wb') as input:
    	pickle.dump(obj, output, pickle.HIGHEST_PROTOCOL)
final_net=HeuristicsNet(result[1])
final_net.calculate()
save_object(final_net, 'result_pkl.pkl')
sc.stop() 

«Отрисовка» графа на компьютере:

 with open('result_pkl.pkl', 'rb') as input:
    	final_net=pickle.load(input)
 gviz=hn_vis_factory.apply(final_net)
hn_vis_factory.view(gviz)

Итак, если у Вас большой лог-файл и необходимо построить граф процесса, то вы не ограничены возможностями вашего компьютера и в обработке больших данных вам может помочь Hadoop.