Edit on GitHub

src.db.utils.transactions

Функционал для работы с БД. Работа с транзакциями

  1"""Функционал для работы с БД. Работа с транзакциями"""
  2
  3import logging
  4from datetime import datetime, timezone
  5from uuid import uuid4
  6
  7from sqlalchemy import and_, desc, insert, select, update
  8
  9from core.config import settings
 10from core.metric import async_speed_metric
 11from db.database import execute_query, iter_redis_keys
 12from db.models import Transactions, UserActivity, UserData
 13from db.utils.redis import CashManager
 14
 15logger = logging.getLogger()
 16
 17
 18@async_speed_metric
 19async def get_cash_wg_transactions(user_id):
 20    """Получает транзакции пользователя из кеша.
 21
 22    Функция ищет транзакции пользователя в Redis, используя ключи,
 23    соответствующие его идентификатору.
 24
 25    Args:
 26        user_id (str): Идентификатор пользователя.
 27
 28    Returns:
 29        list: Список транзакций пользователя.
 30    """
 31    cash = CashManager(Transactions)
 32
 33    tr_keys = await iter_redis_keys(f"data:{Transactions.__tablename__}:*:{user_id}")
 34    async for trans_key in tr_keys:
 35        cash.cmd.hgetall(trans_key)
 36
 37    return await cash()
 38
 39
 40@async_speed_metric
 41async def delete_cash_transactions(user_id):
 42    """Удаляет кешированные транзакции пользователя.
 43
 44    Функция удаляет все кешированные транзакции пользователя из Redis.
 45
 46    Args:
 47        user_id (str): Идентификатор пользователя.
 48
 49    Returns:
 50        None
 51    """
 52    rkeys = await iter_redis_keys(f"data:{Transactions.__tablename__}:*:{user_id}")
 53    await CashManager(Transactions).delete(*[key async for key in rkeys], fullkey=True)
 54
 55
 56@async_speed_metric
 57async def insert_transaction(conf: dict):
 58    """Вставляет новую транзакцию в базу данных.
 59
 60    Функция сначала удаляет кешированные транзакции пользователя,
 61    а затем добавляет новую транзакцию в базу данных.
 62
 63    Args:
 64        conf (dict): Словарь, содержащий данные транзакции.
 65
 66    Returns:
 67        None
 68    """
 69    await delete_cash_transactions(conf["user_id"])
 70
 71    query = insert(Transactions).values(**conf)
 72    await execute_query(query)
 73
 74
 75async def confirm_success_pay(transaction: Transactions):
 76    """Подтверждает успешную оплату транзакции.
 77
 78    Функция проверяет, существует ли предыдущая транзакция с таким же
 79    ярлыком, и либо создает новую транзакцию, либо обновляет существующую.
 80
 81    Args:
 82        transaction (Transactions): Объект транзакции.
 83
 84    Returns:
 85        Transactions: Объект транзакции, который был создан или обновлен.
 86    """
 87    query = select(Transactions).where(Transactions.label == transaction.label)
 88    prev_result: Transactions = (await execute_query(query)).scalars().first()
 89
 90    if getattr(prev_result, "transaction_id", None):
 91        query = (
 92            insert(Transactions)
 93            .values(
 94                user_id=prev_result.user_id,
 95                date=transaction.date,
 96                amount=transaction.amount,
 97                label=transaction.label,
 98                transaction_id=transaction.transaction_id,
 99                sha1_hash=transaction.sha1_hash,
100                sender=transaction.sender,
101                withdraw_amount=transaction.withdraw_amount,
102                transaction_reference=prev_result.transaction_reference,
103            )
104            .returning(Transactions)
105        )
106
107    else:
108        query = (
109            update(Transactions)
110            .values(
111                date=transaction.date,
112                amount=transaction.amount,
113                transaction_id=transaction.transaction_id,
114                sha1_hash=transaction.sha1_hash,
115                sender=transaction.sender,
116                withdraw_amount=transaction.withdraw_amount,
117            )
118            .filter_by(label=transaction.label)
119            .returning(Transactions)
120        )
121
122    result: Transactions = (await execute_query(query)).scalar_one_or_none()
123
124    if result:
125        await delete_cash_transactions(result.user_id)
126        await CashManager(UserData).delete(result.user_id)
127
128    return result
129
130
131async def close_free_trial(user_id):
132    """Закрывает бесплатный пробный период пользователя.
133
134    Функция обновляет тариф пользователя, переводя его в состояние
135    завершенного пробного периода.
136
137    Args:
138        user_id (str): Идентификатор пользователя.
139
140    Returns:
141        UserData: Объект данных пользователя, обновленный после завершения пробного периода.
142    """
143    query = (
144        update(UserData)
145        .values(stage=1)
146        .where(and_(UserData.telegram_id == user_id, UserData.stage == 0.3))
147        .returning(UserData)
148    )
149
150    result: UserData = (await execute_query(query)).scalar_one_or_none()
151    # NOTE не удаляется из кеша потому что кеш чистится после пополнения баланса
152    return result
153
154
155async def get_user_transactions(user_id):
156    """Получает транзакции пользователя.
157
158    Функция сначала пытается получить транзакции из кеша,
159    а если они отсутствуют, то извлекает их из базы данных.
160
161    Args:
162        user_id (str): Идентификатор пользователя.
163
164    Returns:
165        list: Список транзакций пользователя.
166    """
167    trans: list[Transactions] = await get_cash_wg_transactions(user_id)
168
169    if trans:
170        return sorted(trans, key=lambda x: x.date, reverse=True)
171
172    query = (
173        select(Transactions)
174        .where(Transactions.user_id == user_id)
175        .order_by(desc(Transactions.date))
176    )
177
178    result: list[Transactions] = (await execute_query(query)).scalars().all()
179
180    if result:
181        await CashManager(Transactions).add(
182            **{f"{trans.id}:{user_id}": trans.__ustr_dict__ for trans in result}
183        )
184
185    return result
186
187
188@async_speed_metric
189async def raise_money():
190    """Списывает деньги с активных пользователей.
191
192    Функция выбирает всех активных пользователей и списывает
193    с их баланса определенную сумму в соответствии с их стадией.
194
195    Returns:
196        None
197    """
198    query = select(UserData).filter_by(active=UserActivity.active)
199    users: list[UserData] = (await execute_query(query)).scalars().all()
200
201    data = [
202        dict(
203            user_id=user.telegram_id,
204            date=datetime.now(timezone.utc),
205            amount=-1 * user.stage * settings.cost,
206            withdraw_amount=-1 * user.stage * settings.cost,
207            label=uuid4(),
208            transaction_id="Ежедневное списание",
209            transaction_reference="",
210        )
211        for user in users
212    ]
213
214    if data:
215        query = insert(Transactions).values(data)
216        await execute_query(query)
217
218        for user in users:
219            await delete_cash_transactions(user.telegram_id)
220            await CashManager(UserData).delete(user.telegram_id)
logger = <RootLogger root (DEBUG)>
@async_speed_metric
async def get_cash_wg_transactions(user_id):
19@async_speed_metric
20async def get_cash_wg_transactions(user_id):
21    """Получает транзакции пользователя из кеша.
22
23    Функция ищет транзакции пользователя в Redis, используя ключи,
24    соответствующие его идентификатору.
25
26    Args:
27        user_id (str): Идентификатор пользователя.
28
29    Returns:
30        list: Список транзакций пользователя.
31    """
32    cash = CashManager(Transactions)
33
34    tr_keys = await iter_redis_keys(f"data:{Transactions.__tablename__}:*:{user_id}")
35    async for trans_key in tr_keys:
36        cash.cmd.hgetall(trans_key)
37
38    return await cash()

