Source code for sqllineage.core.metadata_provider

from abc import abstractmethod
from typing import Dict, List

from sqllineage.core.models import Column, Table


[docs] class MetaDataProvider: """ Base class used to provide metadata like table schema. When parse below sql: .. code-block:: sql INSERT INTO db1.table1 SELECT c1 FROM db2.table2 t2 JOIN db3.table3 t3 ON t2.id = t3.id Only by literal analysis, we don't know which table is selected column c1 from. A subclass of MetaDataProvider implementing _get_table_columns passing to :class:`sqllineage.runner.LineageRunner`. can help parse column lineage correctly. """ def __init__(self) -> None: self._session_metadata: Dict[str, List[str]] = {}
[docs] def get_table_columns(self, table: Table, **kwargs) -> List[Column]: """ return columns of given table. """ if (key := str(table)) in self._session_metadata: cols = self._session_metadata[key] else: cols = self._get_table_columns(str(table.schema), table.raw_name, **kwargs) columns = [] for col in cols: column = Column(col) column.parent = table columns.append(column) return columns
@abstractmethod def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]: """To be implemented by subclasses."""
[docs] def register_session_metadata(self, table: Table, columns: List[Column]) -> None: """Register session-level metadata, like temporary table or view created.""" self._session_metadata[str(table)] = [c.raw_name for c in columns]
[docs] def deregister_session_metadata(self) -> None: """Deregister session-level metadata.""" self._session_metadata.clear()
def session(self): return MetaDataSession(self) def __bool__(self): """ bool value tells whether this provider is ready to provide metadata """ return True
class MetaDataSession: """ Create an analyzer session which can register session-level metadata as a supplement to global metadata. This way, table or views created during the session can be queried. All session-level metadata will be deregistered once session closed. """ def __init__(self, metadata_provider: MetaDataProvider): self.metadata_provider = metadata_provider def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.metadata_provider.deregister_session_metadata() def register_session_metadata(self, table: Table, columns: List[Column]) -> None: self.metadata_provider.register_session_metadata(table, columns)