Source code for data_manipulation.sqlalchemy_

import sqlalchemy
from sqlalchemy.engine.url import URL
from sqlalchemy.engine.base import Engine
from sqlalchemy.exc import SQLAlchemyError
from typing import Optional, Dict, Any
from loguru import logger


class DatabaseConnectionError(Exception):
    """Signal that a database connection could not be established.

    Behaves exactly like the built-in ``Exception``; it exists only so that
    callers can catch database connection failures separately from other
    errors.
    """
def create_sqlalchemy_url(
    drivername: str,
    host: str,
    dbname: str,
    user: str,
    password: str,
    port: int = 3306,
    query: Optional[Dict[str, Any]] = None,
) -> URL:
    """Build a SQLAlchemy ``URL`` object from individual connection settings.

    The values are handed to ``URL.create`` unchanged, so any special
    characters in the username or password are escaped by SQLAlchemy rather
    than by the caller.

    Args:
        drivername (str): Database driver name, for example:
            - 'mysql+mysqlconnector'
            - 'mysql+pymysql'
            - 'postgresql+psycopg2'
            - 'mssql+pyodbc'
            - 'oracle+cx_oracle'
            NOTE(review): earlier documentation also listed 'sqlite3'.
            SQLAlchemy's SQLite dialect is named 'sqlite' and does not use
            host/user/port -- confirm whether SQLite is meant to be
            supported by this helper.
        host (str): Database server hostname or IP address.
        dbname (str): Name of the target database.
        user (str): Database username for authentication.
        password (str): Database password for authentication.
        port (int, optional): Database server port number. Defaults to 3306
            regardless of ``drivername``, so pass it explicitly for servers
            that listen on another port.
        query (Optional[Dict[str, Any]], optional): Additional connection
            parameters appended to the URL as its query string (useful for
            SSL configuration). ``None`` or an empty mapping results in no
            query string. Defaults to None.

    Returns:
        sqlalchemy.engine.url.URL: Configured URL object for database
        connection.

    Examples:
        >>> url = create_sqlalchemy_url(
        ...     drivername='postgresql+psycopg2',
        ...     host='localhost',
        ...     dbname='mydb',
        ...     user='admin',
        ...     password='secret',
        ...     port=5432
        ... )
        >>> url.render_as_string(hide_password=False)
        'postgresql+psycopg2://admin:secret@localhost:5432/mydb'
    """
    # The example above deliberately uses render_as_string(hide_password=False)
    # instead of str(url): newer SQLAlchemy releases mask the password when a
    # URL is converted with str(), which would make the doctest fail.
    return URL.create(
        drivername=drivername,
        username=user,
        password=password,
        host=host,
        port=port,
        database=dbname,
        query=query or {},
    )
