DAGs¶
A DAG is a collection of tasks or operators, each representing a distinct unit of work, and the dependencies between these tasks. The DAG defines the overall workflow logic, including the order in which tasks should be executed and the relationships between them. Each task typically represents a discrete action or computation that needs to be performed, such as running a script, executing a SQL query, or transferring data between systems.
├── dags
├── crawl
| ├── domain_drivers
| | ├── announcement_dag.py
| | ├── channel_dag.py
| | ├── conversation_dag.py
| | ├── conversation_info_dag.py
| | ├── conversation_message_dag.py
| | ├── course_dag.py
| | ├── member_dag.py
| | ├── new_announcement_comments.py
| | ├── space_dag.py
| | ├── update_space.py
| | ├── message_dag.py
| | ├── ranged_channel_message_dag.py
| | └── ranged_conversation_message_dag.py
| |
| ├── websites
| | ├── article_dag.py
| | └── new_article_comment_dag.py
| |
| └── social_media_profiles
| ├── friend_dag.py
| ├── new_comment_dag.py
| ├── profile_dag.py
| ├── publication_dag.py
| ├── replies.py
| ├── login_dag.py
| ├── activate_credentials.py
| └── stories_dag.py
|
├── crawlserver_periodic_tasks
| ├── blocked_jobs.py
| ├── check_private_accounts.py
| ├── inactive_channels.py
| ├── inactive_conversations.py
| ├── push_file_minio.py
| └── push_social_media_files_minio.py
|
└── scheduler
├── scheduler_dag.py
├── scheduler_domain.py
├── scheduler_google_classroom.py
└── scheduler_website.py
In the context of our project, we have three types of DAGs:
The schedulers:
Scheduler DAGs are responsible for retrieving the necessary information (the pending jobs) from the crawlserver MongoDB in order to schedule the crawls (a query sketch follows this list). We have 3 schedulers:
- scheduler_dag: schedules the data collection for social media profiles
- scheduler_domain: schedules the data collection for domain-based drivers like Microsoft Teams and Google Classroom
- scheduler_website: schedules the data collection for websites
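The exact query lives in the project code; as a rough illustration, a scheduler task might pull pending jobs from the crawlserver MongoDB along these lines (the connection string, database, collection, and field names below are assumptions, not the real schema):
from datetime import datetime
from pymongo import MongoClient

def fetch_pending_jobs():
    # Hypothetical connection details and job schema, for illustration only.
    client = MongoClient("mongodb://crawlserver:27017")
    jobs = client["crawlserver"]["jobs"]
    # A job is due when it is pending and its execution date has been reached.
    return list(
        jobs.find({"state": "pending", "execution_date": {"$lte": datetime.utcnow()}})
    )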
The crawlers:
All the DAGs inside the crawl folder are responsible for collecting data for the jobs retrieved by the schedulers. Once a crawl ends, they either set the job back to the pending state with a new execution date, or mark the job as failed (a sketch of this state update follows this list). The crawlers are divided into 3 categories:
- domain_drivers: data collection for domain-based drivers
- websites: data collection for websites
- social_media_profiles: data collection for social media profiles
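How a crawler finalizes a job is project-specific; the sketch below only illustrates the two outcomes described above, with an assumed MongoDB schema and an assumed re-crawl interval:
from datetime import datetime, timedelta
from pymongo import MongoClient

def finalize_job(job_id, success):
    # Hypothetical schema: a successful crawl re-queues the job, a failed one marks it failed.
    jobs = MongoClient("mongodb://crawlserver:27017")["crawlserver"]["jobs"]
    if success:
        jobs.update_one(
            {"_id": job_id},
            {"$set": {"state": "pending",
                      "execution_date": datetime.utcnow() + timedelta(days=1)}},
        )
    else:
        jobs.update_one({"_id": job_id}, {"$set": {"state": "failed"}})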
Crawlserver periodic tasks:
Each periodic task performs a specific job at regular intervals; these tasks are not part of the data collection pipeline.
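Unlike the crawl DAGs, these run on a fixed schedule. As a hedged sketch (the real schedules and task bodies differ per file; the DAG below is only loosely modeled on blocked_jobs.py), a periodic task could be declared like this:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator on Airflow 1.10

def clean_blocked_jobs(**kwargs):
    # Placeholder for the actual maintenance logic.
    print("checking for blocked jobs")

with DAG(
    dag_id="blocked_jobs",
    schedule_interval="0 * * * *",  # hourly; the real interval is an assumption
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(task_id="clean_blocked_jobs", python_callable=clean_blocked_jobs)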
Declaring a DAG¶
To create a DAG using the DAG class from Airflow, you define the DAG configuration attributes as follows:
- dag_id: a unique identifier for the DAG, used to identify and refer to the DAG throughout the project. In the example below: "profile_dag".
- schedule_interval: defines how often the DAG should run. "@once" indicates that the DAG should run only once.
- catchup: determines whether the DAG should catch up on any missed (backfill) runs. By setting it to False, the DAG only executes tasks for the current and future execution dates, skipping any missed ones.
- on_success_callback: specifies a function to be called after the successful execution of the DAG. In the example below, it is set to the cleanup_xcom function, which deletes the XCom entries related to the DAG, ensuring a clean state for subsequent runs (a sketch of this callback is shown after the example).
from airflow import DAG

# default_args and cleanup_xcom are defined elsewhere in the project.
with DAG(
    dag_id="profile_dag",
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
    on_success_callback=cleanup_xcom,
) as dag:
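cleanup_xcom itself is defined elsewhere in the project; a typical implementation of such a callback, sketched here using Airflow's XCom model and the provide_session helper (import paths may differ between Airflow versions), deletes the DAG's XCom rows after a successful run:
from airflow.models import XCom
from airflow.utils.db import provide_session  # airflow.utils.session on Airflow 2

@provide_session
def cleanup_xcom(context, session=None):
    # Remove every XCom entry produced by this DAG so the next run starts clean.
    dag_id = context["dag"].dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()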
The following lines define the tasks within the DAG:
get_config = PythonOperator(
task_id="get_accounts", provide_context=True, python_callable=print_context
)
crawl_profile_task = ProfileOperator(
pseudo="Arkunir",
task_id="profile_dag",
driver_name="twitter",
)
- The PythonOperator executes a Python callable, here the print_context function. The provide_context=True parameter passes the Airflow context to print_context as keyword arguments, enabling access to information about the task execution.
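print_context is also defined in the project; a minimal version consistent with provide_context=True, which passes the Airflow context to the callable as keyword arguments, might look like this:
def print_context(**kwargs):
    # The Airflow context (dag, task, execution_date, ...) arrives as keyword arguments.
    print(f"DAG: {kwargs['dag'].dag_id}")
    print(f"Task: {kwargs['task'].task_id}")
    print(f"Execution date: {kwargs['execution_date']}")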
The task dependencies are defined as follows:
get_config >> crawl_profile_task
- The >> operator indicates that get_config must be executed before crawl_profile_task.
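For reference, the same dependency can also be expressed with the set_downstream/set_upstream methods, which is sometimes handier when dependencies are built programmatically:
get_config >> crawl_profile_task                 # bitshift syntax
get_config.set_downstream(crawl_profile_task)    # equivalent method call
crawl_profile_task.set_upstream(get_config)      # equivalent, declared from the downstream task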