Получает транзакции пользователя из кеша.

Функция ищет транзакции пользователя в Redis, используя ключи, соответствующие его идентификатору.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

list: Список транзакций пользователя.

@async_speed_metric
async def delete_cash_transactions(user_id):
41@async_speed_metric
42async def delete_cash_transactions(user_id):
43    """Удаляет кешированные транзакции пользователя.
44
45    Функция удаляет все кешированные транзакции пользователя из Redis.
46
47    Args:
48        user_id (str): Идентификатор пользователя.
49
50    Returns:
51        None
52    """
53    rkeys = await iter_redis_keys(f"data:{Transactions.__tablename__}:*:{user_id}")
54    await CashManager(Transactions).delete(*[key async for key in rkeys], fullkey=True)

Удаляет кешированные транзакции пользователя.

Функция удаляет все кешированные транзакции пользователя из Redis.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

None

@async_speed_metric
async def insert_transaction(conf: dict):
57@async_speed_metric
58async def insert_transaction(conf: dict):
59    """Вставляет новую транзакцию в базу данных.
60
61    Функция сначала удаляет кешированные транзакции пользователя,
62    а затем добавляет новую транзакцию в базу данных.
63
64    Args:
65        conf (dict): Словарь, содержащий данные транзакции.
66
67    Returns:
68        None
69    """
70    await delete_cash_transactions(conf["user_id"])
71
72    query = insert(Transactions).values(**conf)
73    await execute_query(query)

Вставляет новую транзакцию в базу данных.

Функция сначала удаляет кешированные транзакции пользователя, а затем добавляет новую транзакцию в базу данных.

Arguments:
  • conf (dict): Словарь, содержащий данные транзакции.
Returns:

None

