src.db.utils.transactions
Функционал для работы с БД. Работа с транзакциями
1"""Функционал для работы с БД. Работа с транзакциями""" 2 3import logging 4from datetime import datetime, timezone 5from uuid import uuid4 6 7from sqlalchemy import and_, desc, insert, select, update 8 9from core.config import settings 10from core.metric import async_speed_metric 11from db.database import execute_query, iter_redis_keys 12from db.models import Transactions, UserActivity, UserData 13from db.utils.redis import CashManager 14 15logger = logging.getLogger() 16 17 18@async_speed_metric 19async def get_cash_wg_transactions(user_id): 20 """Получает транзакции пользователя из кеша. 21 22 Функция ищет транзакции пользователя в Redis, используя ключи, 23 соответствующие его идентификатору. 24 25 Args: 26 user_id (str): Идентификатор пользователя. 27 28 Returns: 29 list: Список транзакций пользователя. 30 """ 31 cash = CashManager(Transactions) 32 33 tr_keys = await iter_redis_keys(f"data:{Transactions.__tablename__}:*:{user_id}") 34 async for trans_key in tr_keys: 35 cash.cmd.hgetall(trans_key) 36 37 return await cash() 38 39 40@async_speed_metric 41async def delete_cash_transactions(user_id): 42 """Удаляет кешированные транзакции пользователя. 43 44 Функция удаляет все кешированные транзакции пользователя из Redis. 45 46 Args: 47 user_id (str): Идентификатор пользователя. 48 49 Returns: 50 None 51 """ 52 rkeys = await iter_redis_keys(f"data:{Transactions.__tablename__}:*:{user_id}") 53 await CashManager(Transactions).delete(*[key async for key in rkeys], fullkey=True) 54 55 56@async_speed_metric 57async def insert_transaction(conf: dict): 58 """Вставляет новую транзакцию в базу данных. 59 60 Функция сначала удаляет кешированные транзакции пользователя, 61 а затем добавляет новую транзакцию в базу данных. 62 63 Args: 64 conf (dict): Словарь, содержащий данные транзакции. 65 66 Returns: 67 None 68 """ 69 await delete_cash_transactions(conf["user_id"]) 70 71 query = insert(Transactions).values(**conf) 72 await execute_query(query) 73 74 75async def confirm_success_pay(transaction: Transactions): 76 """Подтверждает успешную оплату транзакции. 77 78 Функция проверяет, существует ли предыдущая транзакция с таким же 79 ярлыком, и либо создает новую транзакцию, либо обновляет существующую. 80 81 Args: 82 transaction (Transactions): Объект транзакции. 83 84 Returns: 85 Transactions: Объект транзакции, который был создан или обновлен. 86 """ 87 query = select(Transactions).where(Transactions.label == transaction.label) 88 prev_result: Transactions = (await execute_query(query)).scalars().first() 89 90 if getattr(prev_result, "transaction_id", None): 91 query = ( 92 insert(Transactions) 93 .values( 94 user_id=prev_result.user_id, 95 date=transaction.date, 96 amount=transaction.amount, 97 label=transaction.label, 98 transaction_id=transaction.transaction_id, 99 sha1_hash=transaction.sha1_hash, 100 sender=transaction.sender, 101 withdraw_amount=transaction.withdraw_amount, 102 transaction_reference=prev_result.transaction_reference, 103 ) 104 .returning(Transactions) 105 ) 106 107 else: 108 query = ( 109 update(Transactions) 110 .values( 111 date=transaction.date, 112 amount=transaction.amount, 113 transaction_id=transaction.transaction_id, 114 sha1_hash=transaction.sha1_hash, 115 sender=transaction.sender, 116 withdraw_amount=transaction.withdraw_amount, 117 ) 118 .filter_by(label=transaction.label) 119 .returning(Transactions) 120 ) 121 122 result: Transactions = (await execute_query(query)).scalar_one_or_none() 123 124 if result: 125 await delete_cash_transactions(result.user_id) 126 await CashManager(UserData).delete(result.user_id) 127 128 return result 129 130 131async def close_free_trial(user_id): 132 """Закрывает бесплатный пробный период пользователя. 133 134 Функция обновляет тариф пользователя, переводя его в состояние 135 завершенного пробного периода. 136 137 Args: 138 user_id (str): Идентификатор пользователя. 139 140 Returns: 141 UserData: Объект данных пользователя, обновленный после завершения пробного периода. 142 """ 143 query = ( 144 update(UserData) 145 .values(stage=1) 146 .where(and_(UserData.telegram_id == user_id, UserData.stage == 0.3)) 147 .returning(UserData) 148 ) 149 150 result: UserData = (await execute_query(query)).scalar_one_or_none() 151 # NOTE не удаляется из кеша потому что кеш чистится после пополнения баланса 152 return result 153 154 155async def get_user_transactions(user_id): 156 """Получает транзакции пользователя. 157 158 Функция сначала пытается получить транзакции из кеша, 159 а если они отсутствуют, то извлекает их из базы данных. 160 161 Args: 162 user_id (str): Идентификатор пользователя. 163 164 Returns: 165 list: Список транзакций пользователя. 166 """ 167 trans: list[Transactions] = await get_cash_wg_transactions(user_id) 168 169 if trans: 170 return sorted(trans, key=lambda x: x.date, reverse=True) 171 172 query = ( 173 select(Transactions) 174 .where(Transactions.user_id == user_id) 175 .order_by(desc(Transactions.date)) 176 ) 177 178 result: list[Transactions] = (await execute_query(query)).scalars().all() 179 180 if result: 181 await CashManager(Transactions).add( 182 **{f"{trans.id}:{user_id}": trans.__ustr_dict__ for trans in result} 183 ) 184 185 return result 186 187 188@async_speed_metric 189async def raise_money(): 190 """Списывает деньги с активных пользователей. 191 192 Функция выбирает всех активных пользователей и списывает 193 с их баланса определенную сумму в соответствии с их стадией. 194 195 Returns: 196 None 197 """ 198 query = select(UserData).filter_by(active=UserActivity.active) 199 users: list[UserData] = (await execute_query(query)).scalars().all() 200 201 data = [ 202 dict( 203 user_id=user.telegram_id, 204 date=datetime.now(timezone.utc), 205 amount=-1 * user.stage * settings.cost, 206 withdraw_amount=-1 * user.stage * settings.cost, 207 label=uuid4(), 208 transaction_id="Ежедневное списание", 209 transaction_reference="", 210 ) 211 for user in users 212 ] 213 214 if data: 215 query = insert(Transactions).values(data) 216 await execute_query(query) 217 218 for user in users: 219 await delete_cash_transactions(user.telegram_id) 220 await CashManager(UserData).delete(user.telegram_id)
19@async_speed_metric 20async def get_cash_wg_transactions(user_id): 21 """Получает транзакции пользователя из кеша. 22 23 Функция ищет транзакции пользователя в Redis, используя ключи, 24 соответствующие его идентификатору. 25 26 Args: 27 user_id (str): Идентификатор пользователя. 28 29 Returns: 30 list: Список транзакций пользователя. 31 """ 32 cash = CashManager(Transactions) 33 34 tr_keys = await iter_redis_keys(f"data:{Transactions.__tablename__}:*:{user_id}") 35 async for trans_key in tr_keys: 36 cash.cmd.hgetall(trans_key) 37 38 return await cash()
Получает транзакции пользователя из кеша.
Функция ищет транзакции пользователя в Redis, используя ключи, соответствующие его идентификатору.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
list: Список транзакций пользователя.
41@async_speed_metric 42async def delete_cash_transactions(user_id): 43 """Удаляет кешированные транзакции пользователя. 44 45 Функция удаляет все кешированные транзакции пользователя из Redis. 46 47 Args: 48 user_id (str): Идентификатор пользователя. 49 50 Returns: 51 None 52 """ 53 rkeys = await iter_redis_keys(f"data:{Transactions.__tablename__}:*:{user_id}") 54 await CashManager(Transactions).delete(*[key async for key in rkeys], fullkey=True)
Удаляет кешированные транзакции пользователя.
Функция удаляет все кешированные транзакции пользователя из Redis.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
None
57@async_speed_metric 58async def insert_transaction(conf: dict): 59 """Вставляет новую транзакцию в базу данных. 60 61 Функция сначала удаляет кешированные транзакции пользователя, 62 а затем добавляет новую транзакцию в базу данных. 63 64 Args: 65 conf (dict): Словарь, содержащий данные транзакции. 66 67 Returns: 68 None 69 """ 70 await delete_cash_transactions(conf["user_id"]) 71 72 query = insert(Transactions).values(**conf) 73 await execute_query(query)
Вставляет новую транзакцию в базу данных.
Функция сначала удаляет кешированные транзакции пользователя, а затем добавляет новую транзакцию в базу данных.
Arguments:
- conf (dict): Словарь, содержащий данные транзакции.
Returns:
None
76async def confirm_success_pay(transaction: Transactions): 77 """Подтверждает успешную оплату транзакции. 78 79 Функция проверяет, существует ли предыдущая транзакция с таким же 80 ярлыком, и либо создает новую транзакцию, либо обновляет существующую. 81 82 Args: 83 transaction (Transactions): Объект транзакции. 84 85 Returns: 86 Transactions: Объект транзакции, который был создан или обновлен. 87 """ 88 query = select(Transactions).where(Transactions.label == transaction.label) 89 prev_result: Transactions = (await execute_query(query)).scalars().first() 90 91 if getattr(prev_result, "transaction_id", None): 92 query = ( 93 insert(Transactions) 94 .values( 95 user_id=prev_result.user_id, 96 date=transaction.date, 97 amount=transaction.amount, 98 label=transaction.label, 99 transaction_id=transaction.transaction_id, 100 sha1_hash=transaction.sha1_hash, 101 sender=transaction.sender, 102 withdraw_amount=transaction.withdraw_amount, 103 transaction_reference=prev_result.transaction_reference, 104 ) 105 .returning(Transactions) 106 ) 107 108 else: 109 query = ( 110 update(Transactions) 111 .values( 112 date=transaction.date, 113 amount=transaction.amount, 114 transaction_id=transaction.transaction_id, 115 sha1_hash=transaction.sha1_hash, 116 sender=transaction.sender, 117 withdraw_amount=transaction.withdraw_amount, 118 ) 119 .filter_by(label=transaction.label) 120 .returning(Transactions) 121 ) 122 123 result: Transactions = (await execute_query(query)).scalar_one_or_none() 124 125 if result: 126 await delete_cash_transactions(result.user_id) 127 await CashManager(UserData).delete(result.user_id) 128 129 return result
Подтверждает успешную оплату транзакции.
Функция проверяет, существует ли предыдущая транзакция с таким же ярлыком, и либо создает новую транзакцию, либо обновляет существующую.
Arguments:
- transaction (Transactions): Объект транзакции.
Returns:
Transactions: Объект транзакции, который был создан или обновлен.
132async def close_free_trial(user_id): 133 """Закрывает бесплатный пробный период пользователя. 134 135 Функция обновляет тариф пользователя, переводя его в состояние 136 завершенного пробного периода. 137 138 Args: 139 user_id (str): Идентификатор пользователя. 140 141 Returns: 142 UserData: Объект данных пользователя, обновленный после завершения пробного периода. 143 """ 144 query = ( 145 update(UserData) 146 .values(stage=1) 147 .where(and_(UserData.telegram_id == user_id, UserData.stage == 0.3)) 148 .returning(UserData) 149 ) 150 151 result: UserData = (await execute_query(query)).scalar_one_or_none() 152 # NOTE не удаляется из кеша потому что кеш чистится после пополнения баланса 153 return result
Закрывает бесплатный пробный период пользователя.
Функция обновляет тариф пользователя, переводя его в состояние завершенного пробного периода.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
UserData: Объект данных пользователя, обновленный после завершения пробного периода.
156async def get_user_transactions(user_id): 157 """Получает транзакции пользователя. 158 159 Функция сначала пытается получить транзакции из кеша, 160 а если они отсутствуют, то извлекает их из базы данных. 161 162 Args: 163 user_id (str): Идентификатор пользователя. 164 165 Returns: 166 list: Список транзакций пользователя. 167 """ 168 trans: list[Transactions] = await get_cash_wg_transactions(user_id) 169 170 if trans: 171 return sorted(trans, key=lambda x: x.date, reverse=True) 172 173 query = ( 174 select(Transactions) 175 .where(Transactions.user_id == user_id) 176 .order_by(desc(Transactions.date)) 177 ) 178 179 result: list[Transactions] = (await execute_query(query)).scalars().all() 180 181 if result: 182 await CashManager(Transactions).add( 183 **{f"{trans.id}:{user_id}": trans.__ustr_dict__ for trans in result} 184 ) 185 186 return result
Получает транзакции пользователя.
Функция сначала пытается получить транзакции из кеша, а если они отсутствуют, то извлекает их из базы данных.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
list: Список транзакций пользователя.
189@async_speed_metric 190async def raise_money(): 191 """Списывает деньги с активных пользователей. 192 193 Функция выбирает всех активных пользователей и списывает 194 с их баланса определенную сумму в соответствии с их стадией. 195 196 Returns: 197 None 198 """ 199 query = select(UserData).filter_by(active=UserActivity.active) 200 users: list[UserData] = (await execute_query(query)).scalars().all() 201 202 data = [ 203 dict( 204 user_id=user.telegram_id, 205 date=datetime.now(timezone.utc), 206 amount=-1 * user.stage * settings.cost, 207 withdraw_amount=-1 * user.stage * settings.cost, 208 label=uuid4(), 209 transaction_id="Ежедневное списание", 210 transaction_reference="", 211 ) 212 for user in users 213 ] 214 215 if data: 216 query = insert(Transactions).values(data) 217 await execute_query(query) 218 219 for user in users: 220 await delete_cash_transactions(user.telegram_id) 221 await CashManager(UserData).delete(user.telegram_id)
Списывает деньги с активных пользователей.
Функция выбирает всех активных пользователей и списывает с их баланса определенную сумму в соответствии с их стадией.
Returns:
None