Время прочтения: 4 мин.
Интересная тема для обсуждения, не так ли? Для начала обратимся к предыдущей статье цикла, описывающей применение методов NLP в аудите, где я использовал линейную обработку русскоязычного текста. Посмотреть статью можно здесь в открытом доступе: https://newtechaudit.ru/nlp-i-audit/
Сложность состоит в том, что я подвергал обработке масштабные русскоязычные тексты, полные уникальных слов. Поэтому использование таких инструментов, как natasha, занимало большое время. Рассмотрим пример используемой функции препроцессинга:
def preproc_line(line, stop_words):
line = ''.join(i for i in line if not i.isdigit())
line = line.translate(str.maketrans('', '', string.punctuation))
line = line.lower()
if len(stop_words):
line = del_stopwords(line, stop_words)
line = pymorphy_preproc(line)
line = ' '.join(line)
line = sub_names(line)
line = sub_dates(line)
return line
Здесь я последовательно очистил данные, удалил стоп-слова, выделил именованные сущности (имена и даты)
vectorizer = TfidfVectorizer(ngram_range = (1, 2))
vect = vectorizer.fit_transform(X_train)
А затем токенизировал и выделил биграммы. Все это я уже демонстрировал ранее, теперь обратимся к главной проблеме: времени исполнения. Воспользуемся классической библиотекой, которая поможет нам измерять время выполнения программы:
import time
И оценим, как долго будет исполняться целевая функция в стандартном виде:
start_time = time.time()
X_train = preproc_data(outp_df, words_pack_1, 'Text')
print("--- %s seconds ---" % (time.time() - start_time))

Ого, несколько тысяч коротких сообщений из чат-бота и более минуты исполнения! Как же мы можем ускорить этот процесс? Для этого и используется многопоточность, то есть разделение одного процесса на несколько потоков, работающих независимо. Нашу структуру упрощает также то, что нам не нужна синхронизация потоков, поскольку обрабатывать мы можем датасет частями и последовательно, не обмениваясь информацией при перезаписывании. Для этого волшебства нам понадобятся всего две библиотеки, одна из которых делит процесс на потоки, а вторая нужна для создания физической копии обрабатываемых данных. Выигрыш во времени, проигрыш в памяти.
import threading
from copy import copy
Теперь создадим копию:
data_new = copy(data)
И обернем нашу функцию препроцессинга таким образом, чтобы она обрабатывала только часть данных, то есть срез:
def thread_function(data, data_from, data_to, stop_words):
for i in range(data_from, data_to):
data_new[i] = ''.join(preproc_line(data[i], stop_words))
return data
Теперь создадим список для хранения открытых потоков, затем определим размер среза данных, пропорциональный количеству потоков:
threads = []
num_threads = 1
bunch_size = (len(data) + num_threads - 1) / num_threads;
Отлично. Далее пройдемся по всему списку обрабатываемых сэмплов и применим треды к каждому срезу по отдельности:
for i in range(num_threads):
data_from = int(bunch_size * i)
data_to = int(bunch_size * (i + 1))
if data_to > len(data):
data_to = int(len(data))
x = threading.Thread(target=thread_function, args=(data, data_from, data_to, words_pack_1))
threads.append(x)
x.start()
for index, thread in enumerate(threads):
thread.join()
Финал: смотрим на время исполнения всей конструкции (тот же результат, что и без потоков):

Действительно, какой выигрыш во времени! В самом деле, работает. И это может быть гораздо важнее и заметнее на более длинных текстах, на большем количестве сообщений, с применением большего числа функций выделения именованных сущностей.
Также в зависимости от процессора/ОС/движка Python можно использовать и разделение работы на несколько процессов. Для этого легко воспользоваться библиотекой:
from multiprocessing import Process
А разработка имеет тот же синтаксис:
data = list(outp_df['Text'].astype(str))[:10]
data_new = copy(data)
def proc_function(data, data_from, data_to, stop_words):
for i in range(data_from, data_to):
data_new[i] = ''.join(preproc_line(data[i], stop_words))
return data
processes = []
num_processes = 2
bunch_size = (len(data) + num_processes - 1) / num_processes;
for i in range(num_processes):
data_from = int(bunch_size * i)
data_to = int(bunch_size * (i + 1))
if data_to > len(data):
data_to = int(len(data))
x = Process(target=proc_function, args=(data, data_from, data_to, words_pack_1))
processes.append(x)
x.start()
for index, process in enumerate(processes):
process.start()
for index, process in enumerate(processes):
process.join()
Однако на физическом уровне работа такого алгоритма серьезно отличается от использования потоков: исполнение переносится уже на несколько независимых процессов по разным ядрам машины. Это может быть много быстрее, чем разделение на потоки, однако требует большего количества ресурсов CPU, а также особенностей движка, например, механизмов очистки мусора.