Время прочтения: 6 мин.

В каждом из нас время от времени просыпается инженерный дух, который, не удовлетворяясь простым созерцанием какого-либо явления, жаждет если хотя бы не докопаться до сути, то по крайней мере копнуть чуть глубже, перейти на уровень ниже и оттуда изучить волшебство устройства, секрет работы и тайну движимой им силы. Иногда так случается, что этот самый дух оказывается очень голоден и он решительно требует ещё — либо следующего куска, либо другого лакомства. Парадокс заключается в том, что чем он голоднее, тем больший созидательный потенциал он содержит. И в этой статье я хочу предоставить немного пищи для размышления, рассмотрев механизм работы MapReduce.

Речь пойдёт о функциях работы со множествами. Это такие функции как объединение, пересечение, выборка элементов из множества по определённым условиям и другие.  Казалось бы, что тут сложного? Это же всё реализовано в реляционных базах данных. Но… у реляционной базы данных есть свои ограничения, в частности, она позволяет хранить достаточно ограниченный объём информации. Когда объём данных слишком велик для того, чтобы быть размещённым на одном сервере базы данных, данные хранятся распределённо, а многие инструменты, нативные для базы данных, приходится реализовывать самому в рамках тех законов, которые действуют в распределённой системе.

Развитие технологии BigData началось с парадигмы MapReduce — модели распределённых вычислений, разработанной компанией Google, над очень большими наборами данных, расположенных на компьютерных кластерах. Парадигма MapReduce взяла за основу две процедуры функционального программирования: map, применяющую функцию к каждому элементу списка, и reduce, объединяющая результаты работы map. Сама вычислительная модель состоит из трёх шагов:

1. Map — предварительная обработка входных данных, заключающаяся в том, что каждый рабочий узел применяет функцию map к локальным данным и записывает результат в формате «ключ-значение» во временное хранилище

2. Shuffle (перемешивание) — перераспределение данных на основе ключей таким образом, чтобы все данные одного ключа лежали на одном рабочем узле

3. Reduce (свёртка) — параллельная обработка рабочими узлами каждой группы данных по порядку следования ключей и «объединение» результатов в финальный итог

Попробуем функции работы со множествами написать на языке MapReduce. Например, выборка по определённому критерию реализуется с помощью одной MapReduce задачи, в которой есть только одна фаза Map. И то, что на SQL языке реляционных баз данных было бы написано так (из всех строк таблицы пользователей UserTable выбрать только те записи, в которых имя пользователя UserName соответствует значению “John”):

SELECT UserId, UserName, Age FROM UserTable
WHERE UserName = 'John'

трансформировалось бы в следующий Python код для Mapper (класса, выполняющего функцию map) в рамках Streaming-интерфейса (когда входные данные принимаются с stdin):

import sys

for line in sys.stdin:
    UserId, UserName, Age = line.strip().split("\t")
    if UserName == "John":
        print(line.strip())

Предполагается, что на вход подаётся строка со значениями UserId, UserName и Age, разделёнными символами табуляции. Фильтрация происходит на этапе поступления данных (этап Map), и в результате из всего входящего потока остаются только те строки, которые удовлетворяют критерию UserName= “John”.

К слову, в консоли Linux есть grep – универсальная поисковая утилита, с помощью которой можно выбрать строки, удовлетворяющие заданному условию. Приведённый выше код можно считать распределённой реализацией функции grep, функцией, которая реализуется с помощью MapReduce.

Рассмотрим объединение множеств, результатом которого является множество, содержащее все элементы исходных множеств. Пример диаграммы Эйлера, которая позволяет наглядно изобразить отношения между множествами, для объединения двух множеств A и B:

SQL код, описывающий объединение участников двух таблиц: спортивной команды SportTeamTable и любителей книг BookFansTable (в результате мы получим список людей, которые ЛИБО входят в состав спортивной команды, ЛИБО любят книги, ЛИБО и то и другое одновременно):

SELECT UserId FROM SportTeamTable
UNION
SELECT UserId FROM BookFansTable

Соответствующий Python код для Reducer (класса, реализовывающего функцию reduce):

import sys

lastValue = None

for line in sys.stdin:
    UserId, Set = line.strip().split("\t")
    if UserId != lastValue:
        print(UserId)
    lastValue = UserId

Также предполагается, что в рамках Streaming-интерфейса на вход stdin поступает строка со значениями UserId и Set, разделёнными символами табуляции. UserId — идентификатор пользователя, значение, по которому происходит объединение, а Set — идентификатор множества, который в нашем примере принимал бы два значения «SportTeam» и «BookFans». Здесь приведён код последнего этапа MapReduce задачи – код для Reducer, данные на него поступают уже в отсортированном виде, и для того, чтобы исключить дублирующиеся значения (в соответствии с UNION на SQL, в противоположность UNION ALL) на каждом шаге итерации проверяем, чтобы текущее значение отличалось от предыдущего.

В свою очередь, операция пересечения двух уже рассмотренных ранее множеств участников спортивной команды и любителей книг включала бы только тех людей, которые входят в состав спортивной команды И одновременно любят книги. Область пересечения множеств на диаграмме Эйлера стала куда менее обширной по сравнению с объединением, увы!

SQL код пересечения таблиц SportTeamTable и BookFansTable

SELECT s.UserId FROM SportTeamTable s
INNER JOIN BookFansTable b ON b.UserId = s.UserId

Изменения Python кода при этом минимальны (изменяется лишь оператор в условии с неравенства на равенство):

import sys

lastValue = None

for line in sys.stdin:
    UserId, Set = line.strip().split("\t")
    if UserId == lastValue:
        print(UserId)
    lastValue = UserId

Если в случае объединения множеств из всего потока информации, которая поступает на Reducer в отсортированном виде надо было исключить дублирующуюся, то в данном случае прямо противоположная задача – нужно оставить именно те строки, “ключи” которых равны.

И напоследок, рассмотрим пример агрегирующей функции, которая выполняет вычисления над набором данных и возвращает одно значение – некоторую обобщенную метрику рассматриваемого набора. Мы остановимся на расчёте среднего арифметического группы значений. Среднее значение времени SpentTime, потраченного работником на задачу, в таблице графика задач TaskSchedulerTable на языке SQL:

SELECT UserId, AVG(SpentTime) FROM TaskSchedulerTable
GROUP BY UserId

И соответствующая Python версия:

import sys

lastValue = None
count = None
totalTime = None

for line in sys.stdin:
    UserId, SpentTime = line.strip().split("\t")
    if UserId == lastValue:
        count += 1
        totalTime = totalTime + int(SpentTime)
    else:
        if lastValue is not None:
            print(lastValue, totalTime/count, sep="\t")
        count = 1
        totalTime = int(SpentTime)
        lastValue = UserId
print(UserId, totalTime/count, sep="\t")

Это код Reducer, который будет рассчитывать среднее значение. Учитывая отсортированный порядок поступающих на его вход пар “ключ-значение” в виде строк с UserId и SpentTime, остаётся для каждого “ключа” последовательно накопить весь пул соответствующих значений и рассчитать среднюю величину.

Вместо резюме хочу заметить, что перенос популярных идей на другую методологию решения задач очень даже способствует понимаю механизмов работы методов, лежащих в её основе. Спасибо за внимание!