Время прочтения: 5 мин.
Модуль multiprocessing предоставляет нам два главных способа параллельной обработки массивов: Pool и Process. Для решения нашей задачи Pool нам не подходит — он возвращает результат обработки массива только после завершения работы всех потоков, а нам важно получение результатов в течение всего времени работы программы и отслеживание состояния процессов-обработчиков.
Для примера мы будем хранить результаты обработки в Pickle – с этим форматом легко работать, и он быстро сохраняется на локальный диск. Вы можете использовать иной формат под свои нужды.
Первым делом импортируем библиотеки:
import pickle
import multiprocessing as mp
Далее создаем класс, который будет работать с файлами. В класс будут передаваться имя файла с результатами обработки и путь к папке, в которой находятся файлы для обработки. В файле с результатами должен вестись учет, какие файлы уже прошли обработку. Для этого мы будем хранить данные в словаре Python, где ключом будет выступать имя файла, а значением — данные, полученные при преобразовании этого файла. Таким образом при остановке обработки программа будет знать, какие файлы уже были обработаны.
class FlileManager():
def __init__(self, result_file, file_directory):
self.result = dict()
self.result_file = result_file
self.file_directory = file_directory
with open(result_file,'rb') as p:
#Открываем pickle, если файл поврежден или не найден - будет создан новый
try:
self.result = pickle.load(p)
except (EOFError, pickle.UnpicklingError, FileNotFoundError):
self.result = dict()
#Метод для сохранения результатов обработки в Pickle
def save_pickle(self):
with open(result_file, 'wb') as p:
pickle.dump(self.result, p)
#Сбор необработанных файлов
def collect_files(self) -> list:
files_to_process = []
for root, subdir, files in os.walk(file_directory):
for file in files:
#Здесь также задается расширение обрабатываемых файлов
if file not in self.result.keys() and file.endswith('.txt'):
files_to_process.append(os.path.join(root,file))
return files_to_process
Теперь о параллельной обработке.
Модуль multiprocessing содержит в себе классы для обеспечения связи между процессами, такие как класс Lock и прокси для базовых типов и объектов Python. В нашем случае все процессы должны отправлять результаты обработки в единый словарь. Тогда почему бы не использовать Lock, чтобы единовременно разрешить только одному процессу складывать результаты в словарь и сохранять результаты обработки в файл?
Исходя из собственного опыта, я укажу два существенных минуса:
- Использование Lock тормозит все остальные процессы и обработку в целом.
- Пользователь может прервать программу в момент сохранения данных в файл, и тот может быть поврежден.
Я решил эти две проблемы использованием другого класса из модуля multiprocessing – Queue, то есть очередь. В эту очередь можно складывать различные данные, а потом извлекать их для дальнейшего использования.
Идея заключается в создании нескольких основных процессов, ведущих непосредственную обработку файлов и отдельного процесса, который будет извлекать обработанные данные из очереди, складывать их в словарь и сохранять словарь в файл Pickle с некоторой периодичностью. Также мы добавим возможность остановить обработку самостоятельно, если это нам по какой-то причине стало нужно.
Создадим класс для управления процессами:
class ProcessManager():
def __init__(self):
self.process_list = []
self.queue = mp.Queue()
self.queue_process = None
def create_process(self, func, file_list):
try:
process = mp.Process(target = func, args = (file_list, self.queue))
process.start()
self.process_list.append(process)
except mp.ProcessError:
print('Error during process creation')
def create_queue_process(self, file_manager):
try:
self.queue_process = mp.Process(target = queue_func, args = (self.queue, file_manager, len(self.process_list)))
self.queue_process.start()
self.queue_process.join()
except mp.ProcessError:
for proc in self.process_list:
proc.terminate()
Если процесс, работающий с очередью, не запустится, то обработка файлов не будет иметь смысла, так как они просто не будут сохраняться. Поэтому в случае ошибки при создании обработчика очереди мы должны убить процессы, обрабатывающие данные.
Функция, передаваемая в процессы для обработки, будет принимать в качестве аргумента список файлов и очередь. Таков ее примерный вид:
def process_func(file_list, queue):
try:
for file in file_list:
#some_work_func - некоторая функция, производящая обработку файла
workout_result = some_work_func(file)
queue_token = {file: workout_result}
queue.put(queue_token)
except KeyboardInterrupt:
pass
finally:
queue.put('DONE')
Отмечу, что в конце функции процесса в очередь отправляется строка DONE – это сигнал того, что процесс закончил работу.
Функция, работающая с очередью, будет принимать в качестве аргументов очередь, экземпляр класса работы с файлами и количество рабочих потоков. Функция должна периодически сохранять результаты обработки в файл, заканчивать работу только после окончания работы всех основных процессов и игнорировать прерывание работы программы пользователем, чтобы сохранить все результаты обработки.
def queue_func(queue, file_manager, stream_count):
closed_processes = 0
items_saved = 0
while True:
try:
if queue.is_empty():
pass
else:
item = queue.get()
if item == 'DONE':
closed_processes += 1
else:
file_manager.result.update(item)
items_saved += 1
if items_saved == 10:
file_manager.save_pickle()
items_saved = 0
if closed_processes == stream_count:
break
except KeyboardInterrupt:
pass
file_manager.save_pickle()
Обработчик очереди ведет счет, сколько потоков отправили сигнал DONE об окончании работы. Если сигналов пришло столько же, сколько всего было создано потоков, то обработчик прекращает просматривать очередь и сохраняет прогресс обработки в Pickle.
Основная программа должна быть защищена «входной точкой» if __name__==’__main__’, иначе это вызовет RuntimeError при создании процессов.
if __name__ == '__main__':
try:
stream_count = 5 #количество потоков обработки
files_dir = 'путь/к/директории'
result_file = 'result.pkl' #Файл с результатами обработки
file_manager = FileManager(result_file, files_dir)
process_manager = ProcessManager()
files_to_process = file_manager.collect_files()
for i in range(stream_count):
#Общий список файлов разбивается на равные части для всех потоков
process_manager.create_stream(process_func, files_to_process[i:len(files_to_process):stream_count])
process_manager.create_queue_process(file_manager)
#Пока очередь не закончит заботу, программа не завершится
except KeyboardInterrupt:
print('Пользователь завершил работу досрочно')
else:
print('Программа завершила работу')
Перехват KeyboardInterrupt в главной программе не очень важен и носит более косметический характер.
Используя данный подход, я решил задачу корректного сохранение данных при многопотоковой обработки из VOSK. На этом у меня все. Пишите в комментарии понравилась ли данная статья и что бы вы улучшили.
По ссылке на GitHub вы можете посмотреть полный код шаблона.