Edit on GitHub

src.db.utils.user

Функционал для работы с БД. Работа с пользовательскими данными

  1"""Функционал для работы с БД. Работа с пользовательскими данными"""
  2
  3import logging
  4from datetime import datetime, timezone
  5from uuid import uuid4
  6
  7from sqlalchemy import insert, not_, select, update
  8
  9from core.config import settings
 10from core.metric import async_speed_metric
 11from db.database import execute_query
 12from db.models import Transactions, UserActivity, UserData
 13from db.utils import delete_cash_transactions
 14from db.utils.redis import CashManager
 15
 16logger = logging.getLogger()
 17
 18
 19@async_speed_metric
 20async def get_user(user_id):
 21    """Получает данные пользователя по идентификатору.
 22
 23    Функция сначала пытается получить данные пользователя из кеша.
 24    Если данных нет, выполняется выборка из базы данных.
 25
 26    Args:
 27        user_id (str): Идентификатор пользователя.
 28
 29    Returns:
 30        UserData: Объект данных пользователя или None, если пользователь не найден.
 31    """
 32    result: UserData = await CashManager(UserData).get({user_id: ...})
 33    if result:
 34        return result[0]
 35
 36    query = select(UserData).where(UserData.telegram_id == user_id)
 37    result: UserData = (await execute_query(query)).scalar_one_or_none()
 38
 39    if result:
 40        await CashManager(UserData).add({user_id: result.__ustr_dict__})
 41
 42    return result
 43
 44
 45@async_speed_metric
 46async def get_all_userdata(user_id):
 47    """Получает все данные пользователя (конфигурации, транзакции, отчеты, уведомления) по идентификатору.
 48
 49    Функция сначала пытается получить данные пользователя из кеша.
 50    Если данных нет, выполняется выборка из базы данных.
 51
 52    Args:
 53        user_id (str): Идентификатор пользователя.
 54
 55    Returns:
 56        UserData(BaseModel): Объект данных пользователя или None, если пользователь не найден.
 57    """
 58    query = select(UserData).where(UserData.telegram_id == user_id)
 59    user: UserData = (await execute_query(query)).scalar_one_or_none()
 60    if user:
 61        all_userdata = UserData.ValidationSchemaExtended.model_validate(
 62            user, from_attributes=True
 63        )
 64        return all_userdata
 65    return user
 66
 67
 68@async_speed_metric
 69async def add_user(user_id, user_name):
 70    """Добавляет нового пользователя в базу данных.
 71
 72    Функция удаляет кешированные данные пользователя и добавляет его
 73    в базу данных.
 74
 75    Args:
 76        user_id (str): Идентификатор пользователя.
 77        user_name (str): Имя пользователя.
 78
 79    Returns:
 80        UserData: Объект данных пользователя, который был добавлен.
 81    """
 82    await CashManager(UserData).delete(user_id)
 83
 84    user_data = dict(telegram_id=user_id, telegram_name=user_name)
 85
 86    query = insert(UserData).values(user_data).returning(UserData)
 87
 88    return (await execute_query(query)).scalar_one_or_none()
 89
 90
 91@async_speed_metric
 92async def freeze_user(user_id):
 93    """Замораживает аккаунт пользователя.
 94
 95    Функция обновляет статус пользователя на "заморожен" и создает
 96    транзакцию для списания средств.
 97
 98    Args:
 99        user_id (str): Идентификатор пользователя.
100
101    Returns:
102        None
103    """
104    await clear_cash(user_id)
105
106    query = (
107        update(UserData)
108        .values(active=UserActivity.freezed)
109        .filter_by(telegram_id=user_id)
110        .returning(UserData)
111    )
112    await execute_query(query)
113
114    user: UserData = (await execute_query(query)).scalar_one_or_none()
115
116    tax = -1 * user.stage * settings.cost
117
118    if tax != 0:
119        data = dict(
120            user_id=user.telegram_id,
121            date=datetime.now(timezone.utc),
122            amount=tax,
123            withdraw_amount=tax,
124            label=uuid4(),
125            transaction_id="Заморозка аккаунта",
126            transaction_reference="",
127        )
128
129        query = insert(Transactions).values(data)
130        await execute_query(query)
131
132        await delete_cash_transactions(user.telegram_id)
133
134
135@async_speed_metric
136async def ban_user(user_id):
137    """Банит пользователя.
138
139    Функция обновляет статус пользователя на "заблокирован".
140
141    Args:
142        user_id (str): Идентификатор пользователя.
143
144    Returns:
145        None
146    """
147    await clear_cash(user_id)
148
149    query = (
150        update(UserData)
151        .values(active=UserActivity.banned)
152        .filter_by(telegram_id=user_id)
153    )
154    await execute_query(query)
155
156
157@async_speed_metric
158async def recover_user(user_id):
159    """Восстанавливает аккаунт пользователя.
160
161    Функция обновляет статус пользователя на "неактивный".
162
163    Args:
164        user_id (str): Идентификатор пользователя.
165
166    Returns:
167        UserData: Объект данных пользователя, который был восстановлен.
168    """
169    await clear_cash(user_id)
170
171    query = (
172        update(UserData)
173        .values(active=UserActivity.inactive)
174        .filter_by(telegram_id=user_id)
175        .returning(UserData)
176    )
177    result: UserData = (await execute_query(query)).scalar_one_or_none()
178    return result
179
180
181@async_speed_metric
182async def update_rate_user(user_id, stage, tax=0, trial=False):
183    """Обновляет тариф пользователя.
184
185    Функция обновляет стадию пользователя и создает транзакцию
186    для списания комиссии.
187
188    Args:
189        user_id (str): Идентификатор пользователя.
190        stage (float): Новая стадия пользователя.
191        tax (float): Комиссия за смену тарифа. По умолчанию 0.
192        trial (bool): Указывает, является ли это активацией пробного периода. По умолчанию False.
193
194    Returns:
195        UserData: Объект данных пользователя, который был обновлен.
196    """
197    await clear_cash(user_id)
198
199    tax *= -1
200    tax_descr = "Комиссия за смену тарифа"
201
202    if trial:
203        tax_descr = "Активация пробного периода"
204        tax += 7
205
206    query = (
207        update(UserData)
208        .values(stage=stage, free=False)
209        .filter_by(telegram_id=user_id)
210        .returning(UserData)
211    )
212
213    user: UserData = (await execute_query(query)).scalar_one_or_none()
214
215    if tax != 0:
216        data = dict(
217            user_id=user.telegram_id,
218            date=datetime.now(timezone.utc),
219            amount=tax,
220            withdraw_amount=tax,
221            label=uuid4(),
222            transaction_id=tax_descr,
223            transaction_reference="",
224        )
225
226        query = insert(Transactions).values(data)
227        await execute_query(query)
228
229        await delete_cash_transactions(user.telegram_id)
230
231    return user
232
233
234@async_speed_metric
235async def mute_user(user_id):
236    """Включает или выключает уведомления для пользователя.
237
238    Функция изменяет статус mute для пользователя.
239
240    Args:
241        user_id (str): Идентификатор пользователя.
242
243    Returns:
244        None
245    """
246    await clear_cash(user_id)
247
248    query = (
249        update(UserData).values(mute=not_(UserData.mute)).filter_by(telegram_id=user_id)
250    )
251    await execute_query(query)
252
253
254@async_speed_metric
255async def clear_cash(user_id):
256    """Очищает кеш для пользователя.
257
258    Функция удаляет все кешированные данные для указанного пользователя.
259
260    Args:
261        user_id (str): Идентификатор пользователя.
262
263    Returns:
264        None
265    """
266    await CashManager(None).clear(user_id)
logger = <RootLogger root (DEBUG)>
@async_speed_metric
async def get_user(user_id):
20@async_speed_metric
21async def get_user(user_id):
22    """Получает данные пользователя по идентификатору.
23
24    Функция сначала пытается получить данные пользователя из кеша.
25    Если данных нет, выполняется выборка из базы данных.
26
27    Args:
28        user_id (str): Идентификатор пользователя.
29
30    Returns:
31        UserData: Объект данных пользователя или None, если пользователь не найден.
32    """
33    result: UserData = await CashManager(UserData).get({user_id: ...})
34    if result:
35        return result[0]
36
37    query = select(UserData).where(UserData.telegram_id == user_id)
38    result: UserData = (await execute_query(query)).scalar_one_or_none()
39
40    if result:
41        await CashManager(UserData).add({user_id: result.__ustr_dict__})
42
43    return result

