Python, Анализ данных

Анализ Терабайта

Время прочтения: 5 мин.

Главная сложность проведения анализа качества услуг на устройствах самообслуживания заключалась в необходимости обработать огромный массив данных (более 1 терабайта). Для выполнения поставленной задачи мы написали инструмент на основе языка программирования Python, который помог нам провести обработку данных в короткие сроки (чуть более 3-х дней) и выявить причины возникновения недовольства клиентов, об этом инструменте мы и расскажем в данной статье.

Под анализом Big Data понимается анализ больших массивов данных в рамках возможностей ПК и систем управления базами данных, поэтому при формировании информации возникают определенные трудности, которые заключаются в необходимости обеспечения скоординированной работы компьютерных программ на десятках серверов.

Наш анализ данных начинается с выгрузки с файлового сервера архива записей сессий устройств самообслуживания во временную папку ПК, в которой в первую очередь будет проведён отбор записей по определённой категории, а затем будет проанализирован пул отфильтрованных данных и результаты анализа будут сохранены в базе данных.

Для ускорения процесса анализа можно запускать процессы параллельно на различных ПК. Для того, чтобы не анализировать одну и ту же информацию дважды, стоит определиться с условиями, по которым каждый ПК будет запрашивать данные с файлового сервера, в нашем коде это временные интервалы.

Данную задачу можно решить разными языками программирования, но нами был выбран Python и дистрибутив Anaconda.

Для экономии времени берем готовые библиотеки, встроенные в дистрибутив Anaconda, помогающие решить общий тип задач, такие как работа с базами данных, архивами, логированием и т.д.

Импортируем библиотеки, которые понадобятся для анализа:

from datetime import datetime
from zipfile import ZipFile
from logging import getLogger, FileHandler, Formatter, INFO
from multiprocessing import Process, current_process
from sqlalchemy import Column, Integer, String
from config import engin, session, Base, time_f, lefty, righty

Для сбора статистики и упрощения структуры кода необходимо создать класс для хранения временных данных, которые будем постоянно использовать, на всех этапах работы инструмента. В поле success будет записываться количество сессий, проводимых клиентами с состоянием «Успешно» за анализируемый период, в поле errors – количество сессий с состоянием «Ошибка», в поле others – количество сессий, которые не завершились успешно и не закончились ошибкой – с состоянием «Другие».

class Status:
    success = 0
    errors = 0 
    others = 0
    currentFile = 0

Создаем служебный класс, который необходим встроенному модулю sqlalchemy для загрузки в базу данных. Каждый экземпляр данного класса будет являться отдельной строкой в базе данных.

class DataContext(Base):
    __tablename__ = "имя_таблицы"
    file_name = Column("Имя файла", String, primary_key = True)
    success = Column("Успешно", Integer)
    fail = Column("Ошибка", Integer)
    other = Column("Другие", Integer)

В папке файлового сервера, который является источником данных, ищем файлы с необходимой информацией, которые были загружены в нужный промежуток времени, и переносим данные во временную папку, для дальнейшей разархивации и анализа полученной информации.

def find_all_files_between_lr(xdir, left, right, temp_dir):
    for xroot, xdirs, xfiles in walk(xdir):
        files = [join(xroot, xname) for xname in xfiles if stat(join(xroot, xname)).st_mtime >= left and stat(join(xroot, xname)).st_mtime <= right]
        for file in files:
            Status.currentFile += 1
            with ZipFile(file) as f:
                f.extractall(temp_dir)
            for prj in find_files(temp_dir):
			… // функция разделения файла на конкретные сессии, зависит от внутренних особенностей файла
            delete_all_content(temp_dir)
 … //функция записи статистики в лог-файл

Во временной папке конкретного ПК, где находится необходимая информация, необходимо провести разделение всего файла на отдельные сессии, которые в дальнейшем будут отдельно обработаны.

Так как соединение с базой данной не всегда стабильно, в случае проблем с каждого ПК можно собрать Log-файлы, обработать и загрузить их самостоятельно. Поэтому данные, полученные при обработке файла, дублируются в Log-файл и базу данных, для удобства и быстроты дальнейшего анализа.

Главная функция, где создается Log-файл

def main(left, right):
    left = mktime(datetime.strptime(left, time_f).timetuple())
    right = mktime(datetime.strptime(right, time_f).timetuple())
    uid = current_process().pid
    temp_dir = f"prjs_{uid}/"
    log_name = f"samo_{uid}.log"
    
    log_fmt = '%(asctime)s - %(message)s'
    Status.lg = getLogger(__name__)
    Status.lg.setLevel(INFO)
    fh = FileHandler(log_name)
    fh.setFormatter(Formatter(log_fmt))
    Status.lg.addHandler(fh)
    Status.lg.info(f"обработка файлов c {lefty} по {righty} Номер файла|Имя файла|Успешно в этом файле|Ошибка в этом файле|Другие в этом файле|Всего успешно|Всего ошибок|Всего остальных")
    sidr = r"путь_до_папки_где_ищем_файлы"
    tbs = список_с_необходимыми_папками_в_sdir
    for t in tbs:
        Status.t = t
        sidry = r"{}\{}".format(sidr, t)
        find_all_files_between_lr(sidry, left, right, temp_dir)

Функция делит промежуток времени на равные фрагменты, для упрощения и быстроты обработки информации.

def time_splitter (left, right, c):
    left = mktime(datetime.strptime(left, time_f).timetuple())
    right = mktime(datetime.strptime(right, time_f).timetuple())
    delta = right - left
    slicy = delta / c
    res = []
    for i in range(c):
        next_left = slicy + left + 1
        res.append((left, next_left))
        left = next_left
    return res

Даем каждому процессу промежуток времени, с которым он будет работать и запускаем процессы, количество процессов выбираем исходя из количества потоков CPU.

if __name__ == "__main__":
    times = time_splitter(lefty, righty, cores)
    jobs = []
    for part_time in times:
        p = Process(target=main, args=part_time)
        jobs.append(p)
    
    for job in jobs:
        job.start()

Особенность данного кода заключается в том, что на одном рабочем месте может параллельно вычисляться столько потоков данных, сколько позволяет процессорная мощность ПК. Помимо этого, Log-файлы централизованно загружаются в базу данных для удобной обработки.

Преимущество данного кода в том, что его можно использовать одновременно на нескольких ПК, при этом не изменяя логику обработки данных, а только изменив файл настройки инструмента, задавая крайние значения временного интервала lefty и righty для каждого ПК, на котором будет проводиться анализ. Изменив данные параметры, можно регулировать временной интервал требуемой выгрузки.

from sqlalchemy import create_engine, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SCHEMA_NAME = 'DBO'
engine = create_engine('postgresql://******:*****@1.1.1.1/database')
Base = declarative_base(metadata=MetaData(schema=SCHEMA_NAME))
Session = sessionmaker(bind=engin)
session = Session()
timeFormat = '%S:%M:%H_%d.%m.%Y'
lefty = '00:00:00_00.02.2020'
righty = '00:00:00_29.02.2020'

Сотрудники с использованием результата выполнения данного кода в кратчайшие сроки смогли определить, что клиенты часто не могут воспользоваться услугами на устройствах самообслуживания из-за технических проблем, которые не были вовремя устранены ответственными подразделениями.

В дальнейшем данный код может использоваться ИТ-специалистами для написания инструментов обработки Big Data.

Советуем почитать