airflow base_task_runner 源码
airflow base_task_runner 代码
文件路径:/airflow/task/task_runner/base_task_runner.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Base task runner"""
from __future__ import annotations
import os
import subprocess
import threading
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.platform import IS_WINDOWS
if not IS_WINDOWS:
# ignored to avoid flake complaining on Linux
from pwd import getpwnam # noqa
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.configuration import tmp_configuration_copy
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import getuser
PYTHONPATH_VAR = 'PYTHONPATH'
class BaseTaskRunner(LoggingMixin):
"""
Runs Airflow task instances by invoking the `airflow tasks run` command with raw
mode enabled in a subprocess.
:param local_task_job: The local task job associated with running the
associated task instance.
"""
def __init__(self, local_task_job):
# Pass task instance context into log handlers to setup the logger.
super().__init__(local_task_job.task_instance)
self._task_instance = local_task_job.task_instance
popen_prepend = []
if self._task_instance.run_as_user:
self.run_as_user = self._task_instance.run_as_user
else:
try:
self.run_as_user = conf.get('core', 'default_impersonation')
except AirflowConfigException:
self.run_as_user = None
# Add sudo commands to change user if we need to. Needed to handle SubDagOperator
# case using a SequentialExecutor.
self.log.debug("Planning to run as the %s user", self.run_as_user)
if self.run_as_user and (self.run_as_user != getuser()):
# We want to include any environment variables now, as we won't
# want to have to specify them in the sudo call - they would show
# up in `ps` that way! And run commands now, as the other user
# might not be able to run the cmds to get credentials
cfg_path = tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True)
# Give ownership of file to user; only they can read and write
subprocess.check_call(['sudo', 'chown', self.run_as_user, cfg_path], close_fds=True)
# propagate PYTHONPATH environment variable
pythonpath_value = os.environ.get(PYTHONPATH_VAR, '')
popen_prepend = ['sudo', '-E', '-H', '-u', self.run_as_user]
if pythonpath_value:
popen_prepend.append(f'{PYTHONPATH_VAR}={pythonpath_value}')
else:
# Always provide a copy of the configuration file settings. Since
# we are running as the same user, and can pass through environment
# variables then we don't need to include those in the config copy
# - the runner can read/execute those values as it needs
cfg_path = tmp_configuration_copy(chmod=0o600, include_env=False, include_cmds=False)
self._cfg_path = cfg_path
self._command = popen_prepend + self._task_instance.command_as_list(
raw=True,
pickle_id=local_task_job.pickle_id,
mark_success=local_task_job.mark_success,
job_id=local_task_job.id,
pool=local_task_job.pool,
cfg_path=cfg_path,
)
self.process = None
def _read_task_logs(self, stream):
while True:
line = stream.readline()
if isinstance(line, bytes):
line = line.decode('utf-8')
if not line:
break
self.log.info(
'Job %s: Subtask %s %s',
self._task_instance.job_id,
self._task_instance.task_id,
line.rstrip('\n'),
)
def run_command(self, run_with=None):
"""
Run the task command.
:param run_with: list of tokens to run the task command with e.g. ``['bash', '-c']``
:return: the process that was run
:rtype: subprocess.Popen
"""
run_with = run_with or []
full_cmd = run_with + self._command
self.log.info("Running on host: %s", get_hostname())
self.log.info('Running: %s', full_cmd)
with _airflow_parsing_context_manager(
dag_id=self._task_instance.dag_id,
task_id=self._task_instance.task_id,
):
if IS_WINDOWS:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
)
else:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
preexec_fn=os.setsid,
)
# Start daemon thread to read subprocess logging output
log_reader = threading.Thread(
target=self._read_task_logs,
args=(proc.stdout,),
)
log_reader.daemon = True
log_reader.start()
return proc
def start(self):
"""Start running the task instance in a subprocess."""
raise NotImplementedError()
def return_code(self, timeout: int = 0) -> int | None:
"""
:return: The return code associated with running the task instance or
None if the task is not yet done.
:rtype: int
"""
raise NotImplementedError()
def terminate(self) -> None:
"""Force kill the running task instance."""
raise NotImplementedError()
def on_finish(self) -> None:
"""A callback that should be called when this is done running."""
if self._cfg_path and os.path.isfile(self._cfg_path):
if self.run_as_user:
subprocess.call(['sudo', 'rm', self._cfg_path], close_fds=True)
else:
os.remove(self._cfg_path)
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