import logging
import warnings
from typing import Dict, List, Optional, Tuple
from sqllineage import DEFAULT_DIALECT, SQLPARSE_DIALECT
from sqllineage.core.holders import SQLLineageHolder
from sqllineage.core.models import Column, Table
from sqllineage.core.parser.sqlfluff.analyzer import SqlFluffLineageAnalyzer
from sqllineage.core.parser.sqlparse.analyzer import SqlParseLineageAnalyzer
from sqllineage.drawing import draw_lineage_graph
from sqllineage.io import to_cytoscape
from sqllineage.utils.constant import LineageLevel
from sqllineage.utils.helpers import split, trim_comment
logger = logging.getLogger(__name__)
def lazy_method(func):
def wrapper(*args, **kwargs):
self = args[0]
if not self._evaluated:
self._eval()
return func(*args, **kwargs)
return wrapper
def lazy_property(func):
return property(lazy_method(func))
[docs]class LineageRunner(object):
def __init__(
self,
sql: str,
dialect: str = DEFAULT_DIALECT,
encoding: Optional[str] = None,
verbose: bool = False,
draw_options: Optional[Dict[str, str]] = None,
):
"""
The entry point of SQLLineage after command line options are parsed.
:param sql: a string representation of SQL statements.
:param encoding: the encoding for sql string
:param verbose: verbose flag indicate whether statement-wise lineage result will be shown
"""
if dialect == SQLPARSE_DIALECT:
warnings.warn(
"dialect `non-validating` is deprecated, use `ansi` or dialect of your SQL instead. "
"`non-validating` will stop being the default dialect in v1.5.x release "
"and be completely removed in v1.6.x",
DeprecationWarning,
stacklevel=2,
)
self._encoding = encoding
self._sql = sql
self._verbose = verbose
self._draw_options = draw_options if draw_options else {}
self._evaluated = False
self._stmt: List[str] = []
self._dialect = dialect
[docs] @lazy_method
def __str__(self):
"""
print out the Lineage Summary.
"""
statements = self.statements()
source_tables = "\n ".join(str(t) for t in self.source_tables)
target_tables = "\n ".join(str(t) for t in self.target_tables)
combined = f"""Statements(#): {len(statements)}
Source Tables:
{source_tables}
Target Tables:
{target_tables}
"""
if self.intermediate_tables:
intermediate_tables = "\n ".join(
str(t) for t in self.intermediate_tables
)
combined += f"""Intermediate Tables:
{intermediate_tables}"""
if self._verbose:
result = ""
for i, holder in enumerate(self._stmt_holders):
stmt_short = statements[i].replace("\n", "")
if len(stmt_short) > 50:
stmt_short = stmt_short[:50] + "..."
content = str(holder).replace("\n", "\n ")
result += f"""Statement #{i + 1}: {stmt_short}
{content}
"""
combined = result + "==========\nSummary:\n" + combined
return combined
@lazy_method
def to_cytoscape(self, level=LineageLevel.TABLE) -> List[Dict[str, Dict[str, str]]]:
"""
to turn the DAG into cytoscape format.
"""
if level == LineageLevel.COLUMN:
return to_cytoscape(self._sql_holder.column_lineage_graph, compound=True)
else:
return to_cytoscape(self._sql_holder.table_lineage_graph)
[docs] def draw(self, dialect: str) -> None:
"""
to draw the lineage directed graph
"""
draw_options = self._draw_options
if draw_options.get("f") is None:
draw_options.pop("f", None)
draw_options["e"] = self._sql
draw_options["dialect"] = dialect
return draw_lineage_graph(**draw_options)
@lazy_method
def statements(self) -> List[str]:
"""
a list of SQL statements.
"""
return [trim_comment(s) for s in self._stmt]
@lazy_property
def source_tables(self) -> List[Table]:
"""
a list of source :class:`sqllineage.models.Table`
"""
return sorted(self._sql_holder.source_tables, key=lambda x: str(x))
@lazy_property
def target_tables(self) -> List[Table]:
"""
a list of target :class:`sqllineage.models.Table`
"""
return sorted(self._sql_holder.target_tables, key=lambda x: str(x))
@lazy_property
def intermediate_tables(self) -> List[Table]:
"""
a list of intermediate :class:`sqllineage.models.Table`
"""
return sorted(self._sql_holder.intermediate_tables, key=lambda x: str(x))
@lazy_method
def get_column_lineage(self, exclude_subquery=True) -> List[Tuple[Column, Column]]:
"""
a list of column tuple :class:`sqllineage.models.Column`
"""
# sort by target column, and then source column
return sorted(
self._sql_holder.get_column_lineage(exclude_subquery),
key=lambda x: (str(x[-1]), str(x[0])),
)
[docs] def print_column_lineage(self) -> None:
"""
print column level lineage to stdout
"""
for path in self.get_column_lineage():
print(" <- ".join(str(col) for col in reversed(path)))
[docs] def print_table_lineage(self) -> None:
"""
print table level lineage to stdout
"""
print(str(self))
def _eval(self):
self._stmt = split(self._sql.strip())
analyzer = (
SqlParseLineageAnalyzer()
if self._dialect == SQLPARSE_DIALECT
else SqlFluffLineageAnalyzer(self._dialect)
)
self._stmt_holders = [analyzer.analyze(stmt) for stmt in self._stmt]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
self._evaluated = True