Время прочтения: 6 мин.
В каждом из нас время от времени просыпается инженерный дух, который, не удовлетворяясь простым созерцанием какого-либо явления, жаждет если хотя бы не докопаться до сути, то по крайней мере копнуть чуть глубже, перейти на уровень ниже и оттуда изучить волшебство устройства, секрет работы и тайну движимой им силы. Иногда так случается, что этот самый дух оказывается очень голоден и он решительно требует ещё — либо следующего куска, либо другого лакомства. Парадокс заключается в том, что чем он голоднее, тем больший созидательный потенциал он содержит. И в этой статье я хочу предоставить немного пищи для размышления, рассмотрев механизм работы MapReduce.
Речь пойдёт о функциях работы со множествами. Это такие функции как объединение, пересечение, выборка элементов из множества по определённым условиям и другие. Казалось бы, что тут сложного? Это же всё реализовано в реляционных базах данных. Но… у реляционной базы данных есть свои ограничения, в частности, она позволяет хранить достаточно ограниченный объём информации. Когда объём данных слишком велик для того, чтобы быть размещённым на одном сервере базы данных, данные хранятся распределённо, а многие инструменты, нативные для базы данных, приходится реализовывать самому в рамках тех законов, которые действуют в распределённой системе.
Развитие технологии BigData началось с парадигмы MapReduce — модели распределённых вычислений, разработанной компанией Google, над очень большими наборами данных, расположенных на компьютерных кластерах. Парадигма MapReduce взяла за основу две процедуры функционального программирования: map, применяющую функцию к каждому элементу списка, и reduce, объединяющая результаты работы map. Сама вычислительная модель состоит из трёх шагов:
1. Map — предварительная обработка входных данных, заключающаяся в том, что каждый рабочий узел применяет функцию map к локальным данным и записывает результат в формате «ключ-значение» во временное хранилище
2. Shuffle (перемешивание) — перераспределение данных на основе ключей таким образом, чтобы все данные одного ключа лежали на одном рабочем узле
3. Reduce (свёртка) — параллельная обработка рабочими узлами каждой группы данных по порядку следования ключей и «объединение» результатов в финальный итог
Попробуем функции работы со множествами написать на языке MapReduce. Например, выборка по определённому критерию реализуется с помощью одной MapReduce задачи, в которой есть только одна фаза Map. И то, что на SQL языке реляционных баз данных было бы написано так (из всех строк таблицы пользователей UserTable выбрать только те записи, в которых имя пользователя UserName соответствует значению “John”):
SELECT UserId, UserName, Age FROM UserTable
WHERE UserName = 'John'
трансформировалось бы в следующий Python код для Mapper (класса, выполняющего функцию map) в рамках Streaming-интерфейса (когда входные данные принимаются с stdin):
import sys
for line in sys.stdin:
UserId, UserName, Age = line.strip().split("\t")
if UserName == "John":
print(line.strip())
Предполагается, что на вход подаётся строка со значениями UserId, UserName и Age, разделёнными символами табуляции. Фильтрация происходит на этапе поступления данных (этап Map), и в результате из всего входящего потока остаются только те строки, которые удовлетворяют критерию UserName= “John”.
К слову, в консоли Linux есть grep – универсальная поисковая утилита, с помощью которой можно выбрать строки, удовлетворяющие заданному условию. Приведённый выше код можно считать распределённой реализацией функции grep, функцией, которая реализуется с помощью MapReduce.
Рассмотрим объединение множеств, результатом которого является множество, содержащее все элементы исходных множеств. Пример диаграммы Эйлера, которая позволяет наглядно изобразить отношения между множествами, для объединения двух множеств A и B:
SQL код, описывающий объединение участников двух таблиц: спортивной команды SportTeamTable и любителей книг BookFansTable (в результате мы получим список людей, которые ЛИБО входят в состав спортивной команды, ЛИБО любят книги, ЛИБО и то и другое одновременно):
SELECT UserId FROM SportTeamTable
UNION
SELECT UserId FROM BookFansTable
Соответствующий Python код для Reducer (класса, реализовывающего функцию reduce):
import sys
lastValue = None
for line in sys.stdin:
UserId, Set = line.strip().split("\t")
if UserId != lastValue:
print(UserId)
lastValue = UserId
Также предполагается, что в рамках Streaming-интерфейса на вход stdin поступает строка со значениями UserId и Set, разделёнными символами табуляции. UserId — идентификатор пользователя, значение, по которому происходит объединение, а Set — идентификатор множества, который в нашем примере принимал бы два значения «SportTeam» и «BookFans». Здесь приведён код последнего этапа MapReduce задачи – код для Reducer, данные на него поступают уже в отсортированном виде, и для того, чтобы исключить дублирующиеся значения (в соответствии с UNION на SQL, в противоположность UNION ALL) на каждом шаге итерации проверяем, чтобы текущее значение отличалось от предыдущего.
В свою очередь, операция пересечения двух уже рассмотренных ранее множеств участников спортивной команды и любителей книг включала бы только тех людей, которые входят в состав спортивной команды И одновременно любят книги. Область пересечения множеств на диаграмме Эйлера стала куда менее обширной по сравнению с объединением, увы!
SQL код пересечения таблиц SportTeamTable и BookFansTable
SELECT s.UserId FROM SportTeamTable s
INNER JOIN BookFansTable b ON b.UserId = s.UserId
Изменения Python кода при этом минимальны (изменяется лишь оператор в условии с неравенства на равенство):
import sys
lastValue = None
for line in sys.stdin:
UserId, Set = line.strip().split("\t")
if UserId == lastValue:
print(UserId)
lastValue = UserId
Если в случае объединения множеств из всего потока информации, которая поступает на Reducer в отсортированном виде надо было исключить дублирующуюся, то в данном случае прямо противоположная задача – нужно оставить именно те строки, “ключи” которых равны.
И напоследок, рассмотрим пример агрегирующей функции, которая выполняет вычисления над набором данных и возвращает одно значение – некоторую обобщенную метрику рассматриваемого набора. Мы остановимся на расчёте среднего арифметического группы значений. Среднее значение времени SpentTime, потраченного работником на задачу, в таблице графика задач TaskSchedulerTable на языке SQL:
SELECT UserId, AVG(SpentTime) FROM TaskSchedulerTable
GROUP BY UserId
И соответствующая Python версия:
import sys
lastValue = None
count = None
totalTime = None
for line in sys.stdin:
UserId, SpentTime = line.strip().split("\t")
if UserId == lastValue:
count += 1
totalTime = totalTime + int(SpentTime)
else:
if lastValue is not None:
print(lastValue, totalTime/count, sep="\t")
count = 1
totalTime = int(SpentTime)
lastValue = UserId
print(UserId, totalTime/count, sep="\t")
Это код Reducer, который будет рассчитывать среднее значение. Учитывая отсортированный порядок поступающих на его вход пар “ключ-значение” в виде строк с UserId и SpentTime, остаётся для каждого “ключа” последовательно накопить весь пул соответствующих значений и рассчитать среднюю величину.
Вместо резюме хочу заметить, что перенос популярных идей на другую методологию решения задач очень даже способствует понимаю механизмов работы методов, лежащих в её основе. Спасибо за внимание!