Intro

This post takes a quick look at dag-factory, a package that generates Apache Airflow DAGs dynamically from YAML. It is maintained by Astronomer, which offers a managed Airflow service.

Prerequisites

airflow && curiosity

Follow readme

YAML for a simple DAG

example_dag1:  
  default_args:  
    owner: 'jj.lee'  
    email: 'jj.lee@coinone.com'  
    email_on_failure: False  
    email_on_retry: False  
    retries: 1  
    retry_delay_sec: 600
    start_date: 2022-09-01  
  catchup: False
  schedule_interval: '0 0 1 * *'  
  concurrency: 1  
  max_active_runs: 1  
  default_view: 'tree'  # or 'graph', 'duration', 'gantt', 'landing_times'  
  orientation: 'LR'  # or 'TB', 'RL', 'BT'  
  description: 'this is an example dag!'  
  on_success_callback_name: print_hello  
  on_success_callback_file: /opt/airflow/plugins/print_hello.py  
  on_failure_callback_name: print_hello  
  on_failure_callback_file: /opt/airflow/plugins/print_hello.py  
  tasks:  
    task_1:  
      operator: airflow.operators.bash_operator.BashOperator  
      bash_command: 'echo 1'  
    task_2:  
      operator: airflow.operators.bash_operator.BashOperator  
      bash_command: 'echo 2'  
      dependencies: [task_1]  
    task_3:  
      operator: airflow.operators.bash_operator.BashOperator  
      bash_command: 'echo 3'  
      dependencies: [task_1]  
  
default:  
  default_args:  
    owner: "default_owner"  
    retries: 1  
    retry_delay_sec: 300  
    start_date: 2022-09-01  
  concurrency: 1  
  max_active_runs: 1  
  dagrun_timeout_sec: 600  
  default_view: "tree"  
  orientation: "LR"  
  schedule_interval: "0 1 * * *"  
  on_success_callback_name: print_hello  
  on_success_callback_file: /opt/airflow/plugins/print_hello.py  
  on_failure_callback_name: print_hello  
  on_failure_callback_file: /opt/airflow/plugins/print_hello.py
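
The YAML above points its callbacks at a function named print_hello in /opt/airflow/plugins/print_hello.py, but the file itself is not shown. The following is a minimal sketch of what such a file might contain. The function body and the message text are assumptions made for illustration; the only constraint taken from the YAML is that the function name matches on_success_callback_name and on_failure_callback_name. Airflow invokes callbacks with a single context dict.

```python
# Hypothetical contents of /opt/airflow/plugins/print_hello.py (illustration only).
# The function name must match the *_callback_name values in the YAML.


def print_hello(context=None):
    """Callback sketch: Airflow passes a context dict to the callback."""
    # Fall back to an empty dict so the function can also be called by hand.
    dag_run = (context or {}).get("dag_run")
    message = f"hello from callback, dag_run={dag_run}"
    print(message)
    return message


if __name__ == "__main__":
    # Quick manual check outside Airflow, using a made-up context value.
    print_hello({"dag_run": "manual__2022-09-01"})
```

Returning the message is not required by Airflow; it is done here only to make the function easy to check outside a scheduler.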

example_dag_factory.py

# This file must live inside Airflow's dag_folder.
from airflow import DAG  
import dagfactory  
  
dag_factory = dagfactory.DagFactory("/opt/airflow/dags/config/example.yml")  
  
dag_factory.clean_dags(globals())  
dag_factory.generate_dags(globals())
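
The loader passes globals() because Airflow discovers DAG objects by looking at a module's global namespace. The sketch below imitates that idea with plain dictionaries. FakeDag and register_dags are hypothetical stand-ins written for this post, not dag-factory's actual classes or functions, and the config dict is a trimmed-down version of the YAML above.

```python
# Illustrative stand-in, NOT dag-factory's real implementation.


class FakeDag:
    """Placeholder for an airflow.DAG object."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def register_dags(config, namespace):
    """Create one object per top-level key and store it under its dag_id."""
    for dag_id in config:
        # The "default" block only carries shared settings; it is not a DAG.
        if dag_id == "default":
            continue
        namespace[dag_id] = FakeDag(dag_id)
    return namespace


# Trimmed-down version of the YAML, already parsed into a dict.
config = {
    "example_dag1": {"tasks": {}},
    "default": {"concurrency": 1},
}

module_globals = {}  # plays the role of globals() in the loader script
register_dags(config, module_globals)
print(sorted(module_globals))
```

In the real loader the namespace is the module's own globals(), so each generated DAG ends up as a module-level variable that the scheduler can pick up.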

Caution

Result


The Code tab shows the script that loads dag-factory. Next, check the graph.


The graph defined in the YAML is displayed as expected.
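
For reference, the shape of that graph follows directly from the dependencies keys in the YAML. The sketch below derives the edges from a hand-copied version of the tasks section. The helper name edges_from_tasks is hypothetical and exists only for this illustration; it is not part of dag-factory or Airflow.

```python
# Tasks section copied by hand from the YAML; only the dependencies matter here.
tasks = {
    "task_1": {},
    "task_2": {"dependencies": ["task_1"]},
    "task_3": {"dependencies": ["task_1"]},
}


def edges_from_tasks(tasks):
    """Return (upstream, downstream) pairs implied by each task's dependencies."""
    edges = []
    for name, conf in tasks.items():
        for upstream in conf.get("dependencies", []):
            edges.append((upstream, name))
    return edges


edges = edges_from_tasks(tasks)
for upstream, downstream in edges:
    # Same notation Airflow uses for setting task order in Python DAG files.
    print(f"{upstream} >> {downstream}")
# prints:
# task_1 >> task_2
# task_1 >> task_3
```

task_1 has no upstream tasks, so it runs first, and task_2 and task_3 both fan out from it.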


Conclusion

Reference