async def confirm_success_pay(transaction: db.models.transactions.Transactions):
 76async def confirm_success_pay(transaction: Transactions):
 77    """Подтверждает успешную оплату транзакции.
 78
 79    Функция проверяет, существует ли предыдущая транзакция с таким же
 80    ярлыком, и либо создает новую транзакцию, либо обновляет существующую.
 81
 82    Args:
 83        transaction (Transactions): Объект транзакции.
 84
 85    Returns:
 86        Transactions: Объект транзакции, который был создан или обновлен.
 87    """
 88    query = select(Transactions).where(Transactions.label == transaction.label)
 89    prev_result: Transactions = (await execute_query(query)).scalars().first()
 90
 91    if getattr(prev_result, "transaction_id", None):
 92        query = (
 93            insert(Transactions)
 94            .values(
 95                user_id=prev_result.user_id,
 96                date=transaction.date,
 97                amount=transaction.amount,
 98                label=transaction.label,
 99                transaction_id=transaction.transaction_id,
100                sha1_hash=transaction.sha1_hash,
101                sender=transaction.sender,
102                withdraw_amount=transaction.withdraw_amount,
103                transaction_reference=prev_result.transaction_reference,
104            )
105            .returning(Transactions)
106        )
107
108    else:
109        query = (
110            update(Transactions)
111            .values(
112                date=transaction.date,
113                amount=transaction.amount,
114                transaction_id=transaction.transaction_id,
115                sha1_hash=transaction.sha1_hash,
116                sender=transaction.sender,
117                withdraw_amount=transaction.withdraw_amount,
118            )
119            .filter_by(label=transaction.label)
120            .returning(Transactions)
121        )
122
123    result: Transactions = (await execute_query(query)).scalar_one_or_none()
124
125    if result:
126        await delete_cash_transactions(result.user_id)
127        await CashManager(UserData).delete(result.user_id)
128
129    return result

Подтверждает успешную оплату транзакции.

Функция проверяет, существует ли предыдущая транзакция с таким же ярлыком, и либо создает новую транзакцию, либо обновляет существующую.

Arguments:
  • transaction (Transactions): Объект транзакции.
Returns:

Transactions: Объект транзакции, который был создан или обновлен.

async def close_free_trial(user_id):
132async def close_free_trial(user_id):
133    """Закрывает бесплатный пробный период пользователя.
134
135    Функция обновляет тариф пользователя, переводя его в состояние
136    завершенного пробного периода.
137
138    Args:
139        user_id (str): Идентификатор пользователя.
140
141    Returns:
142        UserData: Объект данных пользователя, обновленный после завершения пробного периода.
143    """
144    query = (
145        update(UserData)
146        .values(stage=1)
147        .where(and_(UserData.telegram_id == user_id, UserData.stage == 0.3))
148        .returning(UserData)
149    )
150
151    result: UserData = (await execute_query(query)).scalar_one_or_none()
152    # NOTE не удаляется из кеша потому что кеш чистится после пополнения баланса
153    return result

Закрывает бесплатный пробный период пользователя.

Функция обновляет тариф пользователя, переводя его в состояние завершенного пробного периода.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

UserData: Объект данных пользователя, обновленный после завершения пробного периода.

async def get_user_transactions(user_id):
156async def get_user_transactions(user_id):
157    """Получает транзакции пользователя.
158
159    Функция сначала пытается получить транзакции из кеша,
160    а если они отсутствуют, то извлекает их из базы данных.
161
162    Args:
163        user_id (str): Идентификатор пользователя.
164
165    Returns:
166        list: Список транзакций пользователя.
167    """
168    trans: list[Transactions] = await get_cash_wg_transactions(user_id)
169
170    if trans:
171        return sorted(trans, key=lambda x: x.date, reverse=True)
172
173    query = (
174        select(Transactions)
175        .where(Transactions.user_id == user_id)
176        .order_by(desc(Transactions.date))
177    )
178
179    result: list[Transactions] = (await execute_query(query)).scalars().all()
180
181    if result:
182        await CashManager(Transactions).add(
183            **{f"{trans.id}:{user_id}": trans.__ustr_dict__ for trans in result}
184        )
185
186    return result

Получает транзакции пользователя.

Функция сначала пытается получить транзакции из кеша, а если они отсутствуют, то извлекает их из базы данных.

Arguments:
  • user_id (str): Идентификатор пользователя.
Returns:

list: Список транзакций пользователя.

@async_speed_metric
async def raise_money():
189@async_speed_metric
190async def raise_money():
191    """Списывает деньги с активных пользователей.
192
193    Функция выбирает всех активных пользователей и списывает
194    с их баланса определенную сумму в соответствии с их стадией.
195
196    Returns:
197        None
198    """
199    query = select(UserData).filter_by(active=UserActivity.active)
200    users: list[UserData] = (await execute_query(query)).scalars().all()
201
202    data = [
203        dict(
204            user_id=user.telegram_id,
205            date=datetime.now(timezone.utc),
206            amount=-1 * user.stage * settings.cost,
207            withdraw_amount=-1 * user.stage * settings.cost,
208            label=uuid4(),
209            transaction_id="Ежедневное списание",
210            transaction_reference="",
211        )
212        for user in users
213    ]
214
215    if data:
216        query = insert(Transactions).values(data)
217        await execute_query(query)
218
219        for user in users:
220            await delete_cash_transactions(user.telegram_id)
221            await CashManager(UserData).delete(user.telegram_id)

Списывает деньги с активных пользователей.

Функция выбирает всех активных пользователей и списывает с их баланса определенную сумму в соответствии с их стадией.

Returns:

None