Время прочтения: 5 мин.
Главная сложность проведения анализа качества услуг на устройствах самообслуживания заключалась в необходимости обработать огромный массив данных (более 1 терабайта). Для выполнения поставленной задачи мы написали инструмент на основе языка программирования Python, который помог нам провести обработку данных в короткие сроки (чуть более 3-х дней) и выявить причины возникновения недовольства клиентов, об этом инструменте мы и расскажем в данной статье.
Под анализом Big Data понимается анализ больших массивов данных в рамках возможностей ПК и систем управления базами данных, поэтому при формировании информации возникают определенные трудности, которые заключаются в необходимости обеспечения скоординированной работы компьютерных программ на десятках серверов.
Наш анализ данных начинается с выгрузки с файлового сервера архива записей сессий устройств самообслуживания во временную папку ПК, в которой в первую очередь будет проведён отбор записей по определённой категории, а затем будет проанализирован пул отфильтрованных данных и результаты анализа будут сохранены в базе данных.
Для ускорения процесса анализа можно запускать процессы параллельно на различных ПК. Для того, чтобы не анализировать одну и ту же информацию дважды, стоит определиться с условиями, по которым каждый ПК будет запрашивать данные с файлового сервера, в нашем коде это временные интервалы.
Данную задачу можно решить разными языками программирования, но нами был выбран Python и дистрибутив Anaconda.
Для экономии времени берем готовые библиотеки, встроенные в дистрибутив Anaconda, помогающие решить общий тип задач, такие как работа с базами данных, архивами, логированием и т.д.
Импортируем библиотеки, которые понадобятся для анализа:
from datetime import datetime
from zipfile import ZipFile
from logging import getLogger, FileHandler, Formatter, INFO
from multiprocessing import Process, current_process
from sqlalchemy import Column, Integer, String
from config import engin, session, Base, time_f, lefty, righty
Для сбора статистики и упрощения структуры кода необходимо создать класс для хранения временных данных, которые будем постоянно использовать, на всех этапах работы инструмента. В поле success будет записываться количество сессий, проводимых клиентами с состоянием «Успешно» за анализируемый период, в поле errors – количество сессий с состоянием «Ошибка», в поле others – количество сессий, которые не завершились успешно и не закончились ошибкой – с состоянием «Другие».
class Status:
success = 0
errors = 0
others = 0
currentFile = 0
Создаем служебный класс, который необходим встроенному модулю sqlalchemy для загрузки в базу данных. Каждый экземпляр данного класса будет являться отдельной строкой в базе данных.
class DataContext(Base):
__tablename__ = "имя_таблицы"
file_name = Column("Имя файла", String, primary_key = True)
success = Column("Успешно", Integer)
fail = Column("Ошибка", Integer)
other = Column("Другие", Integer)
В папке файлового сервера, который является источником данных, ищем файлы с необходимой информацией, которые были загружены в нужный промежуток времени, и переносим данные во временную папку, для дальнейшей разархивации и анализа полученной информации.
def find_all_files_between_lr(xdir, left, right, temp_dir):
for xroot, xdirs, xfiles in walk(xdir):
files = [join(xroot, xname) for xname in xfiles if stat(join(xroot, xname)).st_mtime >= left and stat(join(xroot, xname)).st_mtime <= right]
for file in files:
Status.currentFile += 1
with ZipFile(file) as f:
f.extractall(temp_dir)
for prj in find_files(temp_dir):
… // функция разделения файла на конкретные сессии, зависит от внутренних особенностей файла
delete_all_content(temp_dir)
… //функция записи статистики в лог-файл
Во временной папке конкретного ПК, где находится необходимая информация, необходимо провести разделение всего файла на отдельные сессии, которые в дальнейшем будут отдельно обработаны.
Так как соединение с базой данной не всегда стабильно, в случае проблем с каждого ПК можно собрать Log-файлы, обработать и загрузить их самостоятельно. Поэтому данные, полученные при обработке файла, дублируются в Log-файл и базу данных, для удобства и быстроты дальнейшего анализа.
Главная функция, где создается Log-файл
def main(left, right):
left = mktime(datetime.strptime(left, time_f).timetuple())
right = mktime(datetime.strptime(right, time_f).timetuple())
uid = current_process().pid
temp_dir = f"prjs_{uid}/"
log_name = f"samo_{uid}.log"
log_fmt = '%(asctime)s - %(message)s'
Status.lg = getLogger(__name__)
Status.lg.setLevel(INFO)
fh = FileHandler(log_name)
fh.setFormatter(Formatter(log_fmt))
Status.lg.addHandler(fh)
Status.lg.info(f"обработка файлов c {lefty} по {righty} Номер файла|Имя файла|Успешно в этом файле|Ошибка в этом файле|Другие в этом файле|Всего успешно|Всего ошибок|Всего остальных")
sidr = r"путь_до_папки_где_ищем_файлы"
tbs = список_с_необходимыми_папками_в_sdir
for t in tbs:
Status.t = t
sidry = r"{}\{}".format(sidr, t)
find_all_files_between_lr(sidry, left, right, temp_dir)
Функция делит промежуток времени на равные фрагменты, для упрощения и быстроты обработки информации.
def time_splitter (left, right, c):
left = mktime(datetime.strptime(left, time_f).timetuple())
right = mktime(datetime.strptime(right, time_f).timetuple())
delta = right - left
slicy = delta / c
res = []
for i in range(c):
next_left = slicy + left + 1
res.append((left, next_left))
left = next_left
return res
Даем каждому процессу промежуток времени, с которым он будет работать и запускаем процессы, количество процессов выбираем исходя из количества потоков CPU.
if __name__ == "__main__":
times = time_splitter(lefty, righty, cores)
jobs = []
for part_time in times:
p = Process(target=main, args=part_time)
jobs.append(p)
for job in jobs:
job.start()
Особенность данного кода заключается в том, что на одном рабочем месте может параллельно вычисляться столько потоков данных, сколько позволяет процессорная мощность ПК. Помимо этого, Log-файлы централизованно загружаются в базу данных для удобной обработки.
Преимущество данного кода в том, что его можно использовать одновременно на нескольких ПК, при этом не изменяя логику обработки данных, а только изменив файл настройки инструмента, задавая крайние значения временного интервала lefty и righty для каждого ПК, на котором будет проводиться анализ. Изменив данные параметры, можно регулировать временной интервал требуемой выгрузки.
from sqlalchemy import create_engine, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SCHEMA_NAME = 'DBO'
engine = create_engine('postgresql://******:*****@1.1.1.1/database')
Base = declarative_base(metadata=MetaData(schema=SCHEMA_NAME))
Session = sessionmaker(bind=engin)
session = Session()
timeFormat = '%S:%M:%H_%d.%m.%Y'
lefty = '00:00:00_00.02.2020'
righty = '00:00:00_29.02.2020'
Сотрудники с использованием результата выполнения данного кода в кратчайшие сроки смогли определить, что клиенты часто не могут воспользоваться услугами на устройствах самообслуживания из-за технических проблем, которые не были вовремя устранены ответственными подразделениями.
В дальнейшем данный код может использоваться ИТ-специалистами для написания инструментов обработки Big Data.