Получает данные пользователя по идентификатору.

Функция сначала пытается получить данные пользователя из кеша. Если данных нет, выполняется выборка из базы данных.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

UserData: Объект данных пользователя или None, если пользователь не найден.

@async_speed_metric
async def get_all_userdata(user_id):
46@async_speed_metric
47async def get_all_userdata(user_id):
48    """Получает все данные пользователя (конфигурации, транзакции, отчеты, уведомления) по идентификатору.
49
50    Функция сначала пытается получить данные пользователя из кеша.
51    Если данных нет, выполняется выборка из базы данных.
52
53    Args:
54        user_id (str): Идентификатор пользователя.
55
56    Returns:
57        UserData(BaseModel): Объект данных пользователя или None, если пользователь не найден.
58    """
59    query = select(UserData).where(UserData.telegram_id == user_id)
60    user: UserData = (await execute_query(query)).scalar_one_or_none()
61    if user:
62        all_userdata = UserData.ValidationSchemaExtended.model_validate(
63            user, from_attributes=True
64        )
65        return all_userdata
66    return user

Получает все данные пользователя (конфигурации, транзакции, отчеты, уведомления) по идентификатору.

Функция сначала пытается получить данные пользователя из кеша. Если данных нет, выполняется выборка из базы данных.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

UserData(BaseModel): Объект данных пользователя или None, если пользователь не найден.

