ym88659208ym87991671
DAG файл: создание и работа с ним | Документация для разработчиков

DAG файл: создание и работа с ним

Обновлено 3 апреля 2023

Для подготовки витрин данных SDP Analytics используется ETL оркестратор Airflow. Основным инструментом Airflow являются DAG файлы, расшифровывается как Directed Acyclic Graph - направленный ациклический граф.

По сути своей DAG является инструкцией с описанием последовательности операций над данными для Airflow. DAG пишется пользователем на Python в удобной для него IDE среде разработки.

Обычно DAG состоит из нескольких частей, рассмотрим их на примере простого DAGа по созданию таблицы.

  1. Импортируем необходимые библиотеки и инструменты для подключения к БД
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
  1. Задаем параметры запуска DAG'а подробнее
with DAG(
dag_id = "bootcamp4",
start_date = datetime.now(),
schedule_interval = None,
catchup = False,
tags = ['bootcamp']
) as dag:
  1. Определяем параметры для подключения к БД, id конекта задается в подключениях Airflow
    src = PostgresHook(postgres_conn_id='admin_instance')
src_conn = src.get_conn()
  1. Далее идут блоки задач которые должен выполнять DAG
#Удалить таблицу таблицы
delete_table = PostgresOperator(
task_id ="delete_table",
postgres_conn_id = "admin_instance",
sql = """
DROP TABLE superset1_adb.test_b1
"""
)

#Создаем таблицу в БД Analytics
create_new_table_1 = PostgresOperator(
task_id = "create_new_table_1",
trigger_rule = 'one_failed',
postgres_conn_id = "admin_instance",
sql = """
CREATE TABLE IF NOT EXISTS superset1_adb.test_b1 (
code int8 NULL

);
"""
)
  1. Задаем последовательность выполнения операторов в DAG'е,
delete_table >> create_new_table_1

После того как DAG написан его необходимо загрузить в систему. Для этого необходимо авторизоваться на портале и пройти по ссылке в SDP Analytics. Далее открыть доступные области Airflow и у области куда необходимо добавить DAG нажать Список DAGs.

В открывшимся окне мы увидим список всех DAG файлов этой рабочей области, для добавление нового необходимо нажать кнопку Добавить сверху справа, перетащить файл в открывшуюся область и сохранить.

Изменение уже загруженного DAG файл происходит путем перезаписи текущего.

После загрузки DAGа он появляется в списке в Airflow, с наименованием, указанным в поле dag_id, в нашем примере это "bootcamp4"

При заходе в Airflow мы попадаем на стартовую страницу DAGs, на ней мы видим список всех загруженных сценариев, в том числе и наш и краткую информацию по ним.

Для запуска DAGа используется кнопка

Кнопка run
колонки с действиями. На этом создание и запуск DAGа заканчивается.

Однако основная функция Airflow заключается не только в запуске, но и в мониторинге и управление работой DAGов, рассмотрим предназначенные для этого инструменты.

После того как сценарий запущен мы можем увидеть краткую статистику его работы, если пройти по имени DAGа, то можно получить доступ к более развернутой информации.

Каждая вкладка информационного окна DAGа позволяет посмотреть различные статистики и данные, их более подробное описание находится отдельно.

Через меню Browse - Dag Runs мы можем ознакомится с уже отработавшими сценариями.

Здесь мы видим результат запуска отдельных экземпляров DAG запущенных в конкретное время. По полю Dag Id мы можем перейти к DAG, по полю Run Id к конкретному экземпляру задачи для данного DAG. С помощью меню Actions можно массово изменить статус запуска DAG.

Для мониторинга выполнения большого количества заданий в разных DAG существует интерфейс Browse – Task Instance.

В нем представлен список задач из всех отработавших DAG с информацией по ним (подробнее).

ПАО Сбербанк использует cookie для персонализации сервисов и удобства пользователей.
Вы можете запретить сохранение cookie в настройках своего браузера.