Время прочтения: 3 мин.
Однажды у нас появилась задача: на регулярной основе делать выборку данных из txt-файла, содержащего более 160 млн. строк, весом около 12ГБ.
Библиотека pandas в Python легко справляется с подобными задачами. Но время обработки занимает десятки минут. Поэтому была найдена альтернатива в Go, что помогло сократить временные затраты до нескольких секунд.
В данной статье сведём нашу задачу к выводу строк из файла с детализацией расходов, произведённых в определённый временной диапазон. Структуру входного файла определим так: «дата,данные1,данные2,…», то есть данные разделены запятыми, на первом месте дата. Обрабатывать будем кусками. Кусков будет много, а выделение памяти для каждого из них — дорогостоящая операция, поэтому будем использовать sync.Pool, который позволит переиспользовать выделенную память, не дожидаясь сборщика мусора.
В функции main() укажем файл-источник, интересующий нас период определим переменными dateStart и dateEnd и распарсим их. Если всё Ok, запустим функцию SplitChunk().
func CheckErr(err error){
if err != nill {
fmt.Println(err)
}
}
func main() {
dateStart := "customDateStart"
dateEnd := "customDateEnd"
sourceName := "customfileName"
sourceName, err := os.Open(sourceName) if err != nil {
fmt.Println("Ошибка открытия файла", err)
return
}
file, err := os.Open(sourceName)
CheckErr(err)
defer file.Close()
checkDateStart, err := time.Parse("2006-01-02 15:04:05", dateStart)
CheckErr(err)
checkDateEnd, err := time.Parse("2006-01-02 15:04:05", dateEnd)
CheckErr(err)
SplitChunk(file, checkDateStart, checkDateEnd)
}
В функции SplitChunk() определим два вида пулов: в первый будем закидывать чанки (poolOfLines) из исходного файла, которые позже в WorkChunk() разделим построчно, и каждую строку поместим в poolOfString– второй вид пулов. Каждый пул будем обрабатывать в отдельной горутине.
func SplitChunk(f *os.File, onset time.Time, offset time.Time) error {
poolOfLines := sync.Pool{New: func() interface{} {
chunkOfLines:= make([]byte, 100*1024)
return chunkOfLines
}}
poolOfString := sync.Pool{New: func() interface{} {
chunkOfLines:= ""
return chunkOfLines
}}
t := bufio.NewReader(f)
var wg sync.WaitGroup
for {
buferOfPools := poolOfLines.Get().([]byte)
p, err := t.Read(buferOfPools)
buferOfPools = buferOfPools[:p]
if p == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
nextLine, err := t.ReadBytes('\n')
if err != io.EOF {
buferOfPools = append(buferOfPools, nextLine...)
}
wg.Add(1)
go func() {
WorkChunk(buferOfPools, &poolOfLines, &poolOfString, onset, offset)
wg.Done()
}()
}
wg.Wait()
return nil
}
Далее в функции WorkChunk() нас ждёт основное действие: полученный кусок файла (poolOfLines) разделяем на строки, кидаем их в poolOfString. В отдельной горутине каждую строку сплитим, берём ту самую дату, которую сравниваем с заданным ранее периодом, и выводим её, если она в него попадает.
func WorkChunk(chunk []byte, poolOfLines *sync.Pool, poolOfString *sync.Pool, onset time.Time, offset time.Time) {
var wg2 sync.WaitGroup
parts := poolOfString.Get().(string)
parts = string(chunk)
poolOfLines.Put(chunk)
partsSlice := strings.Split(parts, "\n")
poolOfString.Put(parts)
chSize := 300
p := len(partsSlice)
threads := p / chSize
if p%chSize != 0 {
threads++
}
for j := 0; j < (threads); j++ {
wg2.Add(1)
go func(c int, m int) {
defer wg2.Done()
for j := c; j < m; j++ {
text := partsSlice[j]
if len(text) == 0 {
continue
}
partSlice := strings.SplitN(text, ",", 2)
partCreationDateString := partSlice[0]
partCreationDate, err := time.Parse("2006-01-02 15:04:05", partCreationDateString)
CheckErr(err)
if partCreationDate.After(onset) && partCreationDate.Before(offset) {
fmt.Println(text)
}
}
}(j*chSize, int(math.Min(float64((j+1)*chSize), float64(len(partsSlice)))))
}
wg2.Wait()
partsSlice = nil
}
В ходе таких манипуляций время обработки файла снизилось до 17-19 секунд. Не в каждой задаче sync.Pool столь заметно сокращает затраты ресурсов, но для этой задачи он подходит идеально.