Edit on GitHub

src.db.database

Точки входа в БД

  1"""Точки входа в БД"""
  2
  3import logging
  4
  5from asyncpg.exceptions import UniqueViolationError
  6from redis.asyncio import ConnectionPool, Redis
  7from redis.asyncio.client import Pipeline
  8from sqlalchemy.exc import IntegrityError
  9from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
 10
 11from core.config import settings
 12from core.err import redis_exceptor
 13from core.exceptions import DatabaseError, UniquenessError
 14
 15logger = logging.getLogger("sqlalchemy")
 16"""Logger: Логгер для SQLAlchemy."""
 17
 18rlogger = logging.getLogger("redis")
 19"""Logger: Логгер для Redis."""
 20
 21async_engine = create_async_engine(
 22    url=settings.DATABASE_URL,
 23    echo=False,  # Логирование всех выполняемых операций
 24    pool_size=10,  # Максимальное количество подключений к базе
 25    max_overflow=20,  # Количество дополнительных подключений при переполнении
 26)
 27"""AsyncEngine: Асинхронный движок SQLAlchemy для работы с базой данных."""
 28
 29async_session_factory = async_sessionmaker(async_engine)
 30"""async_sessionmaker: Фабрика асинхронных сессий для работы с базой данных."""
 31
 32__pool = ConnectionPool.from_url(settings.CASHBASE_URL, decode_responses=True)
 33"""ConnectionPool: Пул соединений для Redis."""
 34
 35redis_engine = Redis.from_pool(__pool)
 36"""Redis: Асинхронный клиент Redis, использующий пул соединений."""
 37
 38
 39async def execute_query(query, echo=True):
 40    """Выполняет SQL-запрос в асинхронной сессии.
 41
 42    Args:
 43        query (sqlalchemy.sql.expression): Запрос для выполнения.
 44        echo (bool): Если True, логирует запрос и параметры. По умолчанию True.
 45
 46    Returns:
 47        Result: Результат выполнения запроса.
 48
 49    Raises:
 50        UniquenessError: Если запись уже существует.
 51        DatabaseError: Если произошла ошибка при подключении к БД.
 52    """
 53    async with async_session_factory() as session:
 54        async with session.begin():
 55            try:
 56                if echo:
 57                    q = query.compile()
 58                    if q.params:
 59                        logger.info(f"{q}\nQuery PARAMS: {q.params}")
 60                    else:
 61                        logger.info(str(q))
 62                res = await session.execute(query)
 63                session.expunge_all()
 64                return res
 65            except (UniqueViolationError, IntegrityError):
 66                await session.rollback()
 67                logger.warning("Эта запись в базе уже существует")
 68                raise UniquenessError
 69            except Exception:
 70                await session.rollback()
 71                logger.exception(
 72                    "Ошибка при подключении к БД",
 73                    exc_info=getattr(query, "__dict__", query),
 74                )
 75                raise DatabaseError
 76
 77
 78@redis_exceptor
 79async def execute_redis_query(pipeline: Pipeline):
 80    """Выполняет запросы Redis в рамках асинхронного пайплайна.
 81
 82    Args:
 83        pipeline (Pipeline): Пайплайн Redis с командами для выполнения.
 84
 85    Returns:
 86        list: Результаты выполнения команд в пайплайне.
 87
 88    Raises:
 89        Exception: Если произошла ошибка при выполнении Redis-запроса.
 90    """
 91    if pipeline.command_stack:
 92        rlogger.info(f"TRYING TO REDIS QUERY :: {pipeline.command_stack}")
 93        async with pipeline as pipe:
 94            result = await pipe.execute()
 95            rlogger.info("REDIS QUERY COMPLETED")
 96            return result
 97    return []
 98
 99
100@redis_exceptor
101async def iter_redis_keys(key_pattern):
102    """Итерация по ключам Redis, соответствующим заданному шаблону.
103
104    Args:
105        key_pattern (str): Шаблон ключей для поиска.
106
107    Returns:
108        AsyncIterator: Итератор по найденным ключам.
109    """
110    return redis_engine.scan_iter(key_pattern)
logger = <Logger sqlalchemy (INFO)>

Logger: Логгер для SQLAlchemy.