@async_speed_metric
async def add_user(user_id, user_name):
69@async_speed_metric
70async def add_user(user_id, user_name):
71    """Добавляет нового пользователя в базу данных.
72
73    Функция удаляет кешированные данные пользователя и добавляет его
74    в базу данных.
75
76    Args:
77        user_id (str): Идентификатор пользователя.
78        user_name (str): Имя пользователя.
79
80    Returns:
81        UserData: Объект данных пользователя, который был добавлен.
82    """
83    await CashManager(UserData).delete(user_id)
84
85    user_data = dict(telegram_id=user_id, telegram_name=user_name)
86
87    query = insert(UserData).values(user_data).returning(UserData)
88
89    return (await execute_query(query)).scalar_one_or_none()

Добавляет нового пользователя в базу данных.

Функция удаляет кешированные данные пользователя и добавляет его в базу данных.

Arguments:
  • user_id (str): Идентификатор пользователя.
  • user_name (str): Имя пользователя.
Returns:

UserData: Объект данных пользователя, который был добавлен.

@async_speed_metric
async def freeze_user(user_id):
 92@async_speed_metric
 93async def freeze_user(user_id):
 94    """Замораживает аккаунт пользователя.
 95
 96    Функция обновляет статус пользователя на "заморожен" и создает
 97    транзакцию для списания средств.
 98
 99    Args:
100        user_id (str): Идентификатор пользователя.
101
102    Returns:
103        None
104    """
105    await clear_cash(user_id)
106
107    query = (
108        update(UserData)
109        .values(active=UserActivity.freezed)
110        .filter_by(telegram_id=user_id)
111        .returning(UserData)
112    )
113    await execute_query(query)
114
115    user: UserData = (await execute_query(query)).scalar_one_or_none()
116
117    tax = -1 * user.stage * settings.cost
118
119    if tax != 0:
120        data = dict(
121            user_id=user.telegram_id,
122            date=datetime.now(timezone.utc),
123            amount=tax,
124            withdraw_amount=tax,
125            label=uuid4(),
126            transaction_id="Заморозка аккаунта",
127            transaction_reference="",
128        )
129
130        query = insert(Transactions).values(data)
131        await execute_query(query)
132
133        await delete_cash_transactions(user.telegram_id)

Замораживает аккаунт пользователя.

Функция обновляет статус пользователя на "заморожен" и создает транзакцию для списания средств.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

None

@async_speed_metric
async def ban_user(user_id):
136@async_speed_metric
137async def ban_user(user_id):
138    """Банит пользователя.
139
140    Функция обновляет статус пользователя на "заблокирован".
141
142    Args:
143        user_id (str): Идентификатор пользователя.
144
145    Returns:
146        None
147    """
148    await clear_cash(user_id)
149
150    query = (
151        update(UserData)
152        .values(active=UserActivity.banned)
153        .filter_by(telegram_id=user_id)
154    )
155    await execute_query(query)

