Создание пользовательских сценариев ETL-процессов
Большинство процессов обработки данных строятся на определении набора «задач» для извлечения, преобразования, загрузки данных.
Так как Airflow является оркестратором ETL, процессы обработки данных, или пайплайны, в Airflow описываются при помощи DAG (Directed Acyclic Graph).
Ниже описан типовой процесс создания сценария обработки данных, в котором шаги ETL-процесса разработаны на языке Python:
- В начале создания DAG’а, как и в любом Python-скрипте, в первую очередь импортируются необходимые библиотеки (как библиотеки самого Airflow, так и другие библиотеки Python)
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
- После этого объявляется сам DAG посредством указания необходимых параметров, таких как дата и время старта, расписание запусков, расчет за предыдущие даты.
with DAG(
dag_id = "test_etl",
start_date = datetime(2021, 122, 1),
shedule_interval = None,
catchup = False,
tags = ('sdp_analytics')
) as dag:
dag_id - имя дага, которое будет отображаться в Airflow, может отличатся от имени файла
start_date - дата запуска, в данном случае установлена на 1 декабря 2021 года, если дату не заполнить, то DAG может запустить только пользователь.
schedule_interval - интервал запуска. Может принимать несколько значений
None - Не планируйте, используйте исключительно для групп доступности баз данных, запускаемых извне.
\@once - Расписание один и только один раз
\@hourly - Запускать раз в час в начале часа
\@daily - Запускать раз в день в полночь
\@weekly - Запускайте раз в неделю в полночь в воскресенье утром
\@monthly - Запускать раз в месяц в полночь первого дня месяца
\@yearly - Выполняется один раз в год в полночь 1 января.
catchup – настройка, отвечающая за то, будет ли DAG наверстывать пропущенное выполнение, и указывается тег, для упрощения поиска в Airflow.
- Далее указываются различные параметры в виде переменных, классов, функций и других конструкций на языке Python, которые так или иначе будут использоваться в ETL/ELT-процессе.
src = PostgresHook(postgres_conn_id='gp_test')
target = PostgresHook(postgres_conn_id='etl_test')
src_conn = src.get_conn()
trg_conn = target.get_conn()
src_cursor = src.conn_cursor()
def insert (**kwargs):
src_cursor.execute("SELECT * FROM test.cities;")
target.insert_rows(table="sdpa.cities", rows=src_cursor)
- После этого определяются таски DAG – шаги задачи, отвечающие за определенную операцию в процессе. В качестве оператора могут выступать операторы, сенсоры или триггеры Airflow.
create_new_table = PostgresOperator(
task_id = "Create_new_table",
postgres_conn_id = "etl_test",
sql = """
CREATE TABLE IF NOT EXISTS sdpa.cities (
city varchar (100) ,
population numberic(8, 1) ,
lat float4 ,
lon float4 ,
region_name varchar(100) ,
region_ao varchar(100) ,
region_iso_code varchar(10) ,
federal_district varchar(50) ,
okato int8 ,
oktmo int8 ,
kladr_id int8 ,
fias_id varchar(100) ,
place_id numeruc(12, 1) );
"""
)
download_data_into_new_table = PythonOperator(
task_id = "Download_data_into_new_table",
python_callable = insert
)
- В конце необходимо указать порядок запуска тасков в виде цепочки зависимостей.
create_new table >> download_data_into_new_table >> create_join_table >> insert_join_data_into_join_table