Время прочтения: 4 мин.

Всем добрый день! Я технический специалист, работающий в системе внутреннего аудита, в мои обязанности входит создание инструментов ETL на языке программирования C#.

Периодически источниками данных становятся жестко структурированные файлы формата xml, csv, json или любого другого формата. Иногда их количество становится достаточно большим и постоянно увеличивающимся. Например, в одной из моих задач количество файлов увеличивалось со средней скоростью обновления примерно 150 000 файлов в сутки. Если при этом обработка одного файла (считывание массива байт с жесткого диска в память, трансформация загруженных данных и запись их в базу данных) занимает секунду, то становится понятно, что обработка всех файлов займет более 40 часов. В этом случае мы не сможем обработать эти файлы до конца, так как скорость увеличения количества файлов будет явно выше скорости их обработки.

Одно из решений данной проблемы – разработать приложение, в котором создать пул независимых друг от друга потоков. В потоках будут обрабатываться файлы, выбираемые из общей очереди. Однако в этом случае возникают сложности с синхронизацией работающих потоков и совместного использования ресурсов, так как очень вероятно появление взаимных блокировок.

Что бы избежать этих сложностей компания Microsoft добавила в фреймоворк .Net библиотеку TPL (начиная с версии 4.0). Я расскажу, как используя возможности этой библиотеки решить данную проблему.

Итак, изначально алгоритм работы выглядит следующим образом:

  1. Сканируется каталог хранения файлов и возвращается список (например, List<File>), содержащий данные о всех файлах;
  2. Запускается цикл (for или foreach) в котором данные из очередного файла считываются в память, при необходимости трансформируются и записываются в БД.

Очевидно, что самые затратные по времени операции – это считывание данных с жесткого диска в память и запись данных из памяти в БД.

Попробуем оптимизировать наш алгоритм при помощи библиотеки TPL:

Пункт 1.

Изменим список, возвращаемый функцией сканирования каталога хранения файлов с List на ConcurrentQueue<FileInfo>.

Для чего мы это делаем? Дело в том, что класс ConcurrentQueue<T> является потокобезопасным, то есть если одновременно два потока попытаются извлечь данные из этого списка или записать в него данные, то у нас не возникнет исключений (Exception).

Пункт 1 нашего алгоритма будет выглядеть так: сканируется каталог хранения файлов и возвращается список ConcurrentQueue<FileInfo>, содержащий данные о всех файлах.

Пункт 2:

Изменим конструкцию формирующую цикл обработки данных из файла. Заменим for на Parallel.For или Parallel.ForEach.

В чем отличие новой конструкции от for? Тут всё просто и в принципе понятно из названия языковой конструкции. Все итерации цикла выполняются в параллельных потоках. В качестве примера я покажу организацию цикла конструкцией Parallel.ForEach:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

где

  • listFiles – это коллекция типа ConcurrentQueue<FileInfo> содержащая список файлов в каталоге;
  • currentFile – элемент коллекции listFiles, который возвращается конструкцией ForEach;
  • dataFile – условная некоторая структура данных в памяти, получаемая считыванием содержимого файла в память;
  • getDataFile – условная функция возвращающая содержимое файла в виде некоторой структуры данных;
  • TransformData – условная процедура трансформации полученных данных;
  • WriteToDB – условная процедура записи данных в БД.

В данном примере, с помощью конструкции Parallel.ForEach, мы организуем цикл. В этом цикле, в параллельных потоках, производится считывание данных с жесткого диска, их трансформация и запись в БД. При этом, проблемы организации работы параллельных потоков отсутствуют. Количество параллельных потоков зависит от числа ядер процессора и их загруженности.

Используя предложенный алгоритм, мы ускорим обработку файлов, как минимум в 2 раза. Хотя, конечно, эта цифра будет меняться в зависимости от количества ядер и памяти машины, на которой будет запускаться программа.

Так же для ускорения работы программы нужно вынести запись в БД в отдельный поток, работающий независимо от основного. Сделать это можно при помощи коллекции ConcurrentQueue, чтобы избежать конфликтов при добавлении данных в очередь.

Перепишем вышеприведенный пример с учетом оптимизации записи в БД.

Предположим, что процедура чтения файлов возвращает нам данные в DataTable):

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

Как видно, вместо строки с вызовом процедуры записи в БД, мы просто добавляем в коллекцию ConcurrentQueue<DataTable> ListData описанную и инициализированную в отдельном потоке, экземпляр которого threadWriteToDB используется в нашем цикле.

Запись в БД происходит уже в отдельном потоке. Запись в БД можно организовать аналогично работе с файлами, с помощью конструкций Parallel.For и/или Parallel.Foreach.

В моей задаче, где потребовалась обработка сопоставимого количества файлов, сейчас может обрабатываться в среднем от 200 000 до 400 000 файлов в сутки, при чем скорость ограничивается загрузкой БД и шириной канала данных.