rlogger = <Logger redis (INFO)>

Logger: Логгер для Redis.

async_engine = <sqlalchemy.ext.asyncio.engine.AsyncEngine object>

AsyncEngine: Асинхронный движок SQLAlchemy для работы с базой данных.

async_session_factory = async_sessionmaker(class_='AsyncSession', bind=<sqlalchemy.ext.asyncio.engine.AsyncEngine object>, autoflush=True, expire_on_commit=True)

async_sessionmaker: Фабрика асинхронных сессий для работы с базой данных.

redis_engine = <redis.asyncio.client.Redis(<redis.asyncio.connection.ConnectionPool(<redis.asyncio.connection.Connection(host=localhost,port=6379,db=0)>)>)>

Redis: Асинхронный клиент Redis, использующий пул соединений.

async def execute_query(query, echo=True):
40async def execute_query(query, echo=True):
41    """Выполняет SQL-запрос в асинхронной сессии.
42
43    Args:
44        query (sqlalchemy.sql.expression): Запрос для выполнения.
45        echo (bool): Если True, логирует запрос и параметры. По умолчанию True.
46
47    Returns:
48        Result: Результат выполнения запроса.
49
50    Raises:
51        UniquenessError: Если запись уже существует.
52        DatabaseError: Если произошла ошибка при подключении к БД.
53    """
54    async with async_session_factory() as session:
55        async with session.begin():
56            try:
57                if echo:
58                    q = query.compile()
59                    if q.params:
60                        logger.info(f"{q}\nQuery PARAMS: {q.params}")
61                    else:
62                        logger.info(str(q))
63                res = await session.execute(query)
64                session.expunge_all()
65                return res
66            except (UniqueViolationError, IntegrityError):
67                await session.rollback()
68                logger.warning("Эта запись в базе уже существует")
69                raise UniquenessError
70            except Exception:
71                await session.rollback()
72                logger.exception(
73                    "Ошибка при подключении к БД",
74                    exc_info=getattr(query, "__dict__", query),
75                )
76                raise DatabaseError

Выполняет SQL-запрос в асинхронной сессии.

Arguments:
  • query (sqlalchemy.sql.expression): Запрос для выполнения.
  • echo (bool): Если True, логирует запрос и параметры. По умолчанию True.
Returns:

Result: Результат выполнения запроса.

Raises:
  • UniquenessError: Если запись уже существует.
  • DatabaseError: Если произошла ошибка при подключении к БД.
@redis_exceptor
async def execute_redis_query(pipeline: redis.asyncio.client.Pipeline):
79@redis_exceptor
80async def execute_redis_query(pipeline: Pipeline):
81    """Выполняет запросы Redis в рамках асинхронного пайплайна.
82
83    Args:
84        pipeline (Pipeline): Пайплайн Redis с командами для выполнения.
85
86    Returns:
87        list: Результаты выполнения команд в пайплайне.
88
89    Raises:
90        Exception: Если произошла ошибка при выполнении Redis-запроса.
91    """
92    if pipeline.command_stack:
93        rlogger.info(f"TRYING TO REDIS QUERY :: {pipeline.command_stack}")
94        async with pipeline as pipe:
95            result = await pipe.execute()
96            rlogger.info("REDIS QUERY COMPLETED")
97            return result
98    return []

Выполняет запросы Redis в рамках асинхронного пайплайна.

Arguments:
  • pipeline (Pipeline): Пайплайн Redis с командами для выполнения.
Returns:

list: Результаты выполнения команд в пайплайне.

Raises:
  • Exception: Если произошла ошибка при выполнении Redis-запроса.
@redis_exceptor
async def iter_redis_keys(key_pattern):
101@redis_exceptor
102async def iter_redis_keys(key_pattern):
103    """Итерация по ключам Redis, соответствующим заданному шаблону.
104
105    Args:
106        key_pattern (str): Шаблон ключей для поиска.
107
108    Returns:
109        AsyncIterator: Итератор по найденным ключам.
110    """
111    return redis_engine.scan_iter(key_pattern)

Итерация по ключам Redis, соответствующим заданному шаблону.

Arguments:
  • key_pattern (str): Шаблон ключей для поиска.
Returns:

AsyncIterator: Итератор по найденным ключам.