def create_sqlalchemy_engine(
    drivername: str,
    host: str,
    dbname: str,
    user: str,
    password: str,
    port: int = 3306,
    pool_size: int = 5,
    max_overflow: int = 10,
    pool_timeout: int = 30,
    connect_timeout: int = 10,
    ssl_ca: Optional[str] = None,
) -> Engine:
    """Create a pooled SQLAlchemy engine and verify that it can connect.

    The connection URL is built with ``create_sqlalchemy_url``, the engine is
    created with ``pool_pre_ping=True``, and a trivial ``SELECT 1`` query is
    executed once to prove the database is reachable before the engine is
    returned.

    Args:
        drivername (str): Database driver name, for example:
            - 'mysql+mysqlconnector'
            - 'mysql+pymysql'
            - 'postgresql+psycopg2'
            - 'mssql+pyodbc'
            - 'oracle+cx_oracle'
            NOTE(review): earlier documentation also listed 'sqlite3'.
            SQLAlchemy's SQLite dialect is named 'sqlite' and does not use
            host/user/port -- confirm whether SQLite is meant to be
            supported by this helper.
        host (str): Database server hostname or IP address.
        dbname (str): Name of the target database.
        user (str): Database username for authentication.
        password (str): Database password for authentication.
        port (int, optional): Database server port number. Defaults to 3306
            regardless of ``drivername``.
        pool_size (int, optional): The size of the connection pool.
            Defaults to 5.
        max_overflow (int, optional): Maximum number of connections above
            ``pool_size``. Defaults to 10.
        pool_timeout (int, optional): Timeout for getting a connection from
            the pool. Defaults to 30.
        connect_timeout (int, optional): Timeout for database connections.
            It is added to the URL query (as a string) only when
            ``drivername`` contains 'mysql' or 'postgresql'; for other
            drivers it is ignored. Defaults to 10.
        ssl_ca (Optional[str], optional): Path to SSL CA certificate. When
            given, ``ssl_ca`` and ``ssl_verify_cert=true`` are added to the
            URL query. NOTE(review): these option names look specific to
            MySQL drivers -- confirm before using ``ssl_ca`` with other
            drivers. Defaults to None.

    Returns:
        sqlalchemy.engine.base.Engine: Configured database engine object
        whose connectivity has been verified.

    Raises:
        DatabaseConnectionError: If SQLAlchemy fails to create the engine or
            the connection test fails. The original ``SQLAlchemyError`` is
            chained as ``__cause__``. Exceptions that are not
            ``SQLAlchemyError`` subclasses propagate unchanged.

    Examples:
        >>> engine = create_sqlalchemy_engine(  # doctest: +SKIP
        ...     drivername='postgresql+psycopg2',
        ...     host='localhost',
        ...     dbname='mydb',
        ...     user='admin',
        ...     password='secret',
        ...     port=5432
        ... )

    Note:
        The outcome is logged with loguru: "Database connection established
        successfully" at info level on success, or an error-level message
        containing the underlying exception text on failure. The example
        above is skipped by doctest because it needs a live database.
    """
    query_params = {}

    # Configure SSL if a CA certificate was provided.
    if ssl_ca:
        query_params.update({"ssl_ca": ssl_ca, "ssl_verify_cert": "true"})

    # Only these driver families are given the connect_timeout URL option.
    if "mysql" in drivername or "postgresql" in drivername:
        query_params["connect_timeout"] = str(connect_timeout)

    url = create_sqlalchemy_url(
        drivername=drivername,
        host=host,
        dbname=dbname,
        user=user,
        password=password,
        port=port,
        query=query_params,
    )

    # Engine creation can itself fail with a SQLAlchemyError (for example an
    # unknown dialect or rejected arguments); wrap it so callers only have to
    # handle the documented DatabaseConnectionError.
    try:
        engine = sqlalchemy.create_engine(
            url,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_timeout=pool_timeout,
            pool_pre_ping=True,  # Enable connection health checks
        )
    except SQLAlchemyError as exc:
        error_msg = f"Failed to create database engine: {exc}"
        logger.error(error_msg)
        raise DatabaseConnectionError(error_msg) from exc

    # Test the connection. A Core select(1) is used instead of the raw text
    # "SELECT 1" so SQLAlchemy renders the statement in the form the target
    # backend accepts (e.g. Oracle requires a FROM clause).
    try:
        with engine.connect() as conn:
            conn.execute(sqlalchemy.select(1))
    except SQLAlchemyError as exc:
        error_msg = f"Failed to connect to database: {exc}"
        logger.error(error_msg)
        engine.dispose()  # Clean up pooled resources of the unusable engine
        raise DatabaseConnectionError(error_msg) from exc

    logger.info("Database connection established successfully")
    return engine
def dispose_engine(engine: Engine) -> None:
    """Dispose of a SQLAlchemy engine together with its connection pool.

    A falsy ``engine`` (for example ``None``) is ignored: nothing is disposed
    and nothing is logged.

    Args:
        engine (Engine): The SQLAlchemy engine to dispose.
    """
    if not engine:
        return

    engine.dispose()
    logger.info("Database engine disposed successfully")
if __name__ == "__main__":
    # When the module is executed directly as a script, run the examples
    # embedded in its docstrings as a self-test.
    from doctest import testmod

    testmod()