Machine Learning, Анализ данных, Саморазвитие

Асинхронные http-запросы

Время прочтения: 3 мин.

В первой статье мы рассказывали, как создали скрипт на Python, который работает с excel файлом со списком IP-адресов серверов ТСВ, с помощью http запросов получает список каналов, а затем с каждого канала выгружает скриншоты за период с 8 утра до 8 вечера с установленным интервалом.

В данном скрипте для каждого скриншота формируется свой http запрос, который выполняется в среднем в течение 2 секунд. Учитывая количество выгружаемых скриншотов (примерно 25000 штук на момент функционирования этого скрипта, сейчас это количество возросло до 80000), на выгрузку уходит 12-15 часов.

Ввиду больших временных затрат необходимо было ускорить выгрузку. Для этого мы включили в скрипт модуль asyncio для асинхронного запуска программ, а также модуль для асинхронных http запросов – aiohttp. Суть улучшения проста: когда наш скрипт отправляет запрос на сервер, он ждёт ответа и простаивает, попусту тратя драгоценное время, поэтому предлагается не ждать ответа, а отправить ещё один запрос, только на другой сервер, а затем ещё и ещё, и так до тех пор, пока не придет ответ от одного из серверов – в этом случае мы обрабатываем ответ, то есть сохраняем скриншот, отправляем следующий запрос. Таким образом наша программа не простаивает и работает наиболее эффективно.

Вначале инициализируем «событийный» цикл, запускающий асинхронные задания – такие функции, которые могут прерывать своё выполнение – это нужно для переключения между ними:

loop = asyncio.get_event_loop()
    
    future = asyncio.ensure_future(work_with_server(servers, start_moment))
    loop.run_until_complete(future)

Далее создаем список сопрограмм, каждая из которых взаимодействует со своим сервером, но ограничиваем число одновременных запросов: 10 штук, чтобы не создать слишком большую нагрузку на сеть:

async def work_with_server(servers, start_moment):
    """
    Функция формирует список сопрограмм по каждому серверу
    Параметры:
        servers - список серверов
    Возвращаемого значения нет
    """
    sem = asyncio.Semaphore(10)
    tasks = []
    
    async with aiohttp.ClientSession() as session:        
        for idx, inf in servers[:].iterrows():
            serv_ip = inf[0]
            serv_name = inf[1]
            Path(os.path.join(root, start_moment, serv_name)).mkdir(exist_ok=True)
            serv_url = 'https://' + serv_ip + ':8080/login?username=uvk&password=uvk'
            task = asyncio.ensure_future(bound_fetch(sem, serv_url, session, serv_ip, serv_name, start_moment))
            tasks.append(task)
          
        results = [await f for f in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc='servers_bar')]

Затем подключаем Семафор (наш ограничитель):

async def bound_fetch(sem, url, session, serv_ip, serv_name, start_moment):
    """
    Функция подключает Семафор
    Параметры:
        sem - aiohttp.Semaphore
        url - url-адрес текущего сервера
        session - сессия aiohttp
        serv_ip - ip-адрес текущего сервера
        serv_name - имя сервера (номер ГОСБ-номер ВСП)
    Возвращаемого значения нет
    """
    async with sem:
        await fetch(url, session, serv_ip, serv_name, start_moment)

Ну и здесь работаем с конкретным сервером:

  1. Создали функцию fetch
async def fetch(url, session, serv_ip, serv_name, start_moment):
    """
    Здесь и происходит главное: Получение id сессии, получение списка каналов и выгрузка скриншотов по каждому каналу
    Параметры:
        url - url-адрес сервера, с которым сейчас работаем
        session - сессия aiohttp
        serv_ip - ip-адрес текущего сервера
        serv_name - имя сервера (номер ГОСБ-номер ВСП)
        
    Возвращаемого значения нет
    """

2. Запрашиваем идентификатор сессии у сервера по URL и составляем запрос списка каналов:

async with session.get(url, verify_ssl=False) as response:
            content_text = await response.json()
            sid = content_text['sid']
            
        rqst_chnl = "https://" + serv_ip + ":8080/channels/?sid=" + sid

3. Получаем список каналов с текущего сервера по текущей сессии (sid — идентификатор сессии)

async with session.get(rqst_chnl, verify_ssl=False) as jopa:
            resp = await jopa.read()
            guids = []
            if resp:
                guids, names = channels_handler(resp)

4. Запускаем цикл, в котором происходит выгрузка скриншотов с текущего сервера по текущей сессии:

for guid, name in zip(tqdm(guids, desc='guids_bar', leave=False), names):
            Path(os.path.join(root, start_moment, serv_name, str(guid))).mkdir(exist_ok=True)
            for catch_timee in tqdm(catch_times, desc='screenshots', leave=False):
                async with session.get("https://" + \
                                  serv_ip + \
                                  ":8080/screenshot/" + \
                                  guid + \
                                  "?timestamp=" + \
                                  catch_timee + \
                                  '&sid=' + \
                                  sid, verify_ssl=False) as screen:
                    scr = screen.content_type

В итоге у нас получился новый скрипт, который справляется с задачей за 4-5 часов. Т.е. экономия времени составляет 10-11 часов и это успех!

Советуем почитать