Время прочтения: 6 мин.

Предыстория

Когда-то давно четыре народа… К-хм. Не так давно на нашем кластере было признано использование Hive запросов нежелательным в силу того, что:

  1. Hive-запросы управляются TEZ, им по умолчанию выделяется максимум ресурсов, вплоть до того, что эти ресурсы перенаправляются с уже работающих Spark-сессий.
  2. Невозможно переопределить параметры выделения ресурсов на TEZ-сессии и установить для них пороговые значения как, например, было бы возможно с MapReduce.
  3. TEZ будет пытаться выполнить запрос, даже если запрос пользователя не оптимизирован, например, обращается к весомой партиционированной таблице без использования партиций.
  4. Мониторинг работы таких запросов менее информативен и в основном показывает информацию о факте выполнения запроса в моменте, без показа текста запроса, информации по подзадачам и т.д.

Альтернативой Hive-запросов был определен Spark. Он работает более дружелюбно, потребление ресурсов легче контролировать, а анализ spark-сессий гораздо проще: можно увидеть подзадачи и диаграмму запроса. Однако пользователи-аналитики лучше знают SQL и по этой причине предпочитают работать с HiveQL в силу почти идентичного синтаксиса, удобства работы в HUE. Учитывая, что аналитикам нужно некоторое время на обучение новому инструменту, они продолжали работать с Hive. Но закон есть закон. На плечи администраторов кластера выпала задача “убивать(Так как приложения помечаются статусом KILLED, от англ.: убито) Hive-приложения и, побыв какое-то время своего рода «палачом», устав от необходимости периодически отслеживать запущенные приложения, мы решили автоматизировать этот процесс, чтобы пользователи обижались на «злую машину», а не админа.

К этому моменту я уже имел опыт обращения к API ResourceManager’а в целях сбора истории приложений кластера. К слову, Hadoop предлагает огромный функционал взаимодействия с его экосистемой через API.

Hadoop API

Я не перестаю восхищаться титаническим трудом разработчиков Hadoop и предоставляемые API – это одна из причин уважения (помимо этого прекрасного жёлтого слона, при виде которого в голове начинает играть “What a Wonderful World” Луи Армстронга). Перечислю основные возможности:

  • Полноценное взаимодействие с HDFS;
  • Управление MapReduce приложениями и их историей;
  • Управление Yarn-службами;
  • Взаимодействие с Yarn Timeline;
  • Взаимодействие с NodeManager;
  • Взаимодействие с ResourceManager,

Последним я и воспользовался в своей задаче.

Вероятно, вам тоже уже знаком веб-интерфейс ResourceManager’а, с помощью которого можно следить за объёмом занимаемых приложением ресурсов, перечнем работающих приложений, отработавших и т.д.

Моя задача была среди работающих приложений найти те, что управляются TEZ (означает, что это Hive-запрос), взять его id и попросить Yarn прекратить его работу. Не трудоёмкий процесс, но рутинный и требует периодического отвлечения от основных задач. Так, опираясь на указанные действия, я и сформулировал алгоритм: с некой периодичностью получать список текущих приложений и находить те, что нас не устраивают, и останавливать их выполнение.

Для того чтобы получить список текущих приложений, сформируем следующий GET запрос:

http://<<host>>:<<port>>/ws/v1/cluster/apps/state=<<states>>,

где:

            host – Хост ResourceManager’а;

            port – Порт ResourceManager’а (Стандартное значение 8088);

            states – Список состояний приложений, которые мы хотим получить: ACCEPTED –принятых в работу, RUNNING – выполняющихся в текущий момент и т.д.

Для того, чтобы остановить приложение, воспользуемся следующим PUT запросом:

http://<<host>>:<<port>/ws/v1/cluster/apps/<<application_id>>/state?user.name=<<user>>,

где:

            application_id – id приложения, которое мы можем получить из предыдущего запроса;

            user – Пользователь, от лица которого приложение будет приостановлено (Здесь, на самом деле, может быть почти любая строка, например, “Michael_Jackson_is_not_my_dad”).
Перейдём к написанию кода.

Код

