Время прочтения: 6 мин.

В статье «Лечим проблемы с памятью у Pandas» мы разбирали пример с анализом операций клиентов и брокеров на финансовых рынках, или транзакций по картам для выявления мошеннических действий и нашли, как уменьшить объем памяти фрейма данных, выбрав правильные типы столбцов.

Но, если даже после таких преобразований не удаётся поместить данные в память, то нужно менять стратегию.

Вместо того, чтобы пытаться загрузить полный набор данных в память, мы можем загружать и обрабатывать его по частям. Как это работает?

В любой момент времени только часть строк из нашего датафрейма фактически размещается в памяти. После того, как мы загрузим следующий фрагмент в память, процесс сборки мусора Python удаляет старый фрагмент из памяти. Недостатком является то, что мы должны написать код, который может работать только с частью данных, хранить промежуточные результаты и объединять их в конце.

Очевидный алгоритм для простых задач, может потребовать нетривиальных решений для сложных.

В этой статье мы рассмотрим, как разбить массивный пласт данных на несколько фрагментов и оперировать ими.

Функция pandas.read_csv() облегчает нам этот процесс. Она имеет параметр chunksize, который мы можем использовать для указания количества строк, которое будет содержать каждый фрагмент.

Эта функция возвращает не сами фрагменты, а объект – итератор, который мы можем использовать в цикле для обработки каждого фрагмента фрейма данных.

К сожалению, нет надежного способа убедиться, что каждый фрагмент находится ниже определенного порога памяти. Лучший способ — попробовать начать с небольшого числа строк, вычислить объем памяти каждого фрагмента фрейма данных, а затем увеличить количество строк до тех пор, пока вы не окажетесь ниже 50% своего порога (просто для безопасности).

Например, наш набор данных из 45,5 млн строк потребляет 5,2 ГБ памяти. Тогда мы можем предположить, что наборы по 500 тыс. строк займут около 58 МБ.

Если фрагментов-кусков не много, мы можем отобразить их на гистограмме:

import pandas as pd
import matplotlib.pyplot as plt

memory_usage = []
chunk_iter = pd.read_csv("data.csv", "r", encoding="utf-8", chunksize=500000)
for chunk in chunk_iter:
    memory_footprints.append(chunk.memory_usage(deep=True).sum()/(1024*1024))

plt.hist(memory_footprints)
plt.show()

В стандартном случае для определения количества строк в одном фрейме данных, мы используем функцию len(). Чтобы сделать это с фрагментированным датафреймом, нам нужно будет вычислить количество строк в каждом фрагменте и добавить это значение в переменную счетчика. Аналогично мы работаем со списками.

num_rows = 0
chunk_iter = pd.read_csv("data.csv", chunksize=500000)
for chunk in chunk_iter:
    num_rows += len(chunk)
print(num_rows)

Давайте рассмотрим такую задачу, в которой каждый фрагмент содержит набор значений, которые нам нужно объединить.

Например, чтобы объединить расчётные значения из наших фрагментов (такие как количество операций на бирже, финансовый результат от продажи ценных бумаг и т.п.) мы могли бы посчитать каждый параметр отдельно и добавлять в список python интересующие нас данные. Но в данным случае мы бы потеряли во времени. Что и произошло при использовании первой версии скрипта. И чем больше у нас был бы анализируемый объём, тем дольше проходил процесс. Гораздо выгоднее работать с объектами pandas, чтобы мы могли воспользоваться преимуществами оптимизации под капотом, чтобы сохранить как процессор, так и память.

Предпочтительный способ сделать это в pandas – добавить каждый объект, например, series (или датафрейм, или что-то подобное), который мы хотим объединить вместе, в список, а затем объединить их в конце с помощью pandas.concat().

Так:

series_list = [
		quarter_calculation(client_code, df_operations, quarters)[‘PRICE’],
		quarter_calculation(client_code, df_operations, quarters)[‘COMIS’]
	      ]
ptint(pd.concat(series_list, axis = 1))

Или так:

List_df = list()
df_output = pd.DataFrame(columns=
    [
        'DATE',
        'CLIENTCODE',
        'CODE',
        'SECCODE',
        'PRICE',
        'COMIS', 
        'VALUE',
    ]
			)
for df_operations in chunk_iter:
df_calc = quarter_calculation(client_code, df_operations, quarters)
List_df.append(df_calc)
df_output = pd.concat(List_df, ignore_index = True)

Допустим, нам надо посчитать дельту стоимости портфеля клиента за период. Тогда, первый вариант кода с использованием фрагментов будет такой:

dtypes = {
	  "Investment_portfolio_cost_date1": "float", 
	  "Investment_portfolio_cost_date2": "float"
	}
chunk_iter = pd.read_csv("data.csv", chunksize=500000, dtype=dtypes)
delta_cost = []
for chunk in chunk_iter:
    diff=chunk['Investment_portfolio_cost_date1']-chunk['Investment_portfolio_cost_date2']
    delta_cost.append(diff)
lifespans_dist = pd.concat(delta_cost)
print(lifespans_dist)

В данном цикле каждый фрагмент датафрейма содержал все строки, хотя мы использовали только два столбца. Если мы знаем, что будем использовать только эти два столбца, мы можем повысить производительность. Оценим время выполнения разных вариантов одного алгоритма.

