airflow pre_commit_json_schema source code

  • 2022-10-20

File path: /scripts/ci/pre_commit/pre_commit_json_schema.py

#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import argparse
import hashlib
import json
import os
import re
import sys

import requests
import yaml
from jsonschema.exceptions import ValidationError
from jsonschema.validators import extend, validator_for

if __name__ != "__main__":
    raise Exception(
        "This file is intended to be executed as an executable program. You cannot use it as a module. "
        "To run this script, run the ./build_docs.py command"
    )

AIRFLOW_SOURCES_DIR = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)


def _cache_dir():
    """Return full path to the user-specific cache dir for this application"""
    path = os.path.join(AIRFLOW_SOURCES_DIR, ".build", "cache")
    os.makedirs(path, exist_ok=True)
    return path


def _gethash(string: str):
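    """Return a short (first 8 hex chars) SHA-256 digest of the string, used as a cache key."""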
    hash_object = hashlib.sha256(string.encode())
    return hash_object.hexdigest()[:8]


def fetch_and_cache(url: str, output_filename: str):
    """Fetch URL to local cache and returns path."""
    cache_key = _gethash(url)
    cache_dir = _cache_dir()
    cache_metadata_filepath = os.path.join(cache_dir, "cache-metadata.json")
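    # Cache file name: URL hash prefix plus a truncated human-readable suffix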
    cache_filepath = os.path.join(cache_dir, f"{cache_key}-{output_filename[:64]}")
    # Create cache directory
    os.makedirs(cache_dir, exist_ok=True)
    # Load cache metadata
    cache_metadata: dict[str, str] = {}
    if os.path.exists(cache_metadata_filepath):
        try:
            with open(cache_metadata_filepath) as cache_file:
                cache_metadata = json.load(cache_file)
        except json.JSONDecodeError:
            os.remove(cache_metadata_filepath)
    etag = cache_metadata.get(cache_key)

    # If we have a file and etag, check the fast path
    if os.path.exists(cache_filepath) and etag:
        res = requests.get(url, headers={"If-None-Match": etag})
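        # 304 Not Modified: the cached copy is still current, so reuse it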
        if res.status_code == 304:
            return cache_filepath

    # Slow path: download the file and refresh the cache
    res = requests.get(url)
    res.raise_for_status()

    with open(cache_filepath, "wb") as output_file:
        output_file.write(res.content)

    # Save cache metadata, if needed
    etag = res.headers.get('etag', None)
    if etag:
        cache_metadata[cache_key] = etag
        with open(cache_metadata_filepath, 'w') as cache_file:
            json.dump(cache_metadata, cache_file)

    return cache_filepath


class _ValidatorError(Exception):
    pass


def load_file(file_path: str):
    """Loads a file using a serializer which guesses based on the file extension"""
    if file_path.lower().endswith('.json'):
        with open(file_path) as input_file:
            return json.load(input_file)
    elif file_path.lower().endswith('.yaml') or file_path.lower().endswith('.yml'):
        with open(file_path) as input_file:
            return yaml.safe_load(input_file)
    raise _ValidatorError("Unknown file format. Supported extension: '.yaml', '.json'")


def _get_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description='Validates the file using JSON Schema specifications')
    parser.add_argument(
        '--enforce-defaults', action='store_true', help="Values must match the default in the schema"
    )

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--spec-file', help="The path to specification")
    group.add_argument('--spec-url', help="The URL to specification")

    parser.add_argument('file', nargs='+')

    return parser


def _process_files(validator, file_paths: list[str]):
    exit_code = 0
    for input_path in file_paths:
        print("Processing file: ", input_path)
        instance = load_file(input_path)
        for error in validator.iter_errors(instance):
            print(error)
            exit_code = 1
    return exit_code


def _create_validator(schema, enforce_defaults: bool):
    cls = validator_for(schema)
    cls.check_schema(schema)
    if enforce_defaults:
        cls = extend(cls, {"default": _default_validator})
    return cls(schema)


def _default_validator(validator, default, instance, schema):
    # We will also accept a "See values.yaml" default
    if default != instance and default != "See values.yaml":
        yield ValidationError(f"{instance} is not equal to the default of {default}")


def _load_spec(spec_file: str | None, spec_url: str | None):
    if spec_url:
        spec_file = fetch_and_cache(url=spec_url, output_filename=re.sub(r"[^a-zA-Z0-9]", "-", spec_url))
    if not spec_file:
        raise Exception(f"The {spec_file} was None and {spec_url} did not lead to any file loading.")
    with open(spec_file) as schema_file:
        schema = json.loads(schema_file.read())
    return schema


def main() -> int:
    """Main code"""
    parser = _get_parser()
    args = parser.parse_args()
    spec_url = args.spec_url
    spec_file = args.spec_file
    enforce_defaults = args.enforce_defaults

    schema = _load_spec(spec_file, spec_url)

    validator = _create_validator(schema, enforce_defaults)

    file_paths = args.file
    exit_code = _process_files(validator, file_paths)

    return exit_code


sys.exit(main())
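
How the --enforce-defaults flag works: the script extends the standard jsonschema validator class with a custom handler for the "default" keyword, so every value in the checked file must equal the default declared in the schema (or the literal string "See values.yaml", which the script special-cases). Below is a minimal, self-contained sketch of the same technique, using a hypothetical toy schema and instance for illustration:

from jsonschema.exceptions import ValidationError
from jsonschema.validators import extend, validator_for

def default_validator(validator, default, instance, schema):
    # Flag any value that drifts from the default declared in the schema
    if default != instance and default != "See values.yaml":
        yield ValidationError(f"{instance} is not equal to the default of {default}")

schema = {
    "type": "object",
    "properties": {"replicas": {"type": "integer", "default": 1}},
}
cls = extend(validator_for(schema), {"default": default_validator})
for error in cls(schema).iter_errors({"replicas": 3}):
    print(error.message)  # -> 3 is not equal to the default of 1

The script itself is invoked with either --spec-file or --spec-url plus one or more files to validate, e.g. pre_commit_json_schema.py --spec-file values.schema.json values.yaml (the file names here are illustrative).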
