Source code for sqllineage.runner

import logging
import warnings
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple

from sqllineage import DEFAULT_DIALECT, SQLPARSE_DIALECT
from sqllineage.config import SQLLineageConfig
from sqllineage.core.holders import SQLLineageHolder
from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.core.metadata_provider import MetaDataProvider
from sqllineage.core.models import Column, Table
from sqllineage.core.parser.sqlfluff.analyzer import SqlFluffLineageAnalyzer
from sqllineage.core.parser.sqlparse.analyzer import SqlParseLineageAnalyzer
from sqllineage.drawing import draw_lineage_graph
from sqllineage.io import to_cytoscape
from sqllineage.utils.constant import LineageLevel
from sqllineage.utils.helpers import split, trim_comment

logger = logging.getLogger(__name__)


def lazy_method(func):
    def wrapper(*args, **kwargs):
        self = args[0]
        if not self._evaluated:
            self._eval()
        return func(*args, **kwargs)

    return wrapper


def lazy_property(func):
    return property(lazy_method(func))


[docs] class LineageRunner(object): def __init__( self, sql: str, dialect: str = DEFAULT_DIALECT, metadata_provider: MetaDataProvider = DummyMetaDataProvider(), verbose: bool = False, silent_mode: bool = False, draw_options: Optional[Dict[str, Any]] = None, ): """ The entry point of SQLLineage after command line options are parsed. :param sql: a string representation of SQL statements. :param dialect: sql dialect :param metadata_provider: metadata service object providing table schema :param verbose: verbose flag indicating whether statement-wise lineage result will be shown :param silent_mode: boolean flag indicating whether to skip lineage analysis for unknown statement types """ if dialect == SQLPARSE_DIALECT: warnings.warn( f"dialect `{SQLPARSE_DIALECT}` is deprecated, use `ansi` or dialect of your SQL instead. " f"`{SQLPARSE_DIALECT}` will be completely removed in v1.6.x", DeprecationWarning, stacklevel=2, ) self._sql = sql self._verbose = verbose self._draw_options = draw_options if draw_options else {} self._evaluated = False self._stmt: List[str] = [] self._dialect = dialect self._metadata_provider = metadata_provider self._silent_mode = silent_mode
[docs] @lazy_method def __str__(self): """ print out the Lineage Summary. """ statements = self.statements() source_tables = "\n ".join(str(t) for t in self.source_tables) target_tables = "\n ".join(str(t) for t in self.target_tables) combined = f"""Statements(#): {len(statements)} Source Tables: {source_tables} Target Tables: {target_tables} """ if self.intermediate_tables: intermediate_tables = "\n ".join( str(t) for t in self.intermediate_tables ) combined += f"""Intermediate Tables: {intermediate_tables}""" if self._verbose: result = "" for i, holder in enumerate(self._stmt_holders): stmt_short = statements[i].replace("\n", "") if len(stmt_short) > 50: stmt_short = stmt_short[:50] + "..." content = str(holder).replace("\n", "\n ") result += f"""Statement #{i + 1}: {stmt_short} {content} """ combined = result + "==========\nSummary:\n" + combined return combined
@lazy_method def to_cytoscape(self, level=LineageLevel.TABLE) -> List[Dict[str, Dict[str, str]]]: """ to turn the DAG into cytoscape format. """ if level == LineageLevel.COLUMN: return to_cytoscape(self._sql_holder.column_lineage_graph, compound=True) else: return to_cytoscape(self._sql_holder.table_lineage_graph)
[docs] def draw(self) -> None: """ to draw the lineage directed graph """ draw_options = self._draw_options if draw_options.get("f") is None: draw_options.pop("f", None) draw_options["e"] = self._sql draw_options["dialect"] = self._dialect draw_options["metadata_provider"] = self._metadata_provider return draw_lineage_graph(**draw_options)
@lazy_method def statements(self) -> List[str]: """ a list of SQL statements. """ return [trim_comment(s) for s in self._stmt] @lazy_property def source_tables(self) -> List[Table]: """ a list of source :class:`sqllineage.models.Table` """ return sorted(self._sql_holder.source_tables, key=lambda x: str(x)) @lazy_property def target_tables(self) -> List[Table]: """ a list of target :class:`sqllineage.models.Table` """ return sorted(self._sql_holder.target_tables, key=lambda x: str(x)) @lazy_property def intermediate_tables(self) -> List[Table]: """ a list of intermediate :class:`sqllineage.models.Table` """ return sorted(self._sql_holder.intermediate_tables, key=lambda x: str(x)) @lazy_method def get_column_lineage( self, exclude_path_ending_in_subquery=True, exclude_subquery_columns=False ) -> List[Tuple[Column, Column]]: """ a list of column tuple :class:`sqllineage.models.Column` """ # sort by target column, and then source column return sorted( self._sql_holder.get_column_lineage( exclude_path_ending_in_subquery, exclude_subquery_columns ), key=lambda x: (str(x[-1]), str(x[0])), )
[docs] def print_column_lineage(self) -> None: """ print column level lineage to stdout """ for path in self.get_column_lineage(): print(" <- ".join(str(col) for col in reversed(path)))
[docs] def print_table_lineage(self) -> None: """ print table level lineage to stdout """ print(str(self))
def _eval(self): analyzer = ( SqlParseLineageAnalyzer() if self._dialect == SQLPARSE_DIALECT else SqlFluffLineageAnalyzer(self._dialect, self._silent_mode) ) if SQLLineageConfig.TSQL_NO_SEMICOLON and self._dialect == "tsql": self._stmt = analyzer.split_tsql(self._sql.strip()) else: if SQLLineageConfig.TSQL_NO_SEMICOLON and self._dialect != "tsql": warnings.warn( f"Dialect={self._dialect}, TSQL_NO_SEMICOLON will be ignored unless dialect is tsql" ) self._stmt = split(self._sql.strip()) with self._metadata_provider.session() as session: stmt_holders = [] for stmt in self._stmt: stmt_holder = analyzer.analyze(stmt, session.metadata_provider) if write := stmt_holder.write: tgt_table = next(iter(write)) if isinstance(tgt_table, Table) and ( tgt_columns := stmt_holder.get_table_columns(tgt_table) ): session.register_session_metadata(tgt_table, tgt_columns) stmt_holders.append(stmt_holder) self._stmt_holders = stmt_holders self._sql_holder = SQLLineageHolder.of( session.metadata_provider, *self._stmt_holders ) self._evaluated = True
[docs] @staticmethod def supported_dialects() -> Dict[str, List[str]]: """ an ordered dict (so we can make sure the default parser implementation comes first) with key, value as parser_name, dialect list respectively """ dialects = OrderedDict( [ ( SqlFluffLineageAnalyzer.PARSER_NAME, SqlFluffLineageAnalyzer.SUPPORTED_DIALECTS, ), ( SqlParseLineageAnalyzer.PARSER_NAME, SqlParseLineageAnalyzer.SUPPORTED_DIALECTS, ), ] ) return dialects