Время прочтения: 11 мин.

В процессе работы над двумя разными проектами перед нами дважды встала одна и та же прикладная задача: рассчитать время простоев. В обоих случаях надо было провести расчет с учетом недельного графика работы. Мы начинали решать задачу с создания локального сценарного решения. Но в какой-то момент стало понятно, что задача расчета требует большого количества усилий и учёта множества нюансов. Тогда мы и задумались: а можно ли сделать что-то изящное (в сравнении с базовыми решениями), что будет применимо к любому (в пределах разумного) графику и что можно будет потом использовать в других проектах? (спойлер: да).

  1. Постановка задачи, частные решения и их проблемы

Оценка потерь является важной прикладной задачей. Один из базовых способов этой оценки выглядит следующим образом. Пусть у нас есть актив, который приносит n рублей в день. Если наш актив выбыл на неделю, потери составят 7n. Тут все просто. Теперь усложним: пусть актив приносит доход только часть суток и не всю неделю, а только в определенные дни; пусть он выбыл на период с А по В и нам необходимы точные расчёты. Получается задача, которая в общем виде выглядит так: сколько рабочего времени прошло между временными отсечками A и B, если рабочие часы заданы графиком X?

Рассмотрим возможные сценарии. Пусть S – это начало рабочего дня, F – его окончание, а точки А и В лежат внутри рабочего дня. Тогда наш алгоритм должен быть составлен с учетом 6 возможных сценариев.

Если точки A и B принадлежат разным дням, то в алгоритм добавляется ещё какое-то количество сценариев.

Когда у нас в рабочем графике есть только точки S и F, а потери времени прекращаются в пределах одних суток, то задача решается просто – потерю времени находят как объект timedelta. Пример такого решения описан в посте на Habr.com «Нахождение длительности временных интервалов в Python». Однако этот алгоритм значительно усложняется, когда отсечек на временной шкале становится много (в пределах одного дня или при потерях времени длительностью более одного дня), а условия включения или исключения интервалов нельзя описать простым правилом.

В этом случае задача базово решается перебором/проверкой всех возможных сценариев с использованием оператора ветвления if-elif-else. В чем тут проблема? Такое решение очень неустойчиво к модификациям исходных условий. Например, если мы добавляем обеды, то длина нашего алгоритма увеличивается примерно в 2 раза. А есть ещё круглосуточная работа и ночная работа. Итого, мы должны описать с полсотни сценариев. Часть из них можно объединить, но такое объединение сильно усложнит алгоритм и сделает код нечитабельным. В общем, не будем останавливаться на доказательстве тезиса «почему 20+ elif это плохо».

Есть решения, претендующие на большую универсальность. Habr-пост из смежной области «SQL: задача о рабочем времени» как раз об этом. Метод предполагает подсчет общего количества дней за вычетом выходных и праздничных дней, а затем умножение этого числа на количество рабочих часов. Однако этот блок решений тоже не универсален. Если мы посмотрим внимательно, то увидим, что в код заложены кастомные данные по режиму работы. Следовательно, любые изменения входных данных потребуют доработки кода.

