airflow example_branch_datetime_operator 源码
airflow example_branch_datetime_operator 代码
文件路径:/airflow/example_dags/example_branch_datetime_operator.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG demonstrating the usage of DateTimeBranchOperator with datetime as well as time objects as
targets.
"""
from __future__ import annotations
import pendulum
from airflow import DAG
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.empty import EmptyOperator
dag1 = DAG(
dag_id="example_branch_datetime_operator",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule="@daily",
)
# [START howto_branch_datetime_operator]
empty_task_11 = EmptyOperator(task_id='date_in_range', dag=dag1)
empty_task_21 = EmptyOperator(task_id='date_outside_range', dag=dag1)
cond1 = BranchDateTimeOperator(
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag1,
)
# Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [empty_task_11, empty_task_21]
# [END howto_branch_datetime_operator]
dag2 = DAG(
dag_id="example_branch_datetime_operator_2",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule="@daily",
)
# [START howto_branch_datetime_operator_next_day]
empty_task_12 = EmptyOperator(task_id='date_in_range', dag=dag2)
empty_task_22 = EmptyOperator(task_id='date_outside_range', dag=dag2)
cond2 = BranchDateTimeOperator(
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
target_upper=pendulum.time(0, 0, 0),
target_lower=pendulum.time(15, 0, 0),
dag=dag2,
)
# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run empty_task_12 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [empty_task_12, empty_task_22]
# [END howto_branch_datetime_operator_next_day]
dag3 = DAG(
dag_id="example_branch_datetime_operator_3",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule="@daily",
)
# [START howto_branch_datetime_operator_logical_date]
empty_task_13 = EmptyOperator(task_id='date_in_range', dag=dag3)
empty_task_23 = EmptyOperator(task_id='date_outside_range', dag=dag3)
cond3 = BranchDateTimeOperator(
task_id='datetime_branch',
use_task_logical_date=True,
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag3,
)
# Run empty_task_13 if cond3 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond3 >> [empty_task_13, empty_task_23]
# [END howto_branch_datetime_operator_logical_date]
相关信息
相关文章
airflow example_bash_operator 源码
airflow example_branch_day_of_week_operator 源码
airflow example_branch_labels 源码
airflow example_branch_operator 源码
airflow example_branch_operator_decorator 源码
airflow example_branch_python_dop_operator_3 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