ym88659208ym87991671
Создание пользовательских сценариев ETL-процессов | Документация для разработчиков

Создание пользовательских сценариев ETL-процессов

Обновлено 11 сентября 2023

Большинство процессов обработки данных строятся на определении набора «задач» для извлечения, преобразования, загрузки данных.

Так как Airflow является оркестратором ETL, процессы обработки данных, или пайплайны, в Airflow описываются при помощи DAG (Directed Acyclic Graph).

Ниже описан типовой процесс создания сценария обработки данных, в котором шаги ETL-процесса разработаны на языке Python:

  1. В начале создания DAG’а, как и в любом Python-скрипте, в первую очередь импортируются необходимые библиотеки (как библиотеки самого Airflow, так и другие библиотеки Python)
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
  1. После этого объявляется сам DAG посредством указания необходимых параметров, таких как дата и время старта, расписание запусков, расчет за предыдущие даты.
with DAG(
dag_id = "test_etl",
start_date = datetime(2021, 122, 1),
shedule_interval = None,
catchup = False,
tags = ('sdp_analytics')
) as dag:

dag_id - имя дага, которое будет отображаться в Airflow, может отличатся от имени файла

start_date - дата запуска, в данном случае установлена на 1 декабря 2021 года, если дату не заполнить, то DAG может запустить только пользователь.

schedule_interval - интервал запуска. Может принимать несколько значений

  • None - Не планируйте, используйте исключительно для групп доступности баз данных, запускаемых извне.

  • \@once - Расписание один и только один раз

  • \@hourly - Запускать раз в час в начале часа

  • \@daily - Запускать раз в день в полночь

  • \@weekly - Запускайте раз в неделю в полночь в воскресенье утром

  • \@monthly - Запускать раз в месяц в полночь первого дня месяца

  • \@yearly - Выполняется один раз в год в полночь 1 января.

catchup – настройка, отвечающая за то, будет ли DAG наверстывать пропущенное выполнение, и указывается тег, для упрощения поиска в Airflow.

  1. Далее указываются различные параметры в виде переменных, классов, функций и других конструкций на языке Python, которые так или иначе будут использоваться в ETL/ELT-процессе.
src = PostgresHook(postgres_conn_id='gp_test')
target = PostgresHook(postgres_conn_id='etl_test')
src_conn = src.get_conn()
trg_conn = target.get_conn()
src_cursor = src.conn_cursor()

def insert (**kwargs):
src_cursor.execute("SELECT * FROM test.cities;")
target.insert_rows(table="sdpa.cities", rows=src_cursor)
  1. После этого определяются таски DAG – шаги задачи, отвечающие за определенную операцию в процессе. В качестве оператора могут выступать операторы, сенсоры или триггеры Airflow.
create_new_table = PostgresOperator(
task_id = "Create_new_table",
postgres_conn_id = "etl_test",
sql = """
CREATE TABLE IF NOT EXISTS sdpa.cities (
city varchar (100) ,
population numberic(8, 1) ,
lat float4 ,
lon float4 ,
region_name varchar(100) ,
region_ao varchar(100) ,
region_iso_code varchar(10) ,
federal_district varchar(50) ,
okato int8 ,
oktmo int8 ,
kladr_id int8 ,
fias_id varchar(100) ,
place_id numeruc(12, 1) );
"""
)

download_data_into_new_table = PythonOperator(
task_id = "Download_data_into_new_table",
python_callable = insert
)
  1. В конце необходимо указать порядок запуска тасков в виде цепочки зависимостей.
create_new table >> download_data_into_new_table >> create_join_table >> insert_join_data_into_join_table
ПАО Сбербанк использует cookie для персонализации сервисов и удобства пользователей.
Вы можете запретить сохранение cookie в настройках своего браузера.