Время прочтения: 5 мин.

Под системой алертов я подразумеваю написанную программу, которая самостоятельно и через определенные временные промежутки проверяет метрики на предмет аномалий.

Аномалией считается необычное поведение метрики, в результате которого значения последней отличаются от привычных.

Выявлять аномалии можно по-разному. В решении своей задачи я использовал межквартильный размах. Для него необязательно, чтобы данные были распределены нормально. Кроме того, для мониторинга сильных отклонений в режиме реального времени в целом стоит отдавать предпочтение статистическим методам. Они не такие ресурсозатратные как методы машинного обучения, вследствие чего работают быстрее.

Перед написанием скрипта определил список необходимых метрик: количество активных пользователей, просмотров, лайков и CTR. Сам же скрипт получился из нескольких блоков: списка метрик для мониторинга, коннектора к базе данных приложения, алгоритма мониторинга аномалий, функций для отправки алерта. Далее по пунктам.

1. На первом шаге мне понадобился BotFather. Это сервис для настройки и управления ботами в telegram.  При входе в диалог с BotFather в приложении появляется инструкция по использованию и основные команды. Так, через команду /newbot я создал нового бота, ввел логин (название бота) и его юзернейм, получил уникальный токен и id чата. После этого импортировал нужные библиотеки в jupyter notebook.

import telegram
import pandahouse # для доступа в базу
import pandas as pd # для работы с датафреймами
import seaborn as sns # для визуализации
import matplotlib.pyplot as plt # для визуализации
import io # для сохранения изображения в буфере, чтобы не засорять хранилище

from datetime import datetime, date, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import dag, task # декораторы
from airflow.operators.python import get_current_context

2. На втором шаге задал подключение к базе данных.

connection = {'host': 'https://your_address',
                      'database':'database_name',
                      'user':'your_name', 
                      'password':'your_password'
                     }

3. Установил дефолтные параметры, которые прокидываются в таски, а также задал интервал запуска DAG (Directed Acyclic Graph, направленный ациклический граф). Это объект, предоставляемый Airflow, который нужен для того, чтобы задать набор последовательных задач (тасков) для их запуска по определенному расписанию.

default_args = {
    'owner': 'name_of_the_owner',
    'depends_on_past': False, # зависимость от прошлых запусков
    'retries': 2, # количество попыток запуска Dag в случае ошибки
    'retry_delay': timedelta(minutes=5), # время между повторными запусками
    'start_date': datetime(2022, 7, 14), # дата и время начала выполнения Dag
}

# Интервал запуска DAG
schedule_interval = '*/15 * * * *' # каждые 15 минут

4. Далее я ввел функцию для детектирования аномалий. Именно здесь происходит проверка полученного значения. Будет ли последнее считаться отклонением или же соответствовать типичному поведению метрики.  Для этого я построил доверительный интервал, границы которого рассчитал с помощью квартилей. Так, само значение межквартильного размаха представляет собой разницу между третьим и первым квартилем (df[‘iqr’] = df[‘q75’] — df[‘q25’]). Верхняя граница – третий квартиль плюс межкартильный размах, нижняя граница – первый квартиль минус межквартильный размах. Если значение метрики выпадает за эти пределы, то оно считается аномальным. Стоит отметить, что такие метрики, как количество лайков и просмотров, активность пользователей зачастую отличаются особым поведением в течение суток. Например, поздней ночью их значения могут быть низкими, в то время как утром и вечером достигать своего пика. Чтобы сгладить этот эффект и избежать ложных срабатываний, я использовал скользящее среднее.

def check_anomaly(df, metric, a=4, n=5):
    # метод shift для сдвига значений на один период назад для защиты от ошибочных алертов
    # метод rolling для скользящего счета по всему интервалу
    df['q25'] = df[metric].shift(1).rolling(n).quantile(0.25) 
    df['q75'] = df[metric].shift(1).rolling(n).quantile(0.75)
    df['iqr'] = df['q75'] - df['q25'] # кладу в датафрейм значение межквартильного размаха
    df['up'] =  df['q75'] + a*df['iqr'] # кладу в датафрейм значение границ
    df['low'] =  df['q25'] - a*df['iqr']

    # сглаживаю границы графика аномалий и делаю, чтобы периоды просчитывались до конца графика
    df['up'] = df['up'].rolling(n, center=True, min_periods=1).mean()
    df['low'] = df['low'].rolling(n, center=True, min_periods=1).mean()

    # определяю условия для значений самой свежей (последней полученной) метрики (пятнадцатиминутки)
    if df[metric].iloc[-1] < df['low'].iloc[-1] or df[metric].iloc[-1] > df['up'].iloc[-1]:
        is_alert = 1
    else:
        is_alert = 0

    return is_alert, df

5. Наконец, составил сам DAG. В теле сформировал task и создал функцию для запуска системы алертов. Внутри функции задал параметры подключения к телеграмм-боту, написал запрос на SQL для получения значений метрик из базы данных и обозначил необходимую информацию, которая попадет в отчет с случае обнаружения аномалии (текст сообщения, значения метрик и необходимые графики). Скрипт ниже, из-за большого размера, представлен в сокращенном виде с сохранением логики и основной структуры. Полную версию вы можете посмотреть, перейдя по приложенной после статьи ссылке.

@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False)
def dag _alerts_system():
    # Формирую task для запуска системы алертов по ленте новостей 
    @task()
    def run_feed(chat=None):
        chat_id = chat or char_id вашего бота
        bot = telegram.Bot(token='your_token')
        # Обозначаю список метрик
        metrics_list = ['users_feed', 'views', 'likes', 'ctr']
        # Делаю нужный запрос
        query = ''''''
        data = pandahouse.read_clickhouse(query=query, connection=connection)
        # беру цикл чтобы применить межквартильный размах
        for metric in metrics_list:
            df = data[['ts', 'date', 'hm', metric]].copy()
            # к датафрейму применяю алгоритм в функции check_anomaly()
            is_alert, df = check_anomaly(df, metric)
            if is_alert == 1:
                # здесь формирую содержание отчета. Данный раздел у каждого будет свой. Мою полную версию сможете посмтреть по ссылке в конце статьи.
             #текст сообщения и значения метрик
                msg = ...
             #параметры графиков
                plot_object = io.BytesIO()
               
                bot.sendMessage(chat_id=chat_id, text=msg)
                bot.sendPhoto(chat_id=chat_id, photo=plot_object)
        return
    run_feed()
dag _alerts_system = dag _alerts_system()

В результате получил систему алертов, которая автоматически каждые 15 минут проверяет данные на предмет аномалий, и в случае их обнаружения посылает сводный отчет в телеграмм-чат. Рутинные дела автоматизированы. Есть время подумать над следующим улучшением. Буду рад, если мой опыт поможет и вам.

Прилагаю ссылку на код в своем репозитории на github.