Время прочтения: 3 мин.

Однажды у нас появилась задача: на регулярной основе делать выборку данных из txt-файла, содержащего более 160 млн. строк, весом около 12ГБ.

Библиотека pandas в Python легко справляется с подобными задачами. Но время обработки занимает десятки минут. Поэтому была найдена альтернатива в Go, что помогло сократить временные затраты до нескольких секунд.

В данной статье сведём нашу задачу к выводу строк из файла с детализацией расходов, произведённых в определённый временной диапазон. Структуру входного файла определим так: «дата,данные1,данные2,…», то есть данные разделены запятыми, на первом месте дата. Обрабатывать будем кусками. Кусков будет много, а выделение памяти для каждого из них — дорогостоящая операция, поэтому будем использовать sync.Pool, который позволит переиспользовать выделенную память, не дожидаясь сборщика мусора.

В функции main() укажем файл-источник, интересующий нас период определим переменными dateStart и dateEnd и распарсим их. Если всё Ok, запустим функцию SplitChunk().

func CheckErr(err error){
    if err != nill {
        fmt.Println(err)
    }
}

func main() {
    dateStart := "customDateStart"
    dateEnd := "customDateEnd"
    sourceName := "customfileName"
    
    sourceName, err := os.Open(sourceName) if err != nil {
        fmt.Println("Ошибка открытия файла", err)
        return
    }
    
    file, err := os.Open(sourceName)
    CheckErr(err)
    defer file.Close()
    
    checkDateStart, err := time.Parse("2006-01-02 15:04:05", dateStart)
    CheckErr(err)
    
    checkDateEnd, err := time.Parse("2006-01-02 15:04:05", dateEnd)
    CheckErr(err)
    
    SplitChunk(file, checkDateStart, checkDateEnd)
}

В функции SplitChunk() определим два вида пулов: в первый будем закидывать чанки (poolOfLines) из исходного файла, которые позже в WorkChunk() разделим построчно, и каждую строку поместим в poolOfString– второй вид пулов. Каждый пул будем обрабатывать в отдельной горутине.

func SplitChunk(f *os.File, onset time.Time, offset time.Time) error {
    poolOfLines := sync.Pool{New: func() interface{} {
        chunkOfLines:= make([]byte, 100*1024)
        return chunkOfLines
    }}
    poolOfString := sync.Pool{New: func() interface{} {
        chunkOfLines:= ""
        return chunkOfLines
    }}
    t := bufio.NewReader(f)
    var wg sync.WaitGroup
    for {
        buferOfPools := poolOfLines.Get().([]byte)
        p, err := t.Read(buferOfPools)
        buferOfPools = buferOfPools[:p]
        if p == 0 {
            if err != nil {
                fmt.Println(err)
                break
            }
            if err == io.EOF {
                break
            }
            return err
        }
        nextLine, err := t.ReadBytes('\n')
        if err != io.EOF {
            buferOfPools = append(buferOfPools, nextLine...)
        }
        wg.Add(1)
        go func() {
            WorkChunk(buferOfPools, &poolOfLines, &poolOfString, onset, offset)
            wg.Done()
        }()
    }
    wg.Wait()
    return nil
}

Далее в функции WorkChunk() нас ждёт основное действие: полученный кусок файла (poolOfLines) разделяем на строки, кидаем их в poolOfString. В отдельной горутине каждую строку сплитим, берём ту самую дату, которую сравниваем с заданным ранее периодом, и выводим её, если она в него попадает.

func WorkChunk(chunk []byte, poolOfLines *sync.Pool, poolOfString *sync.Pool, onset time.Time, offset time.Time) {
    var wg2 sync.WaitGroup
    parts := poolOfString.Get().(string)
    parts = string(chunk)
    poolOfLines.Put(chunk)
    partsSlice := strings.Split(parts, "\n")
    poolOfString.Put(parts)
    chSize := 300
    p := len(partsSlice)
    threads := p / chSize
    if p%chSize != 0 {
        threads++
    }
    for j := 0; j < (threads); j++ {
        wg2.Add(1)
        go func(c int, m int) {
            defer wg2.Done()
            for j := c; j < m; j++ {
                text := partsSlice[j]
                if len(text) == 0 {
                    continue
                }
                partSlice := strings.SplitN(text, ",", 2)
                partCreationDateString := partSlice[0]
                partCreationDate, err := time.Parse("2006-01-02 15:04:05", partCreationDateString)
                CheckErr(err)
                if partCreationDate.After(onset) && partCreationDate.Before(offset) {
                    fmt.Println(text)
                }
            }
            
        }(j*chSize, int(math.Min(float64((j+1)*chSize), float64(len(partsSlice)))))
    }
    wg2.Wait()
    partsSlice = nil
}

В ходе таких манипуляций время обработки файла снизилось до 17-19 секунд. Не в каждой задаче sync.Pool столь заметно сокращает затраты ресурсов, но для этой задачи он подходит идеально.