Банит пользователя.

Функция обновляет статус пользователя на "заблокирован".

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

None

@async_speed_metric
async def recover_user(user_id):
158@async_speed_metric
159async def recover_user(user_id):
160    """Восстанавливает аккаунт пользователя.
161
162    Функция обновляет статус пользователя на "неактивный".
163
164    Args:
165        user_id (str): Идентификатор пользователя.
166
167    Returns:
168        UserData: Объект данных пользователя, который был восстановлен.
169    """
170    await clear_cash(user_id)
171
172    query = (
173        update(UserData)
174        .values(active=UserActivity.inactive)
175        .filter_by(telegram_id=user_id)
176        .returning(UserData)
177    )
178    result: UserData = (await execute_query(query)).scalar_one_or_none()
179    return result

Восстанавливает аккаунт пользователя.

Функция обновляет статус пользователя на "неактивный".

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

UserData: Объект данных пользователя, который был восстановлен.

@async_speed_metric
async def update_rate_user(user_id, stage, tax=0, trial=False):
182@async_speed_metric
183async def update_rate_user(user_id, stage, tax=0, trial=False):
184    """Обновляет тариф пользователя.
185
186    Функция обновляет стадию пользователя и создает транзакцию
187    для списания комиссии.
188
189    Args:
190        user_id (str): Идентификатор пользователя.
191        stage (float): Новая стадия пользователя.
192        tax (float): Комиссия за смену тарифа. По умолчанию 0.
193        trial (bool): Указывает, является ли это активацией пробного периода. По умолчанию False.
194
195    Returns:
196        UserData: Объект данных пользователя, который был обновлен.
197    """
198    await clear_cash(user_id)
199
200    tax *= -1
201    tax_descr = "Комиссия за смену тарифа"
202
203    if trial:
204        tax_descr = "Активация пробного периода"
205        tax += 7
206
207    query = (
208        update(UserData)
209        .values(stage=stage, free=False)
210        .filter_by(telegram_id=user_id)
211        .returning(UserData)
212    )
213
214    user: UserData = (await execute_query(query)).scalar_one_or_none()
215
216    if tax != 0:
217        data = dict(
218            user_id=user.telegram_id,
219            date=datetime.now(timezone.utc),
220            amount=tax,
221            withdraw_amount=tax,
222            label=uuid4(),
223            transaction_id=tax_descr,
224            transaction_reference="",
225        )
226
227        query = insert(Transactions).values(data)
228        await execute_query(query)
229
230        await delete_cash_transactions(user.telegram_id)
231
232    return user

Обновляет тариф пользователя.

Функция обновляет стадию пользователя и создает транзакцию для списания комиссии.

Arguments:
  • user_id (str): Идентификатор пользователя.
  • stage (float): Новая стадия пользователя.
  • tax (float): Комиссия за смену тарифа. По умолчанию 0.
  • trial (bool): Указывает, является ли это активацией пробного периода. По умолчанию False.
Returns:

UserData: Объект данных пользователя, который был обновлен.

@async_speed_metric
async def mute_user(user_id):
235@async_speed_metric
236async def mute_user(user_id):
237    """Включает или выключает уведомления для пользователя.
238
239    Функция изменяет статус mute для пользователя.
240
241    Args:
242        user_id (str): Идентификатор пользователя.
243
244    Returns:
245        None
246    """
247    await clear_cash(user_id)
248
249    query = (
250        update(UserData).values(mute=not_(UserData.mute)).filter_by(telegram_id=user_id)
251    )
252    await execute_query(query)

Включает или выключает уведомления для пользователя.

Функция изменяет статус mute для пользователя.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

None

@async_speed_metric
async def clear_cash(user_id):
255@async_speed_metric
256async def clear_cash(user_id):
257    """Очищает кеш для пользователя.
258
259    Функция удаляет все кешированные данные для указанного пользователя.
260
261    Args:
262        user_id (str): Идентификатор пользователя.
263
264    Returns:
265        None
266    """
267    await CashManager(None).clear(user_id)

Очищает кеш для пользователя.

Функция удаляет все кешированные данные для указанного пользователя.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

None