%%timeit
delta_cost = []
chunk_iter = pd.read_csv("data.csv", 
			chunksize=500000, 
			dtype = {
				'Investment_portfolio_cost_date1’: ‘float’, 
				'Investment_portfolio_cost_date2': ‘float’
				}
			)
for chunk in chunk_iter:
    delta_cost.append(chunk['Investment_portfolio_cost_date1’] -\
		     chunk['Investment_portfolio_cost_date2'])
delta_cost_dist = pd.concat(delta_cost)

Наилучшее время для исходного, немного упрощённого кода, лучшее из 3: 1,4 сек. на цикл. Обратите внимание, что это может сильно отличаться в зависимости от используемого оборудования. Теперь, сравним это с таким вариантом:

%%timeit
delta_cost = []
chunk_iter = pd.read_csv("data.csv", 
			chunksize=500000, 
			dtype={
				'Investment_portfolio_cost_date1’: "float", 
				'Investment_portfolio_cost_date2': "float"
			      }, 
			usecols=[
				'Investment_portfolio_cost_date1’, 
				'Investment_portfolio_cost_date2'
				]
			)
for chunk in chunk_iter:
    delta_cost.append(chunk['Investment_portfolio_cost_date1’] -\
		     chunk['Investment_portfolio_cost_date2'])
delta_cost_dist = pd.concat(delta_cost)

В этом варианте, время лучшего из 3: 467 МС на цикл. Загрузив только те колонки, которые нам были нужны, мы смогли сократить общее время работы примерно на две трети. Мы также можем увеличить размер блока (значение chuksize) и по-прежнему держать объем памяти каждого блока ниже нашего порогового значения.

Далее нам было бы интересно, например, определить виды бумаг, которыми оперируют клиенты, и количество операций по каждому виду. Стандартным способом это посчитать является pandas.Series.value_counts(). Однако при работе с пакетными данными нам необходимо вычислить уникальные значения для каждого фрагмента и объединить их в конце.

Примерный алгоритм таков:

— мы можем использовать pandas.concat() и объединить все фрагменты в конце, тогда

1. генерируем подсчёт уникальных значений для столбца ‘SECCODE’ (код бумаги) для каждого блока таблицы данных из 500000 строк.

2. добавляем каждый результирующий объект серии в список securities_vc.

3. используем pandas.concat(),объединяя все объекты серии в securities_vc в один объект и назначаем результат в summary_vc.

chunk_iter = pd.read_csv("data.csv", chunksize=500000, usecols=['SECCODE'])
securities_vc = list()
for chunk in chunk_iter:
    chunk_vc = chunk['SECCODE'].value_counts()
    securities_vc.append(chunk_vc)
summary_vc = pd.concat(securities_vc)

Хотя мы объединили все объекты Series в один объект, он содержит повторяющиеся значения индекса. Это потому, что pandas.concat() не объединяет значения с одним и тем же индексом.

Чтобы суммировать все значения, связанные с индексом, нам сначала нужно сгруппировать такие значения. Мы можем сделать это, используя Series.groupby(). Вызов этого метода вернет pandas.GroupBy объект и будет отображать только ссылку. Мы можем просматривать группировки в ходе итерации объекта, как мы делаем это со словарями. Используем  GroupBy.get_group() метод.

Например, как-то так:

temp_data = s_calc_securities.groupby(s_calc_securities.index)
for key, item in temp_data:
   print(temp_data.get_group(key))

Как только мы сгруппируем значения по соответствующим индексам, мы сможем вызвать Series.sum()  метод, чтобы получить окончательный результат с уникальными значениями.

Полный текст кода такой:

chunk_iter = pd.read_csv("data.csv", chunksize=500000, usecols=['SECCODE'])
securities_vc = list()
for chunk in chunk_iter:
    chunk_vc = chunk[' SECCODE '].value_counts()
    securities_vc.append(chunk_vc)
summary_vc = pd.concat(securities_vc)
final_vc = summary_vc.groupby(summary_vc.index).sum()
print(final_vc)

Рассмотрим несколько более сложный пример. Теперь мы хотим понять, как менялось распределение операций с бумагами по периодам, но у нас есть ограничения памяти относительно полного набора данных. Мы можем подойти к этой задаче двумя путями.

Один из способов – прочитать только те столбцы, с которыми нам нужно работать как с обычным, полным фреймом данных, а затем использовать метод GroupBy. Вот как это будет выглядеть:

df_operations = pd.read_csv("data.csv", usecols=['SETTLEDATE', 'SECCODE'])
id_securities_counts = \
df_operations[' SECCODE '].groupby(df_operations['SETTLEDATE']).value_counts()
print(id_securities_counts)

Мы также можем достичь того же результата с помощью пакетной обработки.

chunk_iter = pd.read_csv("data.csv", chunksize=500000)
df_list = []
for chunk in chunk_iter:
    df_temp = chunk['SECCODE'].groupby(chunk['SETTLEDATE']).value_counts()
    df_list.append(df_temp)
final_df = pd.concat(df_list)
id_securities_counts = final_df.groupby(final_df.index).sum()

В этой статье мы поделились ещё одним испробованным способом по работе с большими данными на ограниченных по мощности аппаратных средствах. Надеемся, эта информация будет вам полезна, и вы сможете использовать её в работе.