2. Описание алгоритма

    Нам нужен алгоритм, которому все равно, с какой сложностью описан график и какой длины отрезок АB, а значит он не должен оперировать сценариями. Как он может выглядеть?

    Представим наши потери времени следующим образом:

    • отложим на прямой точки A и B, пометив первую точку белым флагом (старт потерь), а вторую – флагом с шахматным принтом (финиш потерь);
    • также отложим на прямой отсечки согласно нашему графику работы, пометив их флагами зелёного цвета (начало работы) или красного цвета (окончание работы).

    Разберем на примере.

    • точка А: 01.03.2023 11:00;
    • точка B: 02.03.2023 13:30;
    • график X: пн-пт с 10:00 до 19:00, обед с 13:00 до 14:00.

    Тут мы используем простую арифметику: искомая величина потерь времени – это сумма разниц между зелёными и красными флажками в пределах белого и шахматного флагов. А значит, все, что нужно, это собрать воедино все временные отсечки и просуммировать интервалы по ним, как показано выше.

    Итак, что нужно сделать:

    Приведем сразу варианты представления исходных данных. Это датафрейм df, загружаемый из csv/excel-файла.

    Разберём пошагово наш алгоритм.

    Шаг 1:

    Представим наш график как список словарей по временным отсечкам для каждого дня недели в следующем виде:

    [{time: время, flag: цвет флага, start_finish: ‘’}]
    
    
    • время согласно графику работы: часы, минуты;
    • цвет флага: зелёный – после него время нужно учитывать, красный – после него время учитывать не нужно;
    • ‘’ – этому элементу позже будет присвоено значение «старт» или «финиш», в зависимости от того точка А или точка В расположена левее данной временной отсечки.

    Для удобства восприятия покажем только values, но подразумеваем, что keys и фигурные скобки тоже присутствуют. В нашем случае получится следующий результат:

    Понедельник: 

    [{time: 10:00, flag: зелёный, start_finish: ‘’},
    {time: 19:00, flag: красный, start_finish: ‘’}, 
    {time: 13:00, flag: красный, start_finish: ‘’},
    {time: 14:00, flag: зелёный, start_finish: ‘’}]
    

    И так для каждого дня.

    Таким образом, на этом этапе нужно составить словарь, включающий режим работы по дням недели с учетом обеденного перерыва. Мы сознательно не приводим код для этой части работы, поскольку, по сути, это подготовка данных, а не сам алгоритм расчета потерь времени. Эта часть зависит от формата представления исходных данных.

    Шаг 2:

    Схожим образом представим точки A и B:

    [{time: дата + время, flag: ‘’, start_finish: старт/финиш}]
    • дата + время: год, месяц, день, часы, минуты;
    • ‘’ – на место этого элемента в дальнейшем будет подставлен красный или зелёный флаг, (по цвету флага ближайшего соседа слева на временной шкале);
    • старт/финиш: старт – это точка А, финиш – В.
    A:	{time: 01.03.2023 11:00, flag: ‘’, start_finish: ‘старт’}
    B: 	{time: 02.03.2023 11:00, flag: ‘’, start_finish: ‘финиш’}
    

    Шаг 3:

    Определим все дни между точками A и B включительно и подтянем из графика работы по этим дням все флаги, а также сами точки А и В. Данные из нашего примера превратятся в список. Теперь ключевой момент, на котором, собственно, и строится весь алгоритм: отсортируем список по возрастанию даты и времени. Это важно, т.к. позволит нам идти в цикле от отсечки к отсечке, проверяя, нужно ли включать следующий за ней отрезок в расчет.

    [{time: 01.03.2023 10:00, flag: ‘зелёный’, start_finish: ‘’}
    {time: 01.03.2023 11:00, flag: ‘’, start_finish: ‘старт’}
    {time: 01.03.2023 13:00, flag: ‘красный’, start_finish: ‘’}
    {time: 01.03.2023 14:00, flag: ‘зелёный’, start_finish: ‘’}
    {time: 01.03.2023 19:00, flag: ‘красный’, start_finish: ‘’}
    {time: 02.03.2023 10:00, flag: ‘зелёный’, start_finish: ‘’}
    {time: 02.03.2023 13:00, flag: ‘красный’, start_finish: ‘’}
    {time: 02.03.2023 13:30, flag: ‘’, start_finish: ‘финиш’}]
    

    Код для шагов 2 и 3:

    # функция сбора и сортировки флагов
    
    def collect_downtime (row):
        start = pd.to_datetime(row['Point_A'], format = '%d.%m.%Y %H:%M:%S')
        finish = pd.to_datetime(row['Point_B'], format = '%d.%m.%Y %H:%M:%S')
        schedule = row['schedule']
        
    #список всех дней между точками А и В
        day_list = pd.date_range(start.date(), finish.date()).tolist()
    # список, куда мы сложим все флаги
        target_list_full = []
        
    # обрабатываем каждый день из списка
        for current_day in day_list:
            target_list_day = [] # переменная для накопления флагов внутри дня
            day_begin = current_day.replace(hour=0, minute = 0, second=0, microsecond=0) #начало дня
            day_of_week = current_day.dayofweek # день недели
            day_schedule = schedule[day_of_week] # график дня недели
            
    #обрабатываем каждую временную отсечку внутри дня. Здесь мы складываем время начала дня с timedelta из графика
            for element in day_schedule:
                try:
                    target_element = element
                    target_element['time'] += day_begin
                    target_list_day.append(target_element)
                except:
                    pass
            target_list_full += target_list_day
    
    # добавим в список начало и конец инцидента, началу сразу присвоим флаг False
        target_begin = {'time': start,
                        'flag': False,
                        'start_finish': 'Start'}
        
        target_end = {'time': finish,
                      'flag': '',
                      'start_finish': 'Finish'}
        
        target_list_full.append(target_begin)
        target_list_full.append(target_end)
    
    # ключевой момент: сортируем список флагов по дате
        target_list_full = sorted(target_list_full, key = lambda x: x['time'])
        
        result_dict = {'target_list' : target_list_full,
                       'begin_index' : target_list_full.index(target_begin),
                       'end_index' : target_list_full.index(target_end)}
        return result_dict
    
    # применяем функцию collect_downtime к нашему датафрейму
    df['result_dict'] = df.apply(collect_downtime, axis = 1)
    

    Шаг 4:

    Необходимо корректно идентифицировать цвет точки старта. Увы, это тот случай, когда все же придется использовать ветвление для проверки цвета флага. Если стартовый флаг первый в списке, то смотрим на ближайший нефинишный флаг справа на временной шкале. Если стартовый флаг не первый в списке, то ориентироваться нужно на ближайший флаг слева.

    В нашем примере стартовый флаг следует за зелёным, значит и сам приобретает этот цвет

    {time: 01.03.2023 11:00, flag: ‘’, start_finish: ‘старт’}
    	= >	{time: 01.03.2023 11:00, flag: ‘зелёный’, start_finish: ‘старт’}
    

    Шаг 5:

    Пройдем в цикле по всему списку от стартового флага до финишного (не включая его) и, если цвет флага зелёный, рассчитаем время до следующего флага:

    {time: 01.03.2023 10:00, flag: ‘зелёный’, start_finish: ‘’}
    {time: 01.03.2023 11:00, flag: ‘зелёный’, start_finish: ‘старт’} 	2 ч.
    {time: 01.03.2023 13:00, flag: ‘красный’, start_finish: ‘’}		-
    {time: 01.03.2023 14:00, flag: ‘зелёный’, start_finish: ‘’} 		5 ч.
    {time: 01.03.2023 19:00, flag: ‘красный’, start_finish: ‘’} 		-
    {time: 02.03.2023 10:00, flag: ‘зелёный’, start_finish: ‘’} 		3 ч.
    {time: 02.03.2023 13:00, flag: ‘красный’, start_finish: ‘’} 		-
    {time: 02.03.2023 13:30, flag: ‘’, start_finish: ‘финиш’} 		
    

    Итого искомое время 2+5+3 = 10 часов.

    Код для шагов 4 и 5:

    # функция расчета рабочего времени. На вход – словарь, результат функции collect_downtime
    def calculate_downtime (row):
        target_list = row['result_dict'].get('target_list')
        begin_index = row['result_dict'].get('begin_index')
        end_index = row['result_dict'].get('end_index')
        time_sum = dt.timedelta(0) # переменная для накопления времени
         
        #надо проставить метки True/False для начала и конца инцидента
        # если точка А не первая в списке, тогда ориентируемся на предыдущую отсечку:
        if begin_index != 0:
            target_list[begin_index]['flag'] = target_list[0]['flag']
        # точка А первая в списке, точка B не вторая:    
        elif (end_index != 1):
            target_list[begin_index]['flag'] = not (target_list[1]['flag'])
        # точка А первая в списке, точка B вторая:
        else:
            target_list[begin_index]['flag'] = not (target_list[2]['flag'])
    
        # теперь у всех отсечек, кроме финишной, есть признак True/False. Пройдем последовательно по всем отсечкам
        for i in range(end_index - begin_index):
            time_period = dt.timedelta(0) # переменная для накопления времени простоя
            if target_list[i+ begin_index]['flag'] == True:
                time_period = target_list[i+1+begin_index]['time'] - target_list[i+begin_index]['time']
            time_sum += time_period 
        
        return time_sum.total_seconds() / 60 #сразу преобразуем в минуты
    
    # применяем функцию calculate_downtime к нашему датафрейму
    df['calculate_downtime'] = df.apply(calculate_downtime, axis = 1)
    
    

    3. Преимущества алгоритма

      • Нечувствительность к изменениям графика работы

      Это главное преимущество алгоритма. График работы может быть любым: произвольное число рабочих дней в неделю, ночная работа, круглосуточная работа, произвольно распределённые перерывы. По всем этим вариантам алгоритм отработает корректно.

      • Быстрота исполнения кода

      Для сравнения производительности предлагаемого алгоритма с традиционным решением на основе ветвления, провели тестирование на датасете из 100 000 строк. Результаты говорят сами за себя: затраты времени на выполнение кода составили 3283 секунды для традиционного алгоритма против 127 секунд для нашего алгоритма. То есть наш алгоритм на основе размеченного списка временных отсечек почти в 26 раз быстрее обычного кода на основе оператора ветвления if-elif-else.

      4. Ограничения

      Чтобы алгоритм отработал корректно, важно учесть и отработать ряд нюансов. Примем, что мы имеем дело с «чистыми» данными и что у нас нет:

      • Незакрытых периодов, когда интервал указан без начала или без конца: рабочий день с хх:хх до 18:00;
      • Перепутанных периодов, когда начало идет после конца: обед 13:00 до 12:00;
      • Интервалов, ошибочно указанных в других, неподходящих интервалах: обеды в нерабочее время.

      Тогда основные ограничения могут быть следующими.

      Необходимо внимательно обработать случаи, когда в графике используется время 24:00

      Мы реализуем алгоритм с помощью библиотеки datetime, а она просто не знает, что такое 24:00. Будем исходить из того, какая точность результата нам необходима. В нашем случае абсолютная точность не нужна, поэтому просто заменяем 24:00 на 23:59. Если этого недостаточно, можно заменить на 23:59:59.999 и т.д.

      Данная версия алгоритма ориентирована на графики, привязанные к дням недели

      Поскольку вариаций графиков может быть очень много, полностью универсальным решением будет только использование полного производственного календаря всего исследуемого периода с указанием индивидуального графика каждого дня. Алгоритм в этом случае даже упростится: чтобы узнать график конкретного дня, не нужно будет дополнительных операций. Но несколько усложнится задача подготовки данных. В данном посте исходим все же из того, что раскладка по дням недели полностью описывает график рабочего времени.

      5. Заключение

      В посте мы привели универсальный алгоритм расчета потерь рабочего времени. Дальше эти данные могут быть привязаны к деньгам, клиентам, любым ресурсам. Алгоритм реализован на языке Python с использованием всем хорошо знакомых библиотек pandas и datetime. С его помощью можно эффективно обрабатывать датасеты длиной в миллионы строк и получать не оценочный, а точный результат. В итоге мы решили создать свой алгоритм, который можно и брать как есть, и, поняв основную идею, достаточно легко воспроизвести с нуля для вашей задачи в привычных вам инструментах. Надеемся, что пост будет вам полезен. Будем рады комментариям и предложениям по улучшению кода.

      Ссылка на notebook: https://www.kaggle.com/code/elenakorsakova/downtime-calculation