src.db.utils.user
Функционал для работы с БД. Работа с пользовательскими данными
1"""Функционал для работы с БД. Работа с пользовательскими данными""" 2 3import logging 4from datetime import datetime, timezone 5from uuid import uuid4 6 7from sqlalchemy import insert, not_, select, update 8 9from core.config import settings 10from core.metric import async_speed_metric 11from db.database import execute_query 12from db.models import Transactions, UserActivity, UserData 13from db.utils import delete_cash_transactions 14from db.utils.redis import CashManager 15 16logger = logging.getLogger() 17 18 19@async_speed_metric 20async def get_user(user_id): 21 """Получает данные пользователя по идентификатору. 22 23 Функция сначала пытается получить данные пользователя из кеша. 24 Если данных нет, выполняется выборка из базы данных. 25 26 Args: 27 user_id (str): Идентификатор пользователя. 28 29 Returns: 30 UserData: Объект данных пользователя или None, если пользователь не найден. 31 """ 32 result: UserData = await CashManager(UserData).get({user_id: ...}) 33 if result: 34 return result[0] 35 36 query = select(UserData).where(UserData.telegram_id == user_id) 37 result: UserData = (await execute_query(query)).scalar_one_or_none() 38 39 if result: 40 await CashManager(UserData).add({user_id: result.__ustr_dict__}) 41 42 return result 43 44 45@async_speed_metric 46async def get_all_userdata(user_id): 47 """Получает все данные пользователя (конфигурации, транзакции, отчеты, уведомления) по идентификатору. 48 49 Функция сначала пытается получить данные пользователя из кеша. 50 Если данных нет, выполняется выборка из базы данных. 51 52 Args: 53 user_id (str): Идентификатор пользователя. 54 55 Returns: 56 UserData(BaseModel): Объект данных пользователя или None, если пользователь не найден. 57 """ 58 query = select(UserData).where(UserData.telegram_id == user_id) 59 user: UserData = (await execute_query(query)).scalar_one_or_none() 60 if user: 61 all_userdata = UserData.ValidationSchemaExtended.model_validate( 62 user, from_attributes=True 63 ) 64 return all_userdata 65 return user 66 67 68@async_speed_metric 69async def add_user(user_id, user_name): 70 """Добавляет нового пользователя в базу данных. 71 72 Функция удаляет кешированные данные пользователя и добавляет его 73 в базу данных. 74 75 Args: 76 user_id (str): Идентификатор пользователя. 77 user_name (str): Имя пользователя. 78 79 Returns: 80 UserData: Объект данных пользователя, который был добавлен. 81 """ 82 await CashManager(UserData).delete(user_id) 83 84 user_data = dict(telegram_id=user_id, telegram_name=user_name) 85 86 query = insert(UserData).values(user_data).returning(UserData) 87 88 return (await execute_query(query)).scalar_one_or_none() 89 90 91@async_speed_metric 92async def freeze_user(user_id): 93 """Замораживает аккаунт пользователя. 94 95 Функция обновляет статус пользователя на "заморожен" и создает 96 транзакцию для списания средств. 97 98 Args: 99 user_id (str): Идентификатор пользователя. 100 101 Returns: 102 None 103 """ 104 await clear_cash(user_id) 105 106 query = ( 107 update(UserData) 108 .values(active=UserActivity.freezed) 109 .filter_by(telegram_id=user_id) 110 .returning(UserData) 111 ) 112 await execute_query(query) 113 114 user: UserData = (await execute_query(query)).scalar_one_or_none() 115 116 tax = -1 * user.stage * settings.cost 117 118 if tax != 0: 119 data = dict( 120 user_id=user.telegram_id, 121 date=datetime.now(timezone.utc), 122 amount=tax, 123 withdraw_amount=tax, 124 label=uuid4(), 125 transaction_id="Заморозка аккаунта", 126 transaction_reference="", 127 ) 128 129 query = insert(Transactions).values(data) 130 await execute_query(query) 131 132 await delete_cash_transactions(user.telegram_id) 133 134 135@async_speed_metric 136async def ban_user(user_id): 137 """Банит пользователя. 138 139 Функция обновляет статус пользователя на "заблокирован". 140 141 Args: 142 user_id (str): Идентификатор пользователя. 143 144 Returns: 145 None 146 """ 147 await clear_cash(user_id) 148 149 query = ( 150 update(UserData) 151 .values(active=UserActivity.banned) 152 .filter_by(telegram_id=user_id) 153 ) 154 await execute_query(query) 155 156 157@async_speed_metric 158async def recover_user(user_id): 159 """Восстанавливает аккаунт пользователя. 160 161 Функция обновляет статус пользователя на "неактивный". 162 163 Args: 164 user_id (str): Идентификатор пользователя. 165 166 Returns: 167 UserData: Объект данных пользователя, который был восстановлен. 168 """ 169 await clear_cash(user_id) 170 171 query = ( 172 update(UserData) 173 .values(active=UserActivity.inactive) 174 .filter_by(telegram_id=user_id) 175 .returning(UserData) 176 ) 177 result: UserData = (await execute_query(query)).scalar_one_or_none() 178 return result 179 180 181@async_speed_metric 182async def update_rate_user(user_id, stage, tax=0, trial=False): 183 """Обновляет тариф пользователя. 184 185 Функция обновляет стадию пользователя и создает транзакцию 186 для списания комиссии. 187 188 Args: 189 user_id (str): Идентификатор пользователя. 190 stage (float): Новая стадия пользователя. 191 tax (float): Комиссия за смену тарифа. По умолчанию 0. 192 trial (bool): Указывает, является ли это активацией пробного периода. По умолчанию False. 193 194 Returns: 195 UserData: Объект данных пользователя, который был обновлен. 196 """ 197 await clear_cash(user_id) 198 199 tax *= -1 200 tax_descr = "Комиссия за смену тарифа" 201 202 if trial: 203 tax_descr = "Активация пробного периода" 204 tax += 7 205 206 query = ( 207 update(UserData) 208 .values(stage=stage, free=False) 209 .filter_by(telegram_id=user_id) 210 .returning(UserData) 211 ) 212 213 user: UserData = (await execute_query(query)).scalar_one_or_none() 214 215 if tax != 0: 216 data = dict( 217 user_id=user.telegram_id, 218 date=datetime.now(timezone.utc), 219 amount=tax, 220 withdraw_amount=tax, 221 label=uuid4(), 222 transaction_id=tax_descr, 223 transaction_reference="", 224 ) 225 226 query = insert(Transactions).values(data) 227 await execute_query(query) 228 229 await delete_cash_transactions(user.telegram_id) 230 231 return user 232 233 234@async_speed_metric 235async def mute_user(user_id): 236 """Включает или выключает уведомления для пользователя. 237 238 Функция изменяет статус mute для пользователя. 239 240 Args: 241 user_id (str): Идентификатор пользователя. 242 243 Returns: 244 None 245 """ 246 await clear_cash(user_id) 247 248 query = ( 249 update(UserData).values(mute=not_(UserData.mute)).filter_by(telegram_id=user_id) 250 ) 251 await execute_query(query) 252 253 254@async_speed_metric 255async def clear_cash(user_id): 256 """Очищает кеш для пользователя. 257 258 Функция удаляет все кешированные данные для указанного пользователя. 259 260 Args: 261 user_id (str): Идентификатор пользователя. 262 263 Returns: 264 None 265 """ 266 await CashManager(None).clear(user_id)
20@async_speed_metric 21async def get_user(user_id): 22 """Получает данные пользователя по идентификатору. 23 24 Функция сначала пытается получить данные пользователя из кеша. 25 Если данных нет, выполняется выборка из базы данных. 26 27 Args: 28 user_id (str): Идентификатор пользователя. 29 30 Returns: 31 UserData: Объект данных пользователя или None, если пользователь не найден. 32 """ 33 result: UserData = await CashManager(UserData).get({user_id: ...}) 34 if result: 35 return result[0] 36 37 query = select(UserData).where(UserData.telegram_id == user_id) 38 result: UserData = (await execute_query(query)).scalar_one_or_none() 39 40 if result: 41 await CashManager(UserData).add({user_id: result.__ustr_dict__}) 42 43 return result
Получает данные пользователя по идентификатору.
Функция сначала пытается получить данные пользователя из кеша. Если данных нет, выполняется выборка из базы данных.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
UserData: Объект данных пользователя или None, если пользователь не найден.
46@async_speed_metric 47async def get_all_userdata(user_id): 48 """Получает все данные пользователя (конфигурации, транзакции, отчеты, уведомления) по идентификатору. 49 50 Функция сначала пытается получить данные пользователя из кеша. 51 Если данных нет, выполняется выборка из базы данных. 52 53 Args: 54 user_id (str): Идентификатор пользователя. 55 56 Returns: 57 UserData(BaseModel): Объект данных пользователя или None, если пользователь не найден. 58 """ 59 query = select(UserData).where(UserData.telegram_id == user_id) 60 user: UserData = (await execute_query(query)).scalar_one_or_none() 61 if user: 62 all_userdata = UserData.ValidationSchemaExtended.model_validate( 63 user, from_attributes=True 64 ) 65 return all_userdata 66 return user
Получает все данные пользователя (конфигурации, транзакции, отчеты, уведомления) по идентификатору.
Функция сначала пытается получить данные пользователя из кеша. Если данных нет, выполняется выборка из базы данных.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
UserData(BaseModel): Объект данных пользователя или None, если пользователь не найден.
69@async_speed_metric 70async def add_user(user_id, user_name): 71 """Добавляет нового пользователя в базу данных. 72 73 Функция удаляет кешированные данные пользователя и добавляет его 74 в базу данных. 75 76 Args: 77 user_id (str): Идентификатор пользователя. 78 user_name (str): Имя пользователя. 79 80 Returns: 81 UserData: Объект данных пользователя, который был добавлен. 82 """ 83 await CashManager(UserData).delete(user_id) 84 85 user_data = dict(telegram_id=user_id, telegram_name=user_name) 86 87 query = insert(UserData).values(user_data).returning(UserData) 88 89 return (await execute_query(query)).scalar_one_or_none()
Добавляет нового пользователя в базу данных.
Функция удаляет кешированные данные пользователя и добавляет его в базу данных.
Arguments:
- user_id (str): Идентификатор пользователя.
- user_name (str): Имя пользователя.
Returns:
UserData: Объект данных пользователя, который был добавлен.
92@async_speed_metric 93async def freeze_user(user_id): 94 """Замораживает аккаунт пользователя. 95 96 Функция обновляет статус пользователя на "заморожен" и создает 97 транзакцию для списания средств. 98 99 Args: 100 user_id (str): Идентификатор пользователя. 101 102 Returns: 103 None 104 """ 105 await clear_cash(user_id) 106 107 query = ( 108 update(UserData) 109 .values(active=UserActivity.freezed) 110 .filter_by(telegram_id=user_id) 111 .returning(UserData) 112 ) 113 await execute_query(query) 114 115 user: UserData = (await execute_query(query)).scalar_one_or_none() 116 117 tax = -1 * user.stage * settings.cost 118 119 if tax != 0: 120 data = dict( 121 user_id=user.telegram_id, 122 date=datetime.now(timezone.utc), 123 amount=tax, 124 withdraw_amount=tax, 125 label=uuid4(), 126 transaction_id="Заморозка аккаунта", 127 transaction_reference="", 128 ) 129 130 query = insert(Transactions).values(data) 131 await execute_query(query) 132 133 await delete_cash_transactions(user.telegram_id)
Замораживает аккаунт пользователя.
Функция обновляет статус пользователя на "заморожен" и создает транзакцию для списания средств.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
None
136@async_speed_metric 137async def ban_user(user_id): 138 """Банит пользователя. 139 140 Функция обновляет статус пользователя на "заблокирован". 141 142 Args: 143 user_id (str): Идентификатор пользователя. 144 145 Returns: 146 None 147 """ 148 await clear_cash(user_id) 149 150 query = ( 151 update(UserData) 152 .values(active=UserActivity.banned) 153 .filter_by(telegram_id=user_id) 154 ) 155 await execute_query(query)
Банит пользователя.
Функция обновляет статус пользователя на "заблокирован".
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
None
158@async_speed_metric 159async def recover_user(user_id): 160 """Восстанавливает аккаунт пользователя. 161 162 Функция обновляет статус пользователя на "неактивный". 163 164 Args: 165 user_id (str): Идентификатор пользователя. 166 167 Returns: 168 UserData: Объект данных пользователя, который был восстановлен. 169 """ 170 await clear_cash(user_id) 171 172 query = ( 173 update(UserData) 174 .values(active=UserActivity.inactive) 175 .filter_by(telegram_id=user_id) 176 .returning(UserData) 177 ) 178 result: UserData = (await execute_query(query)).scalar_one_or_none() 179 return result
Восстанавливает аккаунт пользователя.
Функция обновляет статус пользователя на "неактивный".
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
UserData: Объект данных пользователя, который был восстановлен.
182@async_speed_metric 183async def update_rate_user(user_id, stage, tax=0, trial=False): 184 """Обновляет тариф пользователя. 185 186 Функция обновляет стадию пользователя и создает транзакцию 187 для списания комиссии. 188 189 Args: 190 user_id (str): Идентификатор пользователя. 191 stage (float): Новая стадия пользователя. 192 tax (float): Комиссия за смену тарифа. По умолчанию 0. 193 trial (bool): Указывает, является ли это активацией пробного периода. По умолчанию False. 194 195 Returns: 196 UserData: Объект данных пользователя, который был обновлен. 197 """ 198 await clear_cash(user_id) 199 200 tax *= -1 201 tax_descr = "Комиссия за смену тарифа" 202 203 if trial: 204 tax_descr = "Активация пробного периода" 205 tax += 7 206 207 query = ( 208 update(UserData) 209 .values(stage=stage, free=False) 210 .filter_by(telegram_id=user_id) 211 .returning(UserData) 212 ) 213 214 user: UserData = (await execute_query(query)).scalar_one_or_none() 215 216 if tax != 0: 217 data = dict( 218 user_id=user.telegram_id, 219 date=datetime.now(timezone.utc), 220 amount=tax, 221 withdraw_amount=tax, 222 label=uuid4(), 223 transaction_id=tax_descr, 224 transaction_reference="", 225 ) 226 227 query = insert(Transactions).values(data) 228 await execute_query(query) 229 230 await delete_cash_transactions(user.telegram_id) 231 232 return user
Обновляет тариф пользователя.
Функция обновляет стадию пользователя и создает транзакцию для списания комиссии.
Arguments:
- user_id (str): Идентификатор пользователя.
- stage (float): Новая стадия пользователя.
- tax (float): Комиссия за смену тарифа. По умолчанию 0.
- trial (bool): Указывает, является ли это активацией пробного периода. По умолчанию False.
Returns:
UserData: Объект данных пользователя, который был обновлен.
235@async_speed_metric 236async def mute_user(user_id): 237 """Включает или выключает уведомления для пользователя. 238 239 Функция изменяет статус mute для пользователя. 240 241 Args: 242 user_id (str): Идентификатор пользователя. 243 244 Returns: 245 None 246 """ 247 await clear_cash(user_id) 248 249 query = ( 250 update(UserData).values(mute=not_(UserData.mute)).filter_by(telegram_id=user_id) 251 ) 252 await execute_query(query)
Включает или выключает уведомления для пользователя.
Функция изменяет статус mute для пользователя.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
None
255@async_speed_metric 256async def clear_cash(user_id): 257 """Очищает кеш для пользователя. 258 259 Функция удаляет все кешированные данные для указанного пользователя. 260 261 Args: 262 user_id (str): Идентификатор пользователя. 263 264 Returns: 265 None 266 """ 267 await CashManager(None).clear(user_id)
Очищает кеш для пользователя.
Функция удаляет все кешированные данные для указанного пользователя.
Arguments:
- user_id (str): Идентификатор пользователя.
Returns:
None