Source code for sqllineage.core.metadata.sqlalchemy

import logging
from typing import Any, Dict, List, Optional

from sqlalchemy import MetaData, Table, create_engine, make_url
from sqlalchemy.exc import NoSuchModuleError, NoSuchTableError, OperationalError

from sqllineage.core.metadata_provider import MetaDataProvider
from sqllineage.exceptions import MetaDataProviderException


# Module-level logger named after this module; used below to warn when a
# table's columns cannot be listed.
logger = logging.getLogger(__name__)


class SQLAlchemyMetaDataProvider(MetaDataProvider):
    """
    SQLAlchemyMetaDataProvider queries metadata from database using SQLAlchemy
    """

    def __init__(self, url: str, engine_kwargs: Optional[Dict[str, Any]] = None):
        """
        Create the SQLAlchemy engine for ``url`` and verify that the database
        can actually be reached.

        :param url: sqlalchemy url
        :param engine_kwargs: a dictionary of keyword arguments that will be passed to sqlalchemy create_engine
        :raises MetaDataProviderException: if the dialect driver named in ``url``
            cannot be loaded, or if a connection to the database cannot be opened
        """
        super().__init__()
        self.metadata_obj = MetaData()
        try:
            # Treat a missing kwargs dict as "no extra engine options".
            self.engine = create_engine(url, **(engine_kwargs or {}))
        except NoSuchModuleError as e:
            # Parse the url only to name the offending driver in the message.
            u = make_url(url)
            raise MetaDataProviderException(
                f"SQLAlchemy dialect driver {u.drivername} is not installed correctly"
            ) from e
        try:
            # The connection is opened purely as a connectivity probe. Using it
            # as a context manager closes it right away instead of leaving it
            # checked out until it happens to be garbage collected.
            with self.engine.connect():
                pass
        except OperationalError as e:
            raise MetaDataProviderException(f"Could not connect to {url}") from e

    def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:
        """
        List the column names of ``schema.table`` by loading the table
        definition through the engine.

        This is best-effort: when the table does not exist or the database
        reports an operational error, a warning is logged and an empty list is
        returned instead of raising.

        :param schema: schema name the table lives in
        :param table: table name
        :param kwargs: accepted for interface compatibility; not used here
        :return: column names of the table, or an empty list on failure
        """
        columns: List[str] = []
        try:
            # autoload_with asks SQLAlchemy to load the table's columns from
            # the database; the Table is attached to self.metadata_obj.
            sqlalchemy_table = Table(
                table, self.metadata_obj, schema=schema, autoload_with=self.engine
            )
            columns = [c.name for c in sqlalchemy_table.columns]
        except (NoSuchTableError, OperationalError):
            logger.warning(
                "error listing columns for table %s.%s in %s, return empty list instead",
                schema,
                table,
                self.engine.url,
            )
        return columns