Для того чтобы общаться с API, мы можем использовать requests или subprocess + curl. В своей задаче я использовал оба варианта, так как кластер работает под Kerberos и по ряду причин установить gssapi и, как следствие, библиотеку requests_kerberos, из которой можно было бы взять класс с Kerberos-авторизацией, не предоставляется возможным. Поэтому, для получения списка приложений я воспользовался requests в силу простоты работы с json и subprocess + curl, а также в силу поддержки авторизации через Kerberos.

Подгрузим библиотеки:

import requests
import subprocess

Напишем функцию для получения списка приложений:

def get_all_apps(host: str, port: str, states: list[str]) -> list:
    r = requests.get('http://{}:{}/ws/v1/cluster/apps/?state={}'.format(host, port, ','.join(states)))
    return r.json().get('apps').get('app')

Теперь сформируем список фильтров, по которым мы отберём нежелательные:

filters = [
    lambda x: (x.get('applicationType') == 'TEZ') & (x.get('elapsedTime') / (60 * 1000) >= 5)   
]

И функцию, которая вернёт отфильтрованный список:

def get_target_apps(apps: list[dict], filters: list[callable]) -> list:        
    return [x for x in apps if any([func(x) for func in filters])]

Теперь функция, чтобы сделать PUT запрос, на изменение состояния приложения:

def kill_app(host: str, port: str, app_id: str):
    subprocess.run([
        'curl',
        '-X', 'PUT',
        '-H', 'Content-Type: application/json',
        '-d', '{"state": "KILLED"}',
        '--negotiate', '--user', ':',
        'http://{}:{}/ws/v1/cluster/apps/{}/state?user.name=cluster_defender'.format(host, port, app_id)
    ], stdout=subprocess.PIPE)

Наконец:

if __name__ == '__main__':
    all_apps = get_all_apps(host, port, states)
    target_apps = get_target_apps(all_apps, filters)
    
    [kill_app(x.get('id')) for x in target_apps]

Периодическое исполнение

Следующим шагом будет постановка на периодическое исполнение. Сделать это можно как средствами python (например, при помощи цикла while и функции sleep модуля time), так и планировщиками: cron и oozie. Я выбрал oozie, так как это инструмент экосистемы Hadoop, работу которого также можно отслеживать с помощью Hue или ResourceManager. Для того чтобы выставить расписание для задачи, нам нужно сделать следующее:

  1. сформировать три файла: файл со свойствами, файл с описанием алгоритма исполнения и файл-координатор, с информацией о времени выполнения соответственно:

coordinator.properties,

frequency=5 
startTime=2023-08-16T14:00Z
endTime=9999-12-31T23:59Z
timezone=Europe/Moscow

jobTracker=<< ResourceManager API endpoint >>
nameNode=<< HDFS API endpoint  >>
ownerIPA=your_awesome_login

ScriptFileName=main.py
ScriptFilePath=main.py

workflowPath=${nameNode}/user/${ownerIPA}/oozie
oozie.coord.application.path=${nameNode}/user/${ownerIPA}/oozie
oozie.libpath=${nameNode}/user/oozie/share/lib

workflow.xml

<workflow-app name="ClusterDefenderExecution" xmlns="uri:oozie:workflow:0.1">
    <start to="run-py-script"/>
        <action name="run-py-script">
            <shell xmlns="uri:oozie:shell-action:0.1">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <exec>${ScriptFileName}</exec>
    <file>${ScriptFilePath}#${ScriptFileName}</file>
    <capture-output/>
</shell>    
<ok to="end"/>
<error to="kill"/>
        </action>
        <kill name="kill">
            <message>Error occured during execution, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name="end"/>
</workflow-app>

coordinator.xml:

<coordinator-app name="ClusterDefenderCoordinator" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowPath}</app-path>
        </workflow>
    </action>
</coordinator-app>

2. Разместить скрипт, файлы workflow.xml и coordinator.xml на HDFS;

3. Запустить задачу используя следующую команду:

oozie job -config coordinator.properties -oozie <<Ссылка на API Oozie>> -run

4. Отслеживать и, при необходимости, корректировать выполнение в Hue или в терминале при помощи команд:

oozie job -info jobId -oozie <<Ссылка на API Oozie>>
oozie jobs -jobtype coordinator -oozie <<Ссылка на API Oozie>>

Итог

Таким образом, была решена задача по устранению нежелательных TEZ сессий на кластере. Надеюсь мой опыт окажется полезным, спасибо за внимание!