
Introducing Amazon MWAA support for Apache Airflow version 2.9.2


Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that significantly improves security and availability, and reduces infrastructure management overhead when setting up and operating end-to-end data pipelines in the cloud.

Today, we're announcing the availability of Apache Airflow version 2.9.2 environments on Amazon MWAA. Apache Airflow 2.9.2 introduces several notable enhancements, such as new API endpoints for improved dataset management, advanced scheduling options including conditional expressions for dataset dependencies, the combination of dataset and time-based schedules, and custom names in dynamic task mapping for better readability of your DAGs.

In this post, we walk you through some of these new features and capabilities, how you can use them, and how you can set up or upgrade your Amazon MWAA environments to Airflow 2.9.2.

With each new release, the Apache Airflow community is innovating to make Airflow more data-aware, enabling you to build reactive, event-driven workflows that can accommodate changes in datasets, either between Airflow environments or in external systems. Let's go through some of these new capabilities.

Logical operators and conditional expressions for DAG scheduling

Prior to the introduction of this capability, users faced significant limitations when working with complex scheduling scenarios involving multiple datasets. Airflow's scheduling capabilities were restricted to logical AND combinations of datasets, meaning that a DAG run would only be created after all specified datasets had been updated since the last run. This rigid approach posed challenges for workflows that required more nuanced triggering conditions, such as running a DAG when any one of several datasets was updated, or when specific combinations of dataset updates occurred.

With the release of Airflow 2.9.2, you can now use logical operators (AND and OR) and conditional expressions to define intricate scheduling conditions based on dataset updates. This feature allows for granular control over workflow triggers, enabling DAGs to be scheduled whenever a specific dataset or combination of datasets is updated.

For example, in the financial services industry, a risk management process might need to run whenever trading data from any regional market is refreshed, or when both trading and regulatory updates are available. The new scheduling capabilities available in Amazon MWAA allow you to express such complex logic using simple expressions. The following diagram illustrates the dependency we need to establish.

The following DAG code contains the logical operations to implement these dependencies:

from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime

trading_data_asia = Dataset("s3://trading/asia/data.parquet")
trading_data_europe = Dataset("s3://trading/europe/data.parquet")
trading_data_americas = Dataset("s3://trading/americas/data.parquet")
regulatory_updates = Dataset("s3://regulators/updates.json")

@dag(
    dag_id='risk_management_trading_data',
    start_date=datetime(2023, 5, 1),
    schedule=((trading_data_asia | trading_data_europe | trading_data_americas) & regulatory_updates),
    catchup=False
)
def risk_management_pipeline():
    @task
    def risk_analysis():
        # Task for risk analysis
        ...

    @task
    def reporting():
        # Task for reporting
        ...

    @task
    def notifications():
        # Task for notifications
        ...

    analysis = risk_analysis()
    report = reporting()
    notify = notifications()

risk_management_pipeline()

To learn more about this feature, refer to Logical operators for datasets in the Airflow documentation.

Combining dataset and time-based schedules

With Airflow 2.9.2 environments, Amazon MWAA now offers a more comprehensive scheduling mechanism that combines the flexibility of data-driven execution with the consistency of time-based schedules.

Consider a scenario where your team is responsible for managing a data pipeline that generates daily sales reports. This pipeline relies on data from multiple sources. Although it's essential to generate these sales reports on a daily basis to provide timely insights to business stakeholders, you also need to make sure the reports are up to date and reflect important data changes as soon as possible. For instance, if there's a significant influx of orders during a promotional campaign, or if inventory levels change unexpectedly, the report should incorporate these updates to maintain relevance.

Relying solely on time-based scheduling for this type of data pipeline can lead to issues such as outdated information and wasted infrastructure resources.

The DatasetOrTimeSchedule feature introduced in Airflow 2.9 adds the ability to combine conditional dataset expressions with time-based schedules. This means that your workflow can be invoked not only at predefined intervals but also whenever there are updates to the specified datasets, with the exact dependency relationship among them. The following diagram illustrates how you can use this capability to accommodate such scenarios.

See the following DAG code for an example implementation:

from airflow.decorators import dag, task
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.datasets import Dataset
from datetime import datetime

# Define datasets
orders_dataset = Dataset("s3://path/to/orders/data")
inventory_dataset = Dataset("s3://path/to/inventory/data")
customer_dataset = Dataset("s3://path/to/customer/data")

# Combine datasets using logical operators
combined_dataset = (orders_dataset & inventory_dataset) | customer_dataset

@dag(
    dag_id="dataset_time_scheduling",
    start_date=datetime(2024, 1, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),  # Daily at midnight
        datasets=combined_dataset
    ),
    catchup=False,
)
def dataset_time_scheduling_pipeline():
    @task
    def process_orders():
        # Task logic for processing orders
        pass

    @task
    def update_inventory():
        # Task logic for updating inventory
        pass

    @task
    def update_customer_data():
        # Task logic for updating customer data
        pass

    orders_task = process_orders()
    inventory_task = update_inventory()
    customer_task = update_customer_data()

dataset_time_scheduling_pipeline()

In this example, the DAG will be run under two conditions:

  • When the time-based schedule is met (daily at midnight UTC)
  • When the combined dataset condition is met, that is, when there are updates to both the orders and inventory data, or when there are updates to the customer data, regardless of the other datasets

This flexibility lets you create sophisticated scheduling rules that cater to the unique requirements of your data pipelines, so they run when necessary and incorporate the latest data updates from multiple sources.

For more details on data-aware scheduling, refer to Data-aware scheduling in the Airflow documentation.

Dataset event REST API endpoints

Prior to the introduction of this feature, making your Airflow environment aware of changes to datasets in external systems was a challenge: there was no option to mark a dataset as externally updated. With the new dataset event endpoints, you can programmatically initiate dataset-related events. The REST API has endpoints to create, list, and delete dataset events.

This capability allows external systems and applications to seamlessly integrate and interact with your Amazon MWAA environment. It significantly improves your ability to extend your data pipeline's capacity for dynamic data management.

For example, running the following code from an external system lets you invoke a dataset event in the target Amazon MWAA environment. This event can then be handled by downstream processes or workflows, enabling better connectivity and responsiveness in data-driven workflows that rely on timely data updates and interactions.

curl -X POST https://{web_server_host_name}/api/v1/datasets/events \
   -H 'Content-Type: application/json' \
   -d '{"dataset_uri": "s3://bucket_name/bucket_key", "extra": { }}'
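
If the external system runs on AWS, one way to call this endpoint is to request a web server session token for the environment and reuse it for the REST call. The following Python sketch is based on the session token flow described in the Amazon MWAA documentation; the Region and environment name are placeholders, and the login path and payload keys are assumptions you should verify against the current documentation.

import boto3
import requests

# Placeholder values; replace with your own Region and environment name
REGION = "us-east-1"
ENV_NAME = "my-airflow-2-9-2-environment"

def create_dataset_event(dataset_uri, extra=None):
    # Request a short-lived web login token for the Amazon MWAA web server
    mwaa = boto3.client("mwaa", region_name=REGION)
    token_response = mwaa.create_web_login_token(Name=ENV_NAME)
    hostname = token_response["WebServerHostname"]
    web_token = token_response["WebToken"]

    # Exchange the login token for a web server session cookie (assumed login path)
    login_response = requests.post(
        f"https://{hostname}/aws_mwaa/login",
        data={"token": web_token},
        timeout=10,
    )
    login_response.raise_for_status()
    session_cookie = login_response.cookies["session"]

    # Call the dataset events endpoint to record an externally updated dataset
    api_response = requests.post(
        f"https://{hostname}/api/v1/datasets/events",
        cookies={"session": session_cookie},
        json={"dataset_uri": dataset_uri, "extra": extra or {}},
        timeout=10,
    )
    api_response.raise_for_status()
    return api_response.json()

create_dataset_event("s3://bucket_name/bucket_key")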

The following diagram illustrates how the different components in this scenario interact with each other.

For more details on how to use the Airflow REST API in Amazon MWAA, refer to Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling. To learn more about the dataset event REST API endpoints, refer to Dataset UI Enhancements in the Airflow documentation.

Airflow 2.9.2 also includes features that ease the operation and monitoring of your environments. Let's explore some of these new capabilities.

DAG auto-pausing

Customers use Amazon MWAA to build complex data pipelines with multiple interconnected tasks and dependencies. When one of these pipelines encounters an issue or failure, it can result in a cascade of unnecessary and redundant task runs, leading to wasted resources. This problem is particularly prevalent in scenarios where pipelines run at frequent intervals, such as hourly or daily. A typical scenario is a critical pipeline that starts failing during the night and, because of the failure, continues to run and fail repeatedly until someone manually intervenes the next morning. This can result in dozens of unnecessary task runs, consuming valuable compute resources and potentially causing data corruption or inconsistencies.

The DAG auto-pausing feature aims to address this issue by introducing two new configuration parameters:

  • max_consecutive_failed_dag_runs_per_dag – This is a global Airflow configuration setting. It lets you specify the maximum number of consecutive failed DAG runs before a DAG is automatically paused (see the configuration sketch after this list).
  • max_consecutive_failed_dag_runs – This is a DAG-level argument. It overrides the global configuration, allowing you to set a custom threshold for each DAG.
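
On Amazon MWAA, global Airflow settings are applied as environment configuration options. As a sketch, and assuming the setting lives under the [core] section as core.max_consecutive_failed_dag_runs_per_dag and that Amazon MWAA allows overriding it, you could set it with the AWS CLI; the environment name is a placeholder:

aws mwaa update-environment \
    --name my-airflow-environment \
    --airflow-configuration-options '{"core.max_consecutive_failed_dag_runs_per_dag": "3"}'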

In the following code example, we define a DAG with a single Python task. The failing_task is designed to fail by raising a ValueError. The key configuration for DAG auto-pausing is the max_consecutive_failed_dag_runs parameter set on the DAG object. By setting max_consecutive_failed_dag_runs=3, we instruct Airflow to automatically pause the DAG after it fails three consecutive times.

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@task
def failing_task():
    raise ValueError("This task is designed to fail")

@dag(
    dag_id="auto_pause",
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=1),  # Run every minute
    catchup=False,
    max_consecutive_failed_dag_runs=3,  # Set the maximum number of consecutive failed DAG runs
)
def example_dag_with_auto_pause():
    failing_task_instance = failing_task()

example_dag_with_auto_pause()

With this parameter, you can now configure your Airflow DAGs to automatically pause after a specified number of consecutive failures.

To learn more, refer to DAG Auto-pausing in the Airflow documentation.

CLI support for bulk pause and resume of DAGs

As the number of DAGs in your environment grows, managing them becomes increasingly challenging. Whether for upgrading or migrating environments, or for other operational activities, you may need to pause or resume multiple DAGs. This process can become a daunting, repetitive endeavor because you need to navigate through the Airflow UI, manually pausing or resuming DAGs one at a time. These manual actions are time consuming and increase the risk of human error, which can result in missteps and lead to data inconsistencies or pipeline disruptions. The previous CLI commands for pausing and resuming DAGs could only handle one DAG at a time, making them inefficient.

Airflow 2.9.2 improves these CLI commands by adding the ability to treat DAG IDs as regular expressions, allowing you to pause or resume multiple DAGs with a single command. This eliminates the need for repetitive manual intervention or individual DAG operations, significantly reducing the risk of human error and providing reliability and consistency in your data pipelines.

For example, to pause all DAGs for daily liquidity reporting that use Amazon Redshift as a data source, you can use the following CLI command with a regular expression:

airflow dags pause --treat-dag-id-as-regex -y "^(redshift|daily_liquidity_reporting)"
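
To resume the same set of DAGs afterwards, the unpause command accepts the same flag in Airflow 2.9.2 (shown here as a sketch; verify the flag with airflow dags unpause --help on your version):

airflow dags unpause --treat-dag-id-as-regex -y "^(redshift|daily_liquidity_reporting)"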

Custom names for Dynamic Task Mapping

Dynamic Task Mapping was added in Airflow 2.3. This powerful feature allows workflows to create tasks dynamically at runtime based on data. Instead of relying on the DAG author to predict the number of tasks needed in advance, the scheduler can generate the appropriate number of copies of a task based on the output of a previous task. Of course, with great power comes great responsibility. By default, dynamically mapped tasks were assigned numeric indexes as names. In complex workflows involving high numbers of mapped tasks, it becomes increasingly challenging to pinpoint the specific tasks that require attention, leading to potential delays and inefficiencies in managing and maintaining your data workflows.

Airflow 2.9 introduces the map_index_template parameter, a highly requested feature that addresses the challenge of task identification in Dynamic Task Mapping. With this capability, you can now provide custom names for your dynamically mapped tasks, improving visibility and manageability within the Airflow UI.

See the following example:

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data(data):
    # Perform data processing logic here
    print(f"Processing data: {data}")

@dag(
    dag_id="custom_task_mapping_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)
def custom_task_mapping_example():
    mapped_processes = PythonOperator.partial(
        task_id="process_data_source",
        python_callable=process_data,
        map_index_template="Processing source={{ task.op_args[0] }}",
    ).expand(op_args=[["source_a"], ["source_b"], ["source_c"]])

custom_task_mapping_example()

The key aspect in the code is the map_index_template parameter specified in the PythonOperator.partial call. This Jinja template instructs Airflow to use the value of each task's op_args argument as the map index for the corresponding dynamically mapped task instance. In the Airflow UI, you will see three task instances with the indexes source_a, source_b, and source_c, making it easy to identify and track the tasks associated with each data source. In case of failures, this capability improves monitoring and troubleshooting.

The map_index_template feature goes beyond simple template rendering, offering dynamic injection capabilities into the rendering context. This functionality unlocks greater flexibility and customization when naming dynamically mapped tasks.
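
Because the template is rendered after each mapped task instance finishes, a task can also compute its display name at runtime and inject it into the rendering context. The following minimal sketch follows the pattern shown in the Airflow dynamic task mapping documentation; the DAG ID, task names, and the record_label context key are illustrative and not from the original example:

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from datetime import datetime

@dag(
    dag_id="dynamic_map_index_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)
def dynamic_map_index_example():
    # The template references a custom key that each mapped task instance
    # injects into its own rendering context at runtime
    @task(map_index_template="{{ record_label }}")
    def process_record(record: dict):
        context = get_current_context()
        context["record_label"] = f"{record['source']}:{record['id']}"
        print(f"Processing record {record['id']} from {record['source']}")

    process_record.expand(
        record=[
            {"source": "orders", "id": 101},
            {"source": "inventory", "id": 17},
        ]
    )

dynamic_map_index_example()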

Refer to Named mapping in the Airflow documentation to learn more.

TaskFlow decorator for Bash commands

Writing complex Bash commands and scripts with the traditional Airflow BashOperator can bring challenges in areas such as code consistency, task dependency definition, and dynamic command generation. The new @task.bash decorator addresses these challenges, allowing you to define Bash statements using Python functions, making the code more readable and maintainable. It seamlessly integrates with Airflow's TaskFlow API, enabling you to define dependencies between tasks and create complex workflows. You can also use Airflow's scheduling and monitoring capabilities while maintaining a consistent coding style.

The following sample code shows how the @task.bash decorator simplifies the integration of Bash commands into DAGs, while using the full capabilities of Python for dynamic command generation and data processing:

from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Sample customer data
customer_data = """
id,name,age,city
1,John Doe,35,New York
2,Jane Smith,42,Los Angeles
3,Michael Johnson,28,Chicago
4,Emily Williams,31,Houston
5,David Brown,47,Phoenix
"""

# Sample order data
order_data = """
order_id,customer_id,product,quantity,price
101,1,Product A,2,19.99
102,2,Product B,1,29.99
103,3,Product A,3,19.99
104,4,Product C,2,14.99
105,5,Product B,1,29.99
"""

@dag(
    dag_id='task-bash-customer_order_analysis',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
)
def customer_order_analysis_dag():
    @task.bash
    def clean_data():
        # Clean customer data: write it to a file, switch to semicolon delimiters, drop the header line
        customer_cleaning_commands = """
            echo '{}' > cleaned_customers.csv
            sed -i 's/,/;/g' cleaned_customers.csv
            sed -i '1d' cleaned_customers.csv
        """.format(customer_data)

        # Clean order data the same way
        order_cleaning_commands = """
            echo '{}' > cleaned_orders.csv
            sed -i 's/,/;/g' cleaned_orders.csv
            sed -i '1d' cleaned_orders.csv
        """.format(order_data)

        return customer_cleaning_commands + "\n" + order_cleaning_commands

    @task.bash
    def transform_data(cleaned_customers, cleaned_orders):
        # Transform customer data
        customer_transform_commands = """
            cat {cleaned_customers} | awk -F';' '{{printf "%s,%s,%s\\n", $1, $2, $3}}' > transformed_customers.csv
        """.format(cleaned_customers=cleaned_customers)

        # Transform order data
        order_transform_commands = """
            cat {cleaned_orders} | awk -F';' '{{printf "%s,%s,%s,%s,%s\\n", $1, $2, $3, $4, $5}}' > transformed_orders.csv
        """.format(cleaned_orders=cleaned_orders)

        return customer_transform_commands + "\n" + order_transform_commands

    @task.bash
    def analyze_data(transformed_customers, transformed_orders):
        analysis_commands = """
            # Calculate total revenue
            total_revenue=$(awk -F',' '{{sum += $5 * $4}} END {{printf "%.2f", sum}}' {transformed_orders})
            echo "Total revenue: $total_revenue"

            # Find customers with multiple orders
            customers_with_multiple_orders=$(
                awk -F',' '{{orders[$2]++}} END {{for (c in orders) if (orders[c] > 1) printf "%s,", c}}' {transformed_orders}
            )
            echo "Customers with multiple orders: $customers_with_multiple_orders"

            # Find the most popular product
            popular_product=$(
                awk -F',' '{{products[$3]++}} END {{max=0; for (p in products) if (products[p] > max) {{max=products[p]; popular=p}}}} END {{print popular}}'
            {transformed_orders})
            echo "Most popular product: $popular_product"
        """.format(transformed_customers=transformed_customers, transformed_orders=transformed_orders)

        return analysis_commands

    cleaned_data = clean_data()
    transformed_data = transform_data(cleaned_data, cleaned_data)
    analysis_results = analyze_data(transformed_data, transformed_data)

customer_order_analysis_dag()

You can learn more about the @task.bash decorator in the Airflow documentation.

Set up a new Airflow 2.9.2 environment in Amazon MWAA

You can initiate the setup in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you're adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.
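
As a minimal sketch, the following AWS CLI call creates an environment pinned to Airflow 2.9.2. The bucket, IAM role, subnet, and security group identifiers are placeholders; your account's networking, IAM, and S3 layout will differ, and additional options (environment class, logging, and so on) are usually needed in practice.

aws mwaa create-environment \
    --name my-airflow-2-9-2-environment \
    --airflow-version 2.9.2 \
    --source-bucket-arn arn:aws:s3:::my-mwaa-dags-bucket \
    --dag-s3-path dags \
    --execution-role-arn arn:aws:iam::111122223333:role/my-mwaa-execution-role \
    --network-configuration '{"SubnetIds": ["subnet-aaaa1111", "subnet-bbbb2222"], "SecurityGroupIds": ["sg-cccc3333"]}'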

Upon successful creation of an Airflow 2.9.2 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.
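
For example, a minimal requirements.txt might pin additional provider packages against the public Airflow constraints file. The snippet below assumes the Python 3.11 constraints that match Airflow 2.9.2; check the Amazon MWAA documentation for the exact runtime version of your environment.

# Resolve package versions against the Airflow 2.9.2 constraints file (Python 3.11 assumed)
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.2/constraints-3.11.txt"
# Example additional provider package; its version is resolved by the constraints file
apache-airflow-providers-snowflake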

Upgrade from older versions of Airflow to version 2.9.2

You can take advantage of these latest capabilities by upgrading your older Airflow 2.x-based environments to version 2.9.2 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.
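
As a sketch, an in-place upgrade of an existing environment can be started from the AWS CLI; the environment name is a placeholder, and you should review the prerequisites (such as backing up metadata and pausing DAGs) described in the upgrade documentation first.

aws mwaa update-environment \
    --name my-existing-environment \
    --airflow-version 2.9.2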

Conclusion

In this post, we announced the availability of Apache Airflow 2.9.2 environments in Amazon MWAA. We discussed how some of the latest features added in this release help you design more reactive, event-driven workflows, such as DAG scheduling based on the result of logical operations, and the availability of REST API endpoints to programmatically create dataset events. We also provided sample code to show the implementation in Amazon MWAA.

For the complete list of changes, refer to Airflow's release notes. For more details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the authors

Hernan Garcia is a Senior Solutions Architect at AWS, based out of Amsterdam, working with enterprises in the Financial Services industry. He specializes in application modernization and supports customers in their adoption of serverless technologies.

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new cloud native solutions using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space, helping customers adopt AWS services for their workflow orchestration needs.
