Source code for sqllineage.runner

import logging
import warnings
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple

from sqllineage.config import SQLLineageConfig
from sqllineage.core.holders import SQLLineageHolder
from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.core.metadata_provider import MetaDataProvider
from sqllineage.core.models import Column, Table
from sqllineage.core.parser.sqlfluff.analyzer import SqlFluffLineageAnalyzer
from sqllineage.core.parser.sqlparse.analyzer import SqlParseLineageAnalyzer
from sqllineage.drawing import draw_lineage_graph
from import to_cytoscape
from sqllineage.utils.constant import LineageLevel
from sqllineage.utils.helpers import split, trim_comment

logger = logging.getLogger(__name__)

def lazy_method(func):
    """
    Decorator that triggers deferred evaluation before the wrapped method runs.

    The wrapper treats its first positional argument as the owning instance. When that
    instance's ``_evaluated`` flag is falsy, ``_eval()`` is called on it first; the call is
    then delegated to ``func`` with the original arguments.

    :param func: method defined on a class exposing ``_evaluated`` and ``_eval()``
    :return: wrapper accepting the same arguments as ``func``
    """
    # Function-scope import keeps this block self-contained.
    from functools import wraps

    @wraps(func)  # keep the wrapped method's name and docstring visible on the wrapper
    def wrapper(*args, **kwargs):
        self = args[0]
        if not self._evaluated:
            self._eval()
        return func(*args, **kwargs)

    return wrapper

def lazy_property(func):
    """
    Build a read-only ``property`` whose getter is ``func`` passed through ``lazy_method``.

    :param func: getter function taking the instance as its argument
    :return: a ``property`` object wrapping the decorated getter
    """
    guarded_getter = lazy_method(func)
    return property(guarded_getter)

