Source code for sqllineage.runner

import logging
import warnings
from typing import Dict, List, Optional, Tuple

from sqllineage.core.holders import SQLLineageHolder
from sqllineage.core.models import Column, Table
from sqllineage.core.parser.sqlfluff.analyzer import SqlFluffLineageAnalyzer
from sqllineage.core.parser.sqlparse.analyzer import SqlParseLineageAnalyzer
from sqllineage.drawing import draw_lineage_graph
from import to_cytoscape
from sqllineage.utils.constant import LineageLevel
from sqllineage.utils.helpers import split, trim_comment

logger = logging.getLogger(__name__)

def lazy_method(func):
    def wrapper(*args, **kwargs):
        self = args[0]
        if not self._evaluated:
        return func(*args, **kwargs)

    return wrapper

def lazy_property(func):
    return property(lazy_method(func))

[docs]class LineageRunner(object): def __init__( self, sql: str, dialect: str = DEFAULT_DIALECT, encoding: Optional[str] = None, verbose: bool = False, draw_options: Optional[Dict[str, str]] = None, ): """ The entry point of SQLLineage after command line options are parsed. :param sql: a string representation of SQL statements. :param encoding: the encoding for sql string :param verbose: verbose flag indicate whether statement-wise lineage result will be shown """ if dialect == SQLPARSE_DIALECT: warnings.warn( "dialect `non-validating` is deprecated, use `ansi` or dialect of your SQL instead. " "`non-validating` will stop being the default dialect in v1.5.x release " "and be completely removed in v1.6.x", DeprecationWarning, stacklevel=2, ) self._encoding = encoding self._sql = sql self._verbose = verbose self._draw_options = draw_options if draw_options else {} self._evaluated = False self._stmt: List[str] = [] self._dialect = dialect
[docs] @lazy_method def __str__(self): """ print out the Lineage Summary. """ statements = self.statements() source_tables = "\n ".join(str(t) for t in self.source_tables) target_tables = "\n ".join(str(t) for t in self.target_tables) combined = f"""Statements(#): {len(statements)} Source Tables: {source_tables} Target Tables: {target_tables} """ if self.intermediate_tables: intermediate_tables = "\n ".join( str(t) for t in self.intermediate_tables ) combined += f"""Intermediate Tables: {intermediate_tables}""" if self._verbose: result = "" for i, holder in enumerate(self._stmt_holders): stmt_short = statements[i].replace("\n", "") if len(stmt_short) > 50: stmt_short = stmt_short[:50] + "..." content = str(holder).replace("\n", "\n ") result += f"""Statement #{i + 1}: {stmt_short} {content} """ combined = result + "==========\nSummary:\n" + combined return combined
@lazy_method def to_cytoscape(self, level=LineageLevel.TABLE) -> List[Dict[str, Dict[str, str]]]: """ to turn the DAG into cytoscape format. """ if level == LineageLevel.COLUMN: return to_cytoscape(self._sql_holder.column_lineage_graph, compound=True) else: return to_cytoscape(self._sql_holder.table_lineage_graph)
[docs] def draw(self, dialect: str) -> None: """ to draw the lineage directed graph """ draw_options = self._draw_options if draw_options.get("f") is None: draw_options.pop("f", None) draw_options["e"] = self._sql draw_options["dialect"] = dialect return draw_lineage_graph(**draw_options)
@lazy_method def statements(self) -> List[str]: """ a list of SQL statements. """ return [trim_comment(s) for s in self._stmt] @lazy_property def source_tables(self) -> List[Table]: """ a list of source :class:`sqllineage.models.Table` """ return sorted(self._sql_holder.source_tables, key=lambda x: str(x)) @lazy_property def target_tables(self) -> List[Table]: """ a list of target :class:`sqllineage.models.Table` """ return sorted(self._sql_holder.target_tables, key=lambda x: str(x)) @lazy_property def intermediate_tables(self) -> List[Table]: """ a list of intermediate :class:`sqllineage.models.Table` """ return sorted(self._sql_holder.intermediate_tables, key=lambda x: str(x)) @lazy_method def get_column_lineage(self, exclude_subquery=True) -> List[Tuple[Column, Column]]: """ a list of column tuple :class:`sqllineage.models.Column` """ # sort by target column, and then source column return sorted( self._sql_holder.get_column_lineage(exclude_subquery), key=lambda x: (str(x[-1]), str(x[0])), )
[docs] def print_column_lineage(self) -> None: """ print column level lineage to stdout """ for path in self.get_column_lineage(): print(" <- ".join(str(col) for col in reversed(path)))
[docs] def print_table_lineage(self) -> None: """ print table level lineage to stdout """ print(str(self))
def _eval(self): self._stmt = split(self._sql.strip()) analyzer = ( SqlParseLineageAnalyzer() if self._dialect == SQLPARSE_DIALECT else SqlFluffLineageAnalyzer(self._dialect) ) self._stmt_holders = [analyzer.analyze(stmt) for stmt in self._stmt] self._sql_holder = SQLLineageHolder.of(*self._stmt_holders) self._evaluated = True