Python, Программирование

Многопоточная обработка и автосохранение в файл

Время прочтения: 5 мин.

Модуль multiprocessing предоставляет нам два главных способа параллельной обработки массивов: Pool и Process. Для решения нашей задачи Pool нам не подходит — он возвращает результат обработки массива только после завершения работы всех потоков, а нам важно получение результатов в течение всего времени работы программы и отслеживание состояния процессов-обработчиков.

Для примера мы будем хранить результаты обработки в Pickle – с этим форматом легко работать, и он быстро сохраняется на локальный диск. Вы можете использовать иной формат под свои нужды.

Первым делом импортируем библиотеки:

import pickle
import multiprocessing as mp

Далее создаем класс, который будет работать с файлами. В класс будут передаваться имя файла с результатами обработки и путь к папке, в которой находятся файлы для обработки. В файле с результатами должен вестись учет, какие файлы уже прошли обработку. Для этого мы будем хранить данные в словаре Python, где ключом будет выступать имя файла, а значением — данные, полученные при преобразовании этого файла. Таким образом при остановке обработки программа будет знать, какие файлы уже были обработаны.

class FlileManager():
    def __init__(self, result_file, file_directory):
        self.result = dict()
        self.result_file = result_file
        self.file_directory = file_directory
        with open(result_file,'rb') as p:
            #Открываем pickle, если файл поврежден или не найден - будет создан новый
            try:
                self.result = pickle.load(p)
            except (EOFError, pickle.UnpicklingError, FileNotFoundError):
                self.result = dict()

    #Метод для сохранения результатов обработки в Pickle
    def save_pickle(self):
        with open(result_file, 'wb') as p:
            pickle.dump(self.result, p)

    #Сбор необработанных файлов
    def collect_files(self) -> list:
		files_to_process = []
        for root, subdir, files in os.walk(file_directory):
			for file in files:
				#Здесь также задается расширение обрабатываемых файлов
				if file not in self.result.keys() and file.endswith('.txt'):
					files_to_process.append(os.path.join(root,file))
		return files_to_process

Теперь о параллельной обработке.

Модуль multiprocessing содержит в себе классы для обеспечения связи между процессами, такие как класс Lock и прокси для базовых типов и объектов Python. В нашем случае все процессы должны отправлять результаты обработки в единый словарь. Тогда почему бы не использовать Lock, чтобы единовременно разрешить только одному процессу складывать результаты в словарь и сохранять результаты обработки в файл?

Исходя из собственного опыта, я укажу два существенных минуса:

  1. Использование Lock тормозит все остальные процессы и обработку в целом.
  2. Пользователь может прервать программу в момент сохранения данных в файл, и тот может быть поврежден.

Я решил эти две проблемы использованием другого класса из модуля multiprocessing – Queue, то есть очередь. В эту очередь можно складывать различные данные, а потом извлекать их для дальнейшего использования.

Идея заключается в создании нескольких основных процессов, ведущих непосредственную обработку файлов и отдельного процесса, который будет извлекать обработанные данные из очереди, складывать их в словарь и сохранять словарь в файл Pickle с некоторой периодичностью. Также мы добавим возможность остановить обработку самостоятельно, если это нам по какой-то причине стало нужно.

Создадим класс для управления процессами:

class ProcessManager():
	def __init__(self):
		self.process_list = []
		self.queue = mp.Queue()
		self.queue_process = None
	
	def create_process(self, func, file_list):
		try:
			process = mp.Process(target = func, args = (file_list, self.queue))
			process.start()
			self.process_list.append(process)
		except mp.ProcessError:
			print('Error during process creation')
	
	def create_queue_process(self, file_manager):
		try:
			self.queue_process = mp.Process(target = queue_func, args = (self.queue, file_manager, len(self.process_list)))
			self.queue_process.start()
			self.queue_process.join()
		except mp.ProcessError:
			for proc in self.process_list:
				proc.terminate()

Если процесс, работающий с очередью, не запустится, то обработка файлов не будет иметь смысла, так как они просто не будут сохраняться. Поэтому в случае ошибки при создании обработчика очереди мы должны убить процессы, обрабатывающие данные.

Функция, передаваемая в процессы для обработки, будет принимать в качестве аргумента список файлов и очередь. Таков ее примерный вид:

def process_func(file_list, queue):
	try:
		for file in file_list:
			#some_work_func - некоторая функция, производящая обработку файла
			workout_result = some_work_func(file)
			queue_token = {file: workout_result}
			queue.put(queue_token)
	except KeyboardInterrupt:
		pass
	finally:
		queue.put('DONE')

Отмечу, что в конце функции процесса в очередь отправляется строка DONE – это сигнал того, что процесс закончил работу.

Функция, работающая с очередью, будет принимать в качестве аргументов очередь, экземпляр класса работы с файлами и количество рабочих потоков. Функция должна периодически сохранять результаты обработки в файл, заканчивать работу только после окончания работы всех основных процессов и игнорировать прерывание работы программы пользователем, чтобы сохранить все результаты обработки.

def queue_func(queue, file_manager, stream_count):
	closed_processes = 0
	items_saved = 0
	while True:
		try:
			if queue.is_empty():
				pass
			else:
				item = queue.get()
				if item == 'DONE':
					closed_processes += 1
				else:
					file_manager.result.update(item)
					items_saved += 1
					if items_saved == 10:
						file_manager.save_pickle()
						items_saved = 0
			if closed_processes == stream_count:
				break
		except KeyboardInterrupt:
			pass
	file_manager.save_pickle()

Обработчик очереди ведет счет, сколько потоков отправили сигнал DONE об окончании работы. Если сигналов пришло столько же, сколько всего было создано потоков, то обработчик прекращает просматривать очередь и сохраняет прогресс обработки в Pickle.

Основная программа должна быть защищена «входной точкой» if __name__==’__main__’, иначе это вызовет RuntimeError при создании процессов.

if __name__ == '__main__':
	try:
		stream_count = 5 #количество потоков обработки
		files_dir = 'путь/к/директории'
		result_file = 'result.pkl' #Файл с результатами обработки
	
		file_manager = FileManager(result_file, files_dir)
		process_manager = ProcessManager()
		files_to_process = file_manager.collect_files()
	
		for i in range(stream_count):
			#Общий список файлов разбивается на равные части для всех потоков
			process_manager.create_stream(process_func, files_to_process[i:len(files_to_process):stream_count])
		process_manager.create_queue_process(file_manager)
		#Пока очередь не закончит заботу, программа не завершится
	except KeyboardInterrupt:
		print('Пользователь завершил работу досрочно')
	else:
		print('Программа завершила работу')

Перехват KeyboardInterrupt в главной программе не очень важен и носит более косметический характер.

Используя данный подход, я решил задачу корректного сохранение данных при многопотоковой обработки из VOSK. На этом у меня все. Пишите в комментарии понравилась ли данная статья и что бы вы улучшили.

По ссылке на GitHub вы можете посмотреть полный код шаблона.

Советуем почитать