src.db.database
Точки входа в БД
1"""Точки входа в БД""" 2 3import logging 4 5from asyncpg.exceptions import UniqueViolationError 6from redis.asyncio import ConnectionPool, Redis 7from redis.asyncio.client import Pipeline 8from sqlalchemy.exc import IntegrityError 9from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine 10 11from core.config import settings 12from core.err import redis_exceptor 13from core.exceptions import DatabaseError, UniquenessError 14 15logger = logging.getLogger("sqlalchemy") 16"""Logger: Логгер для SQLAlchemy.""" 17 18rlogger = logging.getLogger("redis") 19"""Logger: Логгер для Redis.""" 20 21async_engine = create_async_engine( 22 url=settings.DATABASE_URL, 23 echo=False, # Логирование всех выполняемых операций 24 pool_size=10, # Максимальное количество подключений к базе 25 max_overflow=20, # Количество дополнительных подключений при переполнении 26) 27"""AsyncEngine: Асинхронный движок SQLAlchemy для работы с базой данных.""" 28 29async_session_factory = async_sessionmaker(async_engine) 30"""async_sessionmaker: Фабрика асинхронных сессий для работы с базой данных.""" 31 32__pool = ConnectionPool.from_url(settings.CASHBASE_URL, decode_responses=True) 33"""ConnectionPool: Пул соединений для Redis.""" 34 35redis_engine = Redis.from_pool(__pool) 36"""Redis: Асинхронный клиент Redis, использующий пул соединений.""" 37 38 39async def execute_query(query, echo=True): 40 """Выполняет SQL-запрос в асинхронной сессии. 41 42 Args: 43 query (sqlalchemy.sql.expression): Запрос для выполнения. 44 echo (bool): Если True, логирует запрос и параметры. По умолчанию True. 45 46 Returns: 47 Result: Результат выполнения запроса. 48 49 Raises: 50 UniquenessError: Если запись уже существует. 51 DatabaseError: Если произошла ошибка при подключении к БД. 52 """ 53 async with async_session_factory() as session: 54 async with session.begin(): 55 try: 56 if echo: 57 q = query.compile() 58 if q.params: 59 logger.info(f"{q}\nQuery PARAMS: {q.params}") 60 else: 61 logger.info(str(q)) 62 res = await session.execute(query) 63 session.expunge_all() 64 return res 65 except (UniqueViolationError, IntegrityError): 66 await session.rollback() 67 logger.warning("Эта запись в базе уже существует") 68 raise UniquenessError 69 except Exception: 70 await session.rollback() 71 logger.exception( 72 "Ошибка при подключении к БД", 73 exc_info=getattr(query, "__dict__", query), 74 ) 75 raise DatabaseError 76 77 78@redis_exceptor 79async def execute_redis_query(pipeline: Pipeline): 80 """Выполняет запросы Redis в рамках асинхронного пайплайна. 81 82 Args: 83 pipeline (Pipeline): Пайплайн Redis с командами для выполнения. 84 85 Returns: 86 list: Результаты выполнения команд в пайплайне. 87 88 Raises: 89 Exception: Если произошла ошибка при выполнении Redis-запроса. 90 """ 91 if pipeline.command_stack: 92 rlogger.info(f"TRYING TO REDIS QUERY :: {pipeline.command_stack}") 93 async with pipeline as pipe: 94 result = await pipe.execute() 95 rlogger.info("REDIS QUERY COMPLETED") 96 return result 97 return [] 98 99 100@redis_exceptor 101async def iter_redis_keys(key_pattern): 102 """Итерация по ключам Redis, соответствующим заданному шаблону. 103 104 Args: 105 key_pattern (str): Шаблон ключей для поиска. 106 107 Returns: 108 AsyncIterator: Итератор по найденным ключам. 109 """ 110 return redis_engine.scan_iter(key_pattern)
logger =
<Logger sqlalchemy (INFO)>
Logger: Логгер для SQLAlchemy.
rlogger =
<Logger redis (INFO)>
Logger: Логгер для Redis.
async_engine =
<sqlalchemy.ext.asyncio.engine.AsyncEngine object>
AsyncEngine: Асинхронный движок SQLAlchemy для работы с базой данных.
async_session_factory =
async_sessionmaker(class_='AsyncSession', bind=<sqlalchemy.ext.asyncio.engine.AsyncEngine object>, autoflush=True, expire_on_commit=True)
async_sessionmaker: Фабрика асинхронных сессий для работы с базой данных.
redis_engine =
<redis.asyncio.client.Redis(<redis.asyncio.connection.ConnectionPool(<redis.asyncio.connection.Connection(host=localhost,port=6379,db=0)>)>)>
Redis: Асинхронный клиент Redis, использующий пул соединений.
async def
execute_query(query, echo=True):
40async def execute_query(query, echo=True): 41 """Выполняет SQL-запрос в асинхронной сессии. 42 43 Args: 44 query (sqlalchemy.sql.expression): Запрос для выполнения. 45 echo (bool): Если True, логирует запрос и параметры. По умолчанию True. 46 47 Returns: 48 Result: Результат выполнения запроса. 49 50 Raises: 51 UniquenessError: Если запись уже существует. 52 DatabaseError: Если произошла ошибка при подключении к БД. 53 """ 54 async with async_session_factory() as session: 55 async with session.begin(): 56 try: 57 if echo: 58 q = query.compile() 59 if q.params: 60 logger.info(f"{q}\nQuery PARAMS: {q.params}") 61 else: 62 logger.info(str(q)) 63 res = await session.execute(query) 64 session.expunge_all() 65 return res 66 except (UniqueViolationError, IntegrityError): 67 await session.rollback() 68 logger.warning("Эта запись в базе уже существует") 69 raise UniquenessError 70 except Exception: 71 await session.rollback() 72 logger.exception( 73 "Ошибка при подключении к БД", 74 exc_info=getattr(query, "__dict__", query), 75 ) 76 raise DatabaseError
Выполняет SQL-запрос в асинхронной сессии.
Arguments:
- query (sqlalchemy.sql.expression): Запрос для выполнения.
- echo (bool): Если True, логирует запрос и параметры. По умолчанию True.
Returns:
Result: Результат выполнения запроса.
Raises:
- UniquenessError: Если запись уже существует.
- DatabaseError: Если произошла ошибка при подключении к БД.
@redis_exceptor
async def
execute_redis_query(pipeline: redis.asyncio.client.Pipeline):
79@redis_exceptor 80async def execute_redis_query(pipeline: Pipeline): 81 """Выполняет запросы Redis в рамках асинхронного пайплайна. 82 83 Args: 84 pipeline (Pipeline): Пайплайн Redis с командами для выполнения. 85 86 Returns: 87 list: Результаты выполнения команд в пайплайне. 88 89 Raises: 90 Exception: Если произошла ошибка при выполнении Redis-запроса. 91 """ 92 if pipeline.command_stack: 93 rlogger.info(f"TRYING TO REDIS QUERY :: {pipeline.command_stack}") 94 async with pipeline as pipe: 95 result = await pipe.execute() 96 rlogger.info("REDIS QUERY COMPLETED") 97 return result 98 return []
Выполняет запросы Redis в рамках асинхронного пайплайна.
Arguments:
- pipeline (Pipeline): Пайплайн Redis с командами для выполнения.
Returns:
list: Результаты выполнения команд в пайплайне.
Raises:
- Exception: Если произошла ошибка при выполнении Redis-запроса.
@redis_exceptor
async def
iter_redis_keys(key_pattern):
101@redis_exceptor 102async def iter_redis_keys(key_pattern): 103 """Итерация по ключам Redis, соответствующим заданному шаблону. 104 105 Args: 106 key_pattern (str): Шаблон ключей для поиска. 107 108 Returns: 109 AsyncIterator: Итератор по найденным ключам. 110 """ 111 return redis_engine.scan_iter(key_pattern)
Итерация по ключам Redis, соответствующим заданному шаблону.
Arguments:
- key_pattern (str): Шаблон ключей для поиска.
Returns:
AsyncIterator: Итератор по найденным ключам.