airflow pre_commit_vendor_k8s_json_schema 源码
airflow pre_commit_vendor_k8s_json_schema 代码
文件路径:/scripts/ci/pre_commit/pre_commit_vendor_k8s_json_schema.py
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import json
from typing import Iterator
import requests
K8S_DEFINITIONS = (
"https://raw.githubusercontent.com/yannh/kubernetes-json-schema"
"/master/v1.22.0-standalone-strict/_definitions.json"
)
VALUES_SCHEMA_FILE = "chart/values.schema.json"
with open(VALUES_SCHEMA_FILE) as f:
schema = json.load(f)
def find_refs(props: dict) -> Iterator[str]:
for value in props.values():
if "$ref" in value:
yield value["$ref"]
if "items" in value:
if "$ref" in value["items"]:
yield value["items"]["$ref"]
if "properties" in value:
yield from find_refs(value["properties"])
def get_remote_schema(url: str) -> dict:
req = requests.get(url)
req.raise_for_status()
return req.json()
# Create 'definitions' if it doesn't exist or reset the io.k8s defs
schema["definitions"] = {k: v for k, v in schema.get("definitions", {}).items() if not k.startswith("io.k8s")}
# Get the k8s defs
defs = get_remote_schema(K8S_DEFINITIONS)
# first find refs in our schema
refs = set(find_refs(schema["properties"]))
# now we look for refs in refs
i = 0
while True:
starting_refs = refs
for ref in refs:
ref_id = ref.split('/')[-1]
schema["definitions"][ref_id] = defs["definitions"][ref_id]
refs = set(find_refs(schema["definitions"]))
if refs == starting_refs:
break
# Make sure we don't have a runaway loop
i += 1
if i > 15:
raise SystemExit("Wasn't able to find all nested references in 15 cycles")
# and finally, sort them all!
schema["definitions"] = dict(sorted(schema["definitions"].items()))
# Then write out our schema
with open(VALUES_SCHEMA_FILE, 'w') as f:
json.dump(schema, f, indent=4)
f.write('\n') # with a newline!
相关信息
相关文章
airflow common_precommit_utils 源码
airflow pre_commit_base_operator_partial_arguments 源码
airflow pre_commit_boring_cyborg 源码
airflow pre_commit_breeze_cmd_line 源码
airflow pre_commit_build_providers_dependencies 源码
airflow pre_commit_changelog_duplicates 源码
airflow pre_commit_chart_schema 源码
airflow pre_commit_check_2_2_compatibility 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