class LineageRunner:
    """
    Runs lineage analysis over a SQL string and exposes the result.

    Analysis is deferred: members decorated with ``lazy_method`` / ``lazy_property`` call
    :meth:`_eval` on first use, which fills in the statement list and the lineage holders.
    """

    def __init__(
        self,
        sql: str,
        dialect: str = DEFAULT_DIALECT,
        metadata_provider: MetaDataProvider = DummyMetaDataProvider(),
        verbose: bool = False,
        silent_mode: bool = False,
        draw_options: Optional[Dict[str, str]] = None,
    ):
        """
        The entry point of SQLLineage after command line options are parsed.

        :param sql: a string representation of SQL statements.
        :param dialect: sql dialect
        :param metadata_provider: metadata service object providing table schema
        :param verbose: verbose flag indicating whether statement-wise lineage result will be shown
        :param silent_mode: boolean flag indicating whether to skip lineage analysis for unknown statement types
        :param draw_options: optional keyword options forwarded to ``draw_lineage_graph`` by :meth:`draw`
        """
        if dialect == SQLPARSE_DIALECT:
            warnings.warn(
                f"dialect `{SQLPARSE_DIALECT}` is deprecated, use `ansi` or dialect of your SQL instead. "
                f"`{SQLPARSE_DIALECT}` will be completely removed in v1.6.x",
                DeprecationWarning,
                stacklevel=2,
            )
        self._sql = sql
        self._verbose = verbose
        self._draw_options = draw_options if draw_options else {}
        self._evaluated = False
        self._stmt: List[str] = []
        self._dialect = dialect
        self._metadata_provider = metadata_provider
        self._silent_mode = silent_mode

    @lazy_method
    def __str__(self):
        """
        print out the Lineage Summary.

        With ``verbose`` set, a per-statement section (statement text shortened to 50
        characters plus the statement holder's string form) is placed before the summary.
        """
        statements = self.statements()
        source_tables = "\n ".join(str(t) for t in self.source_tables)
        target_tables = "\n ".join(str(t) for t in self.target_tables)
        # NOTE(review): the line breaks inside these templates were collapsed in the reviewed
        # copy of this file. They are restored here with an indent matching the "\n " joiners;
        # confirm the indent width against the expected summary output.
        combined = f"""Statements(#): {len(statements)}
Source Tables:
 {source_tables}
Target Tables:
 {target_tables}
"""
        intermediates = self.intermediate_tables
        if intermediates:
            intermediate_tables = "\n ".join(str(t) for t in intermediates)
            combined += f"""Intermediate Tables:
 {intermediate_tables}"""
        if self._verbose:
            sections = []
            for i, holder in enumerate(self._stmt_holders):
                stmt_short = statements[i].replace("\n", "")
                if len(stmt_short) > 50:
                    stmt_short = stmt_short[:50] + "..."
                content = str(holder).replace("\n", "\n ")
                sections.append(
                    f"""Statement #{i + 1}: {stmt_short}
 {content}
"""
                )
            combined = "".join(sections) + "==========\nSummary:\n" + combined
        return combined

    @lazy_method
    def to_cytoscape(
        self, level=LineageLevel.TABLE
    ) -> List[Dict[str, Dict[str, str]]]:
        """
        to turn the DAG into cytoscape format.

        :param level: ``LineageLevel.COLUMN`` selects the column lineage graph (rendered with
            ``compound=True``); any other value selects the table lineage graph
        """
        # The bare name below resolves to the module-level ``to_cytoscape`` function, not
        # this method: class attributes are not part of a method's name lookup.
        if level == LineageLevel.COLUMN:
            return to_cytoscape(self._sql_holder.column_lineage_graph, compound=True)
        return to_cytoscape(self._sql_holder.table_lineage_graph)

    def draw(self) -> None:
        """
        to draw the lineage directed graph

        The options given at construction time are passed to ``draw_lineage_graph`` together
        with the dialect. When no ``"f"`` option is set, the SQL text is passed as ``"e"``.
        """
        # Work on a copy: the stored dict may be the very object the caller passed in, and
        # it must not be modified as a side effect of drawing.
        draw_options = dict(self._draw_options)
        if draw_options.get("f") is None:
            draw_options.pop("f", None)
            draw_options["e"] = self._sql
        draw_options["dialect"] = self._dialect
        return draw_lineage_graph(**draw_options)

    @lazy_method
    def statements(self) -> List[str]:
        """
        a list of SQL statements.
        """
        return [trim_comment(s) for s in self._stmt]

    @lazy_property
    def source_tables(self) -> List[Table]:
        """
        a list of source :class:`sqllineage.models.Table`
        """
        return sorted(self._sql_holder.source_tables, key=str)

    @lazy_property
    def target_tables(self) -> List[Table]:
        """
        a list of target :class:`sqllineage.models.Table`
        """
        return sorted(self._sql_holder.target_tables, key=str)

    @lazy_property
    def intermediate_tables(self) -> List[Table]:
        """
        a list of intermediate :class:`sqllineage.models.Table`
        """
        return sorted(self._sql_holder.intermediate_tables, key=str)

    @lazy_method
    def get_column_lineage(
        self, exclude_path_ending_in_subquery=True, exclude_subquery_columns=False
    ) -> List[Tuple[Column, Column]]:
        """
        a list of column tuple :class:`sqllineage.models.Column`

        Both flags are passed through unchanged to the holder's ``get_column_lineage``.
        """
        # sort by target column, and then source column
        return sorted(
            self._sql_holder.get_column_lineage(
                exclude_path_ending_in_subquery, exclude_subquery_columns
            ),
            key=lambda x: (str(x[-1]), str(x[0])),
        )

    def print_column_lineage(self) -> None:
        """
        print column level lineage to stdout
        """
        for path in self.get_column_lineage():
            print(" <- ".join(str(col) for col in reversed(path)))

    def print_table_lineage(self) -> None:
        """
        print table level lineage to stdout
        """
        print(str(self))

    def _eval(self):
        """
        Split the SQL into statements, analyze each one and build the combined holder.

        Sets ``_stmt``, ``_stmt_holders`` and ``_sql_holder``, then marks the runner as
        evaluated so the lazy wrappers do not run the analysis again.
        """
        analyzer = (
            SqlParseLineageAnalyzer()
            if self._dialect == SQLPARSE_DIALECT
            else SqlFluffLineageAnalyzer(self._dialect, self._silent_mode)
        )
        if SQLLineageConfig.TSQL_NO_SEMICOLON and self._dialect == "tsql":
            self._stmt = analyzer.split_tsql(self._sql.strip())
        else:
            if SQLLineageConfig.TSQL_NO_SEMICOLON and self._dialect != "tsql":
                warnings.warn(
                    f"Dialect={self._dialect}, TSQL_NO_SEMICOLON will be ignored unless dialect is tsql"
                )
            self._stmt = split(self._sql.strip())

        with self._metadata_provider.session() as session:
            stmt_holders = []
            for stmt in self._stmt:
                stmt_holder = analyzer.analyze(stmt, session.metadata_provider)
                if write := stmt_holder.write:
                    # Only the first written object is considered here.
                    tgt_table = next(iter(write))
                    if isinstance(tgt_table, Table) and (
                        tgt_columns := stmt_holder.get_table_columns(tgt_table)
                    ):
                        # presumably makes these columns visible to statements analyzed
                        # later in the same session; verify against the session class
                        session.register_session_metadata(tgt_table, tgt_columns)
                # One holder per statement, so holders line up with ``_stmt`` by index.
                stmt_holders.append(stmt_holder)
            self._stmt_holders = stmt_holders
            self._sql_holder = SQLLineageHolder.of(
                session.metadata_provider, *self._stmt_holders
            )
        self._evaluated = True

    @staticmethod
    def supported_dialects() -> Dict[str, List[str]]:
        """
        an ordered dict (so we can make sure the default parser implementation comes first)
        with key, value as parser_name, dialect list respectively
        """
        dialects = OrderedDict(
            [
                (
                    SqlFluffLineageAnalyzer.PARSER_NAME,
                    SqlFluffLineageAnalyzer.SUPPORTED_DIALECTS,
                ),
                (
                    SqlParseLineageAnalyzer.PARSER_NAME,
                    SqlParseLineageAnalyzer.SUPPORTED_DIALECTS,
                ),
            ]
        )
        return dialects