Время прочтения: 7 мин.
Что такое .NET for Apache Spark?
Фреймворк .NET for Apache Spark — оболочка на языке c#, разработанная в спецификации .net для предоставления .net разработчикам возможности использования распределенной системы для разработки приложений обработки больших данных.
Фреймворк имеет следующие возможности:
- работать со структурированными данными и использовать синтаксис SQL запросов;
- работать с потоковыми данными;
- SparkML;
- подключения и получения данных из разных источников: hdfs, ms sql, postgre sql, kafka, elasticnet, azure и т.д.
Приложения, использующие фреймворк могут быть развернуты как на локальной машине под управлением windows или linux так и на облачных сервисах Microsoft благодаря соответствию стандарту .NET Standart и формальной спецификации .NET API.
С чего начать?
Прежде чем начать работать непосредственно с библиотекой, необходимо настроить окружение — скачать Microsoft.Spark.Worker с официального репозитория для нужной системы(windows/linux). Для систем под управлением windows — прописать переменную среду DOTNET_WORKER_DIR, установив в нее путь до папки со spark worker’ом, проверить доступность серверов spark и подключение к ним.
Также необходимо подключить пакет .net for apache spark к приложению. Для windows достаточно через интерфейс управления nuget пакетами visual studio загрузить и установить необходимый пакет. для linux предварительно необходимо скачать репозиторий и собрать библиотеку при помощи maven, а затем прописать зависимость для приложения.
Более подробно этот процесс расписан в официальной документации по фреймворку, так что я не буду на нем останавливаться.
Работа с фреймворком
На примере нескольких задач из моей практики я продемонстрирую работу с фреймворком.
Первая задача заключалась в разработке API для подсчета агрегированной статистики по количеству кредитов, выданных клиентам нескольких отделений для ранжирования. У нас был структурированный файл с данными по продажам кредитных продуктов банка, и при запросе к API необходимо было выдать пользователю готовую статистику по продажам в подразделениях в Json формате.
Я использовал отдельный класс для работы со spark, зарегистрировав его в промежуточном слое приложения:
builder.Services.AddSingleton<ISparkConnector, SparkConnector>();
Для того чтобы получить доступ к распределенным вычислениям через spark, я инициализировал spark сессию. Собирается пайплайн сессии через метод Builder(), задается имя приложения, адрес сервера spark, а также параметры конфигурации для сессии. Инициализация сессии выполняется при вызове завершающего метода GetOrCreate():
public class SparkConnector : ISparkConnector
{
SparkSession spark;
public SparkConnector()
{
try {
spark = SparkSession
.Builder()
.Master("yarn")
.AppName("advertisement_count")
.Config("spark.driver.cores", "2")
.Config("spark.driver.memory", "4g")
.Config("spark.executor.memory", "2g")
.GetOrCreate();
} catch {
///недоступен spark сервер
///
...
}
}
Полный список параметров конфигурации можно посмотреть в официальной документации по спарку. Я в своем коде использую только ограничения по памяти для экзекуторов.
Для загрузки данных из hdfs необходимо задать формат читаемого файла и дополнительные опции для DataFrameReader, например, наличие заголовков в читаемом файле и разделитель:
var scores = spark.Read()
.Format("csv")
.Option("header", "true")
.Option("delimiter", "\t")
.Load("hdfs://my/hdfs/data/path/data.dat");
Фреймворк позволяет собирать пайплайны для работы с данными так, что решение умещается в несколько строк кода:
public string GetStatistic(string code = null)
{
if (code != null) {
return String.Join(",", scores.Filter($"filial = {code}")
.GroupBy("filial")
.Count()
.ToJSON()
.Collect());
} else
{
return String.Join(",", scores
.GroupBy("filial")
.Count()
.ToJSON()
.Collect());
}
}
Чтение данных в спарке реализовано через класс DataFrameReader, который инициализируется методом .Read(). Для него задается формат файла .Format(), устанавливаются дополнительные опции для чтения файла, например, кодировка, разделитель и заголовки для табличных файлов. Непосредственно данные читаются вызовом метода .Load(), в качестве аргумента в него передается локация файла в файловой системе. На выходе получается объект класса DataFrame, с которым уже производятся манипуляции через DataFrame API: GroupBy() для групировки по колонке, .Count() для подсчета количества записей по группам, .ToJSON() для преобразования данных в датафрейме в json формат и .Collect() для получения данных в приложение.
Готово! Осталось только создать метод контроллера, возвращающий данные:
[ApiController]
[Route("[controller]")]
public class SparkController : ControllerBase
{
private ISparkConnector _sparkConnector;
private readonly ILogger<SparkController> _logger;
public SparkController(ILogger<SparkController> logger, ISparkConnector sparkConnector)
{
_logger = logger;
_sparkConnector = sparkConnector;
}
[HttpGet(Name = "AdvStat")]
public string Get()
{
return _sparkConnector.GetStatistic();
}
}
Естественно, результат необходимо дополнительно обработать и привести его в нормальный формат для дальнейшего чтения. Здесь я опущу эти подробности, так как делается это без использования фреймворка.
Вторая задача заключалась в обработке лога отправки электронных писем c hdfs, объединив его с журналом договоров c mssql и подсчетом количества отправленных писем по группам заявок.
Для этой задачи я подключался к разным источникам. Лог отправки писем лежал на hdfs в формате parquet, так что запросить его можно, использовав метод DataFrameReader Parquet():
var logs = spark.Read().Parquet("hdfs://my/hdfs/data/path/maillogs");
Для запроса данных из таблицы ms sql необходимо использовать jdbc драйвер, который указывается через метод .Format(). Также необходимо указать адрес ms sql сервера и таблицу из базы через метод Option():
var deals = spark.Read()
.Format("com.microsoft.sqlserver.jdbc.spark")
.Option("url", "\\ip.adress.mssql:port")
.Option("dbtable", "[crm].[reports].[deals]")
.Load();
Для начала я обрабатывал лог электронных писем. Меня интересовали только отправленные письма, в данном случае те, у которых присутствует событие «добавлен адрес к письму». Идущие друг за другом события «создано письмо» — «добавлен адрес к письму» определяют, что письмо было отправлено. Для отсеивания неотправленных писем по этой логике я использовал оконную функцию Lag(). Функция возвращает значение заданного поля из предыдущей строки:
logs = logs.WithColumn(“prev_event_name”, Functions.Lag("event_name",1).Over(Window.OrderBy("date_create")))
.WithColumn("prev_event_text_1", Functions.Lag("event_text_1",1).Over(Window.OrderBy("date_create")))
.Filter(logs["event_name"] == "Добавлен адрес к письму" && logs["prev_event_name"] =="Создано письмо");
Так как функция оконная, при помощи метода Over() определяется окно, в котором она будет работать. Окно задается через методы статического класса Window. В моем случае окном для выполнения метода Lag() является весь датафрейм, отсортированный по времени появления событий, поэтому я использовал метод OrderBy()
Посмотрим на еще одну возможность фреймворка — создание функций, определенных пользователем (user-defined functions или udf). Метод Udf<>() создает и регистрирует в контексте спарка функцию для обработки полей датафрейма по определенной пользователем логике. Для своей задачи я написал функцию выделения домена из почтового адреса:
var GetDomain = Functions.Udf<string, string>(
str => str.Split("@").Count() > 1 ? str.Split("@")[1] : "no domain";
);
var logs = logs.WithColumn("domain", GetDomain(scores["prev_event_text_1"]));
Далее к получившемуся датафрейму необходимо было присоединить информацию по договорам. Конкретно для моей задачи – сегмент клиента.
Для этого я использовал метод Join(). Чтобы избежать дублирования имен полей в результирующем датафрейме, можно передать методу не логическое выражение объединения, а массив с именами полей:
var segLogs = logs.Join(deals.Select("entity_id", "client_segment"), new List<string>() { "entity_id" }, "inner");
После этого я посчитал количество писем, отправленных на каждый из доменов для каждого сегмента. Для этого я использовал метод Pivot(). Метод сводит колонку в строку и применяет для каждого поля функцию агрегации. В моем случае для подсчета количества писем я использовал Count():
var result = segLogs.GroupBy("client_segment").Pivot("domain").Count();
Результирующий датафрейм я сохранил на hdfs для дальнейшего взаимодействия с ним аналитиками. Сохранение результатов происходит через DataFrameWriter. Так же, как и с ридером, можно использовать дополнительные опции и указать формат сохраняемого файла, либо использовать стандартные функции сохранения, например .Parquet():
result.Write().Parquet("hdfs://my/result/folder/segments_mails_count");
Заключение
Применив фреймворк .NET for Apache Spark, я решил несколько своих задач и приобрел интересный опыт взаимодействия с большими данными на платформе .net.
Для себя я выделил как положительные, так и отрицательные стороны.
Положительные: простота изучения, синтаксис методов повторяет такой у фреймворков на scala и python, легкая интеграция с .net приложениями, возможность преобразования результатов из объектов spark в с# структуры и дальнейшее их использование в приложении.
Отрицательные: необходимость дополнительной настройки машины для запуска приложения, без установки Microsoft.Spark.Worker и добавления переменной окружения программа не будет работать.