DAG файл: создание и работа с ним
Для подготовки витрин данных SDP Analytics используется ETL оркестратор Airflow. Основным инструментом Airflow являются DAG файлы, расшифровывается как Directed Acyclic Graph - направленный ациклический граф.
По сути своей DAG является инструкцией с описанием последовательности операций над данными для Airflow. DAG пишется пользователем на Python в удобной для него IDE среде разработки.
Обычно DAG состоит из нескольких частей, рассмотрим их на примере простого DAGа по созданию таблицы.
- Импортируем необходимые библиотеки и инструменты для подключения к БД
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
- Задаем параметры запуска DAG'а подробнее
with DAG(
dag_id = "bootcamp4",
start_date = datetime.now(),
schedule_interval = None,
catchup = False,
tags = ['bootcamp']
) as dag:
- Определяем параметры для подключения к БД, id конекта задается в подключениях Airflow
src = PostgresHook(postgres_conn_id='admin_instance')
src_conn = src.get_conn()
- Далее идут блоки задач которые должен выполнять DAG
#Удалить таблицу таблицы
delete_table = PostgresOperator(
task_id ="delete_table",
postgres_conn_id = "admin_instance",
sql = """
DROP TABLE superset1_adb.test_b1
"""
)
#Создаем таблицу в БД Analytics
create_new_table_1 = PostgresOperator(
task_id = "create_new_table_1",
trigger_rule = 'one_failed',
postgres_conn_id = "admin_instance",
sql = """
CREATE TABLE IF NOT EXISTS superset1_adb.test_b1 (
code int8 NULL
);
"""
)
- Задаем последовательность выполнения операторов в DAG'е,
delete_table >> create_new_table_1
После того как DAG написан его необходимо загрузить в систему. Для этого необходимо авторизоваться на портале и пройти по ссылке в SDP Analytics. Далее открыть доступные области Airflow и у области куда необходимо добавить DAG нажать Список DAGs.
В открывшимся окне мы увидим список всех DAG файлов этой рабочей области, для добавление нового необходимо нажать кнопку Добавить сверху справа, перетащить файл в открывшуюся область и сохранить.
Изменение уже загруженного DAG файл происходит путем перезаписи текущего.
После загрузки DAGа он появляется в списке в Airflow, с наименованием, указанным в поле dag_id, в нашем примере это "bootcamp4"
При заходе в Airflow мы попадаем на стартовую страницу DAGs, на ней мы видим список всех загруженных сценариев, в том числе и наш и краткую информацию по ним.
Для запуска DAGа используется кнопка
колонки с действиями. На этом создание и запуск DAGа заканчивается.Однако основная функция Airflow заключается не только в запуске, но и в мониторинге и управление работой DAGов, рассмотрим предназначенные для этого инструменты.
После того как сценарий запущен мы можем увидеть краткую статистику его работы, если пройти по имени DAGа, то можно получить доступ к более развернутой информации.
Каждая вкладка информационного окна DAGа позволяет посмотреть различные статистики и данные, их более подробное описание находится отдельно.
Через меню Browse - Dag Runs мы можем ознакомится с уже отработавшими сценариями.
Здесь мы видим результат запуска отдельных экземпляров DAG запущенных в конкретное время. По полю Dag Id мы можем перейти к DAG, по полю Run Id к конкретному экземпляру задачи для данного DAG. С помощью меню Actions можно массово изменить статус запуска DAG.
Для мониторинга выполнения большого количества заданий в разных DAG существует интерфейс Browse – Task Instance.
В нем представлен список задач из всех отработавших DAG с информацией по ним (подробнее).