What Is a DAG and How to Create One Using Airflow?

Airflow is a modern orchestrator for creating, scheduling, and monitoring workflows. You write workflows in Python, which lets you generate them dynamically. In Airflow, a workflow is a directed acyclic graph (DAG) that represents a data pipeline.

In this article, you will learn what a DAG is and how to create one using Airflow. Airflow is one of the most widely adopted orchestrators today, including among Apache Spark consulting services companies, and it gives us a way to write highly dynamic data pipelines.

What makes Airflow Unique?

Scalable: Airflow can run data pipelines across multiple machines. Workflows can run on Celery or Kubernetes, which makes Airflow highly scalable.

Multiple UI views: Airflow's web UI offers multiple views of your data pipelines, which you can use to debug, rerun, and monitor workflows.

Dynamic: Workflows are written in Python, so they come with the flexibility and power of a full programming language.

What is DAG in Airflow?

A DAG is a directed acyclic graph that represents a data pipeline. Consider a data pipeline in which we ingest the data, analyze the input, and check the data for correctness. If an error is found, the pipeline sends an email about the error and generates a report. If no issues are found, it saves the data and generates the report. The most important property of a DAG is that the workflow cannot contain loops. Every node in this DAG is called a task, and tasks are connected to one another through dependencies.
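The "no loops" rule can be tried out without Airflow. The following is a minimal sketch using Python's standard `graphlib` module (Python 3.9+), not Airflow's own implementation. The task names are hypothetical labels for the steps described above, and the sketch models only the ordering of tasks, not the choice between the error branch and the success branch.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its set holds the tasks that must finish first.
# Task names are hypothetical labels for the pipeline steps in the text.
pipeline = {
    "analyze_input": {"ingest_data"},
    "check_data": {"analyze_input"},
    "send_email": {"check_data"},   # error branch
    "save_data": {"check_data"},    # success branch
    "generate_report": {"send_email", "save_data"},
}


def execution_order(graph):
    """Return one valid run order, or raise CycleError if the graph has a loop."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(pipeline)
print(order)

# Making the first task depend on the last one introduces a loop.
looped = dict(pipeline, ingest_data={"generate_report"})
try:
    execution_order(looped)
    has_cycle = False
except CycleError:
    has_cycle = True
print("loop detected:", has_cycle)
```

Because the graph is acyclic, a valid run order always exists. Once a loop is introduced, no such order can be computed, which is why a workflow with loops cannot be expressed as a DAG.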

What are Operators in Airflow?

Every Task in a DAG is defined by an operator. There are various types of operators in Airflow. Operators provide an abstraction over underlying technology.

Action Operators: As the name suggests, these operators perform an action. For example, the Bash operator runs bash commands, and the Python operator runs Python code to perform a transformation or some simple logic in your pipeline. SQL operators, on the other hand, connect to an RDBMS database and run SQL queries against it. Airflow includes many other action operators as well.

Transfer Operators: As the name suggests, these operators transfer data. For example, a MongoDB-to-S3 operator transfers data from MongoDB to AWS S3, whereas an S3-to-Redshift operator transfers data from AWS S3 to Redshift. A number of other transfer operators are available for data migration in your pipeline.

Sensor Operators: This last kind of operator waits for something to happen before the data pipeline proceeds. For example, a file sensor waits for a file to be downloaded or to arrive at a specific location. Similarly, a SQL sensor waits for a SQL query to return true before the pipeline moves on to the next step.
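The waiting behaviour of a sensor can be illustrated with a plain polling loop. The sketch below is not Airflow code: `wait_for_file`, `poke_interval`, and `timeout` are hypothetical names chosen for illustration, and the loop simply checks a condition repeatedly until it holds or the time budget runs out.

```python
import os
import tempfile
import time


def wait_for_file(path, poke_interval=0.1, timeout=1.0):
    """Poll until `path` exists; return True on success, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "input.csv")

    # The file does not exist yet, so the wait times out.
    missing_result = wait_for_file(target, poke_interval=0.05, timeout=0.2)

    # Once the file arrives, the same check succeeds immediately.
    with open(target, "w") as handle:
        handle.write("id,name\n")
    found_result = wait_for_file(target, poke_interval=0.05, timeout=0.2)

print(missing_result, found_result)
```

In a pipeline, the step after the sensor would run only in the second case, once the condition has been met.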

Airflow Core Components

Web Server: This component serves the Airflow UI, which you use to monitor your workflows.

Scheduler: This component is responsible for scheduling tasks at the defined intervals.

Metastore: This is a database that stores metadata about jobs that have already run, so that their state can be shown in the UI.

Executor: The executor defines how the tasks in a DAG are run. With the Sequential executor, tasks are executed one after the other on a single machine. With the Local executor, tasks also run on a single machine but can run in parallel as multiple processes. With the Celery executor, tasks can be distributed across multiple machines connected as workers. Lastly, with the Kubernetes executor, tasks run as Pods on a Kubernetes cluster.

Worker: Finally, the worker is the machine where the task code is actually executed.
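The difference between running tasks one after the other and running them in parallel on one machine can be sketched with the standard library. This is only an analogy for the executor choice, not how Airflow executors are implemented. The task names are hypothetical, and a thread pool is used purely to keep the sketch short.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical, mutually independent tasks.
task_ids = ["task_a", "task_b", "task_c"]


def run_task(task_id):
    """Stand-in for real task work; returns a marker string."""
    return f"{task_id}: done"


def run_sequentially(ids):
    """Run each task one after the other."""
    return [run_task(task_id) for task_id in ids]


def run_in_parallel(ids, max_workers=3):
    """Run tasks concurrently on one machine; map() preserves input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_task, ids))


sequential_results = run_sequentially(task_ids)
parallel_results = run_in_parallel(task_ids)
print(sequential_results == parallel_results)
```

Both strategies produce the same results for independent tasks. What changes is how much work can proceed at the same time, which is the trade-off the executor setting controls.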

First DAG Explained

```python
from airflow import DAG
from datetime import datetime
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from pandas import json_normalize
import json


def _process_user(ti):
    user = ti.xcom_pull(task_ids="extract_user")
    user = user['results'][0]
    processed_user = json_normalize({
        'firstname': user['name']['first'],
        'lastname': user['name']['last'],
        'country': user['location']['country'],
        'username': user['login']['username'],
        'password': user['login']['password'],
        'email': user['email']
    })
    processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)


with DAG('first_dag_v3',
         start_date=datetime(2022, 1, 1),
         schedule_interval='@daily',
         catchup=False,
         tags=['first_dag']) as dag:

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres',
        sql='''
            CREATE TABLE IF NOT EXISTS users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL
            );'''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    extract_user = SimpleHttpOperator(
        task_id='extract_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )

    process_user = PythonOperator(
        task_id='process_user',
        python_callable=_process_user
    )

    create_table >> is_api_available >> extract_user >> process_user
```

The DAG above is scheduled to run daily at midnight (00:00), with a start date of 1 January 2022. It contains four tasks:

create_table: The first task is created using the Postgres operator, which connects to a Postgres database and runs the given SQL query against it. In this task, we create a table named users in the Postgres database. The connection to the database is defined through the Airflow UI.

is_api_available: The second task is created using the HTTP sensor, which hits an endpoint and lets the pipeline proceed only once it gets a successful response (such as HTTP code 200).

extract_user: The third task uses the SimpleHttpOperator to run an HTTP method against an endpoint. The connection to the endpoint is again defined in the Airflow UI. In this task, we run an HTTP GET against the endpoint and receive the response as JSON.

process_user: The final task uses the Python operator to run a Python function that processes the JSON received in the previous step and writes the selected fields to a CSV file.
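To see what the processing step does to the data, the flattening can be tried outside Airflow. The sketch below is an approximation of `_process_user` that uses only the standard library instead of pandas and returns the CSV row instead of writing a file. The sample response is hypothetical: it contains only the fields the DAG reads, and its values are made up for illustration.

```python
import csv
import io

# Hypothetical sample shaped like the fields _process_user reads.
sample_response = {
    "results": [
        {
            "name": {"first": "Ada", "last": "Lovelace"},
            "location": {"country": "United Kingdom"},
            "login": {"username": "ada", "password": "not-a-real-password"},
            "email": "ada@example.com",
        }
    ]
}


def flatten_user(response):
    """Pick the same six fields the DAG's processing step extracts."""
    user = response["results"][0]
    return {
        "firstname": user["name"]["first"],
        "lastname": user["name"]["last"],
        "country": user["location"]["country"],
        "username": user["login"]["username"],
        "password": user["login"]["password"],
        "email": user["email"],
    }


def to_csv_row(record):
    """Render the record as one CSV line without a header."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerow(record.values())
    return buffer.getvalue()


flat_user = flatten_user(sample_response)
csv_row = to_csv_row(flat_user)
print(csv_row, end="")
```

The column order of the row matches the column order of the users table created by the first task.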
