airflow hive_metastore 源码

  • 2022-10-20
  • 浏览 (633)

airflow hive_metastore 代码

文件路径:/metastore_browser/hive_metastore.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Plugins metabrowser"""
from __future__ import annotations

import json
from datetime import datetime
from typing import List

import pandas as pd
from flask import Blueprint, request
from flask_appbuilder import BaseView, expose
from markupsafe import Markup

from airflow.plugins_manager import AirflowPlugin
from airflow.providers.apache.hive.hooks.hive import HiveCliHook, HiveMetastoreHook
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.presto.hooks.presto import PrestoHook
from airflow.www.decorators import gzipped

METASTORE_CONN_ID = 'metastore_default'
METASTORE_MYSQL_CONN_ID = 'metastore_mysql'
PRESTO_CONN_ID = 'presto_default'
HIVE_CLI_CONN_ID = 'hive_default'
DEFAULT_DB = 'default'
DB_ALLOW_LIST = []  # type: List[str]
DB_DENY_LIST = ['tmp']  # type: List[str]
TABLE_SELECTOR_LIMIT = 2000

# Keeping pandas from truncating long strings
pd.set_option('display.max_colwidth', -1)


class MetastoreBrowserView(BaseView):
    """Creating a Flask-AppBuilder BaseView"""

    default_view = 'index'

    @expose('/')
    def index(self):
        """Create default view"""
        sql = """
        SELECT
            a.name as db, db_location_uri as location,
            count(1) as object_count, a.desc as description
        FROM DBS a
        JOIN TBLS b ON a.DB_ID = b.DB_ID
        GROUP BY a.name, db_location_uri, a.desc
        """
        hook = MySqlHook(METASTORE_MYSQL_CONN_ID)
        df = hook.get_pandas_df(sql)
        df.db = '<a href="/metastorebrowserview/db/?db=' + df.db + '">' + df.db + '</a>'
        table = df.to_html(
            classes="table table-striped table-bordered table-hover",
            index=False,
            escape=False,
            na_rep='',
        )
        return self.render_template("metastore_browser/dbs.html", table=Markup(table))

    @expose('/table/')
    def table(self):
        """Create table view"""
        table_name = request.args.get("table")
        metastore = HiveMetastoreHook(METASTORE_CONN_ID)
        table = metastore.get_table(table_name)
        return self.render_template(
            "metastore_browser/table.html", table=table, table_name=table_name, datetime=datetime, int=int
        )

    @expose('/db/')
    def db(self):
        """Show tables in database"""
        db = request.args.get("db")
        metastore = HiveMetastoreHook(METASTORE_CONN_ID)
        tables = sorted(metastore.get_tables(db=db), key=lambda x: x.tableName)
        return self.render_template("metastore_browser/db.html", tables=tables, db=db)

    @gzipped
    @expose('/partitions/')
    def partitions(self):
        """Retrieve table partitions"""
        schema, table = request.args.get("table").split('.')
        sql = f"""
        SELECT
            a.PART_NAME,
            a.CREATE_TIME,
            c.LOCATION,
            c.IS_COMPRESSED,
            c.INPUT_FORMAT,
            c.OUTPUT_FORMAT
        FROM PARTITIONS a
        JOIN TBLS b ON a.TBL_ID = b.TBL_ID
        JOIN DBS d ON b.DB_ID = d.DB_ID
        JOIN SDS c ON a.SD_ID = c.SD_ID
        WHERE
            b.TBL_NAME like '{table}' AND
            d.NAME like '{schema}'
        ORDER BY PART_NAME DESC
        """
        hook = MySqlHook(METASTORE_MYSQL_CONN_ID)
        df = hook.get_pandas_df(sql)
        return df.to_html(
            classes="table table-striped table-bordered table-hover",
            index=False,
            na_rep='',
        )

    @gzipped
    @expose('/objects/')
    def objects(self):
        """Retrieve objects from TBLS and DBS"""
        where_clause = ''
        if DB_ALLOW_LIST:
            dbs = ",".join("'" + db + "'" for db in DB_ALLOW_LIST)
            where_clause = f"AND b.name IN ({dbs})"
        if DB_DENY_LIST:
            dbs = ",".join("'" + db + "'" for db in DB_DENY_LIST)
            where_clause = f"AND b.name NOT IN ({dbs})"
        sql = f"""
        SELECT CONCAT(b.NAME, '.', a.TBL_NAME), TBL_TYPE
        FROM TBLS a
        JOIN DBS b ON a.DB_ID = b.DB_ID
        WHERE
            a.TBL_NAME NOT LIKE '%tmp%' AND
            a.TBL_NAME NOT LIKE '%temp%' AND
            b.NAME NOT LIKE '%tmp%' AND
            b.NAME NOT LIKE '%temp%'
        {where_clause}
        LIMIT {TABLE_SELECTOR_LIMIT};
        """
        hook = MySqlHook(METASTORE_MYSQL_CONN_ID)
        data = [{'id': row[0], 'text': row[0]} for row in hook.get_records(sql)]
        return json.dumps(data)

    @gzipped
    @expose('/data/')
    def data(self):
        """Retrieve data from table"""
        table = request.args.get("table")
        sql = f"SELECT * FROM {table} LIMIT 1000;"
        hook = PrestoHook(PRESTO_CONN_ID)
        df = hook.get_pandas_df(sql)
        return df.to_html(
            classes="table table-striped table-bordered table-hover",
            index=False,
            na_rep='',
        )

    @expose('/ddl/')
    def ddl(self):
        """Retrieve table ddl"""
        table = request.args.get("table")
        sql = f"SHOW CREATE TABLE {table};"
        hook = HiveCliHook(HIVE_CLI_CONN_ID)
        return hook.run_cli(sql)


# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "metastore_browser",
    __name__,
    template_folder='templates',
    static_folder='static',
    static_url_path='/static/metastore_browser',
)


class MetastoreBrowserPlugin(AirflowPlugin):
    """Defining the plugin class"""

    name = "metastore_browser"
    flask_blueprints = [bp]
    appbuilder_views = [
        {"name": "Hive Metadata Browser", "category": "Plugins", "view": MetastoreBrowserView()}
    ]

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow main 源码

airflow init 源码

airflow init 源码

airflow init 源码

airflow basic_auth 源码

airflow default 源码

airflow deny_all 源码

airflow kerberos_auth 源码

airflow session 源码

0  赞