Edit on GitHub

src.db.utils.wg

Функционал для работы с БД. Работа с конфигурациями

  1"""Функционал для работы с БД. Работа с конфигурациями"""
  2
  3import logging
  4
  5from sqlalchemy import and_, delete, insert, select, update
  6from sqlalchemy.orm import joinedload
  7
  8from core.exceptions import DatabaseError, UniquenessError
  9from core.metric import async_speed_metric
 10from db.database import execute_query, iter_redis_keys
 11from db.models import FreezeSteps, UserData, WgConfig
 12from db.utils import get_user
 13from db.utils.redis import CashManager
 14
 15logger = logging.getLogger()
 16
 17
 18@async_speed_metric
 19async def get_cash_wg_configs(user_id):
 20    """Получает кэшированные конфигурации WireGuard для указанного пользователя.
 21
 22    Args:
 23        user_id (int): Идентификатор пользователя.
 24
 25    Returns:
 26        list[WgConfig]: Список конфигураций WireGuard, связанных с пользователем.
 27    """
 28    cash = CashManager(WgConfig)
 29
 30    wg_keys = await iter_redis_keys(f"data:{WgConfig.__tablename__}:*:{user_id}")
 31    async for wg_conf_key in wg_keys:
 32        cash.cmd.hgetall(wg_conf_key)
 33
 34    return await cash()
 35
 36
 37@async_speed_metric
 38async def delete_cash_configs(user_id):
 39    """Удаляет кэшированные конфигурации WireGuard для указанного пользователя.
 40
 41    Args:
 42        user_id (int): Идентификатор пользователя.
 43    """
 44    rkeys = await iter_redis_keys(f"data:{WgConfig.__tablename__}:*:{user_id}")
 45    await CashManager(WgConfig).delete(
 46        *[key async for key in rkeys],
 47        fullkey=True,
 48    )
 49
 50
 51@async_speed_metric
 52async def get_wg_config(user_id, cfg_id: str):
 53    """Получает конфигурацию WireGuard по идентификатору для указанного пользователя.
 54
 55    Args:
 56        user_id (int): Идентификатор пользователя.
 57        cfg_id (str): Идентификатор конфигурации.
 58
 59    Returns:
 60        WgConfig: Конфигурация WireGuard, если найдена, иначе None.
 61    """
 62    wg_data: list[WgConfig] = await get_cash_wg_configs(user_id)
 63    if wg_data:
 64        for wg_conf in wg_data:
 65            if wg_conf.user_private_key[:4] == cfg_id:
 66                return wg_conf
 67
 68    query = select(WgConfig).where(
 69        and_(WgConfig.user_id == user_id, WgConfig.user_private_key.contains(cfg_id))
 70    )
 71
 72    return (await execute_query(query)).scalar_one_or_none()
 73
 74
 75@async_speed_metric
 76async def get_user_with_configs(user_id):
 77    """Получает данные пользователя вместе с его конфигурациями WireGuard.
 78
 79    Args:
 80        user_id (int): Идентификатор пользователя.
 81
 82    Returns:
 83        UserData: Данные пользователя, включая конфигурации WireGuard.
 84    """
 85    user_data: UserData = await get_user(user_id)
 86    wg_data: list[WgConfig] = await get_cash_wg_configs(user_id)
 87    if wg_data:
 88        if user_data is not None:
 89            user_data.configs = wg_data
 90
 91        return user_data
 92
 93    query = (
 94        select(UserData)
 95        .where(UserData.telegram_id == user_id)
 96        .options(joinedload(UserData.configs))
 97    )
 98    result: UserData = (await execute_query(query)).unique().scalar_one_or_none()
 99    if result:
100        await CashManager(WgConfig).add(
101            **{
102                f"{config.name}:{user_id}": config.__ustr_dict__
103                for config in result.configs
104            }
105        )
106
107    return result
108
109
110@async_speed_metric
111async def add_wg_config(conf: dict, user_id):
112    """Добавляет новую конфигурацию WireGuard для указанного пользователя.
113
114    Args:
115        conf (dict): Конфигурация WireGuard для добавления.
116        user_id (int): Идентификатор пользователя.
117
118    Returns:
119        WgConfig: Добавленная конфигурация WireGuard.
120
121    Raises:
122        DatabaseError: Если конфигурацию не удалось добавить из-за ошибки базы данных.
123    """
124    query = insert(WgConfig).values(**conf).returning(WgConfig)
125    for _ in range(10):
126        try:
127            result: WgConfig = (await execute_query(query)).scalar_one_or_none()
128        except UniquenessError:
129            result = None
130            continue
131        else:
132            break
133
134    await delete_cash_configs(user_id)
135
136    if not result:
137        raise DatabaseError
138
139    return result
140
141
142@async_speed_metric
143async def delete_wg_config(config: WgConfig):
144    """Удаляет конфигурацию WireGuard.
145
146    Args:
147        config (WgConfig): Конфигурация WireGuard.
148
149    Raises:
150        DatabaseError: Если конфигурацию не удалось удалить из-за ошибки базы данных.
151    """
152    query = delete(WgConfig).where(WgConfig.id == config.id)
153    await execute_query(query)
154
155    await delete_cash_configs(config.user_id)
156
157
158@async_speed_metric
159async def freeze_config(configs: list[WgConfig], freeze: FreezeSteps):
160    """Замораживает указанные конфигурации WireGuard.
161
162    Args:
163        configs (list[WgConfig]): Список конфигураций WireGuard для заморозки.
164        freeze (FreezeSteps): Шаг заморозки.
165    """
166    query = (
167        update(WgConfig)
168        .where(WgConfig.id.in_([config.id for config in configs]))
169        .values(freeze=freeze)
170        .returning(WgConfig)
171    )
172
173    result: list[WgConfig] = (await execute_query(query)).scalars().all()
174
175    updated_users = {cfg.user_id for cfg in result}
176    for user_id in updated_users:
177        await delete_cash_configs(user_id)
178
179
180@async_speed_metric
181async def get_all_wg_configs():
182    """Получает все конфигурации WireGuard.
183
184    Returns:
185        list[WgConfig]: Список всех конфигураций WireGuard.
186    """
187    query = select(WgConfig)
188
189    result: list[WgConfig] = (await execute_query(query, echo=False)).scalars().all()
190    return result
logger = <RootLogger root (DEBUG)>
@async_speed_metric
async def get_cash_wg_configs(user_id):
19@async_speed_metric
20async def get_cash_wg_configs(user_id):
21    """Получает кэшированные конфигурации WireGuard для указанного пользователя.
22
23    Args:
24        user_id (int): Идентификатор пользователя.
25
26    Returns:
27        list[WgConfig]: Список конфигураций WireGuard, связанных с пользователем.
28    """
29    cash = CashManager(WgConfig)
30
31    wg_keys = await iter_redis_keys(f"data:{WgConfig.__tablename__}:*:{user_id}")
32    async for wg_conf_key in wg_keys:
33        cash.cmd.hgetall(wg_conf_key)
34
35    return await cash()

Получает кэшированные конфигурации WireGuard для указанного пользователя.

Arguments:
  • user_id (int): Идентификатор пользователя.
Returns:

list[WgConfig]: Список конфигураций WireGuard, связанных с пользователем.

@async_speed_metric
async def delete_cash_configs(user_id):
38@async_speed_metric
39async def delete_cash_configs(user_id):
40    """Удаляет кэшированные конфигурации WireGuard для указанного пользователя.
41
42    Args:
43        user_id (int): Идентификатор пользователя.
44    """
45    rkeys = await iter_redis_keys(f"data:{WgConfig.__tablename__}:*:{user_id}")
46    await CashManager(WgConfig).delete(
47        *[key async for key in rkeys],
48        fullkey=True,
49    )

Удаляет кэшированные конфигурации WireGuard для указанного пользователя.

Arguments:
  • user_id (int): Идентификатор пользователя.
@async_speed_metric
async def get_wg_config(user_id, cfg_id: str):
52@async_speed_metric
53async def get_wg_config(user_id, cfg_id: str):
54    """Получает конфигурацию WireGuard по идентификатору для указанного пользователя.
55
56    Args:
57        user_id (int): Идентификатор пользователя.
58        cfg_id (str): Идентификатор конфигурации.
59
60    Returns:
61        WgConfig: Конфигурация WireGuard, если найдена, иначе None.
62    """
63    wg_data: list[WgConfig] = await get_cash_wg_configs(user_id)
64    if wg_data:
65        for wg_conf in wg_data:
66            if wg_conf.user_private_key[:4] == cfg_id:
67                return wg_conf
68
69    query = select(WgConfig).where(
70        and_(WgConfig.user_id == user_id, WgConfig.user_private_key.contains(cfg_id))
71    )
72
73    return (await execute_query(query)).scalar_one_or_none()

Получает конфигурацию WireGuard по идентификатору для указанного пользователя.

Arguments:
  • user_id (int): Идентификатор пользователя.
  • cfg_id (str): Идентификатор конфигурации.
Returns:

WgConfig: Конфигурация WireGuard, если найдена, иначе None.

@async_speed_metric
async def get_user_with_configs(user_id):
 76@async_speed_metric
 77async def get_user_with_configs(user_id):
 78    """Получает данные пользователя вместе с его конфигурациями WireGuard.
 79
 80    Args:
 81        user_id (int): Идентификатор пользователя.
 82
 83    Returns:
 84        UserData: Данные пользователя, включая конфигурации WireGuard.
 85    """
 86    user_data: UserData = await get_user(user_id)
 87    wg_data: list[WgConfig] = await get_cash_wg_configs(user_id)
 88    if wg_data:
 89        if user_data is not None:
 90            user_data.configs = wg_data
 91
 92        return user_data
 93
 94    query = (
 95        select(UserData)
 96        .where(UserData.telegram_id == user_id)
 97        .options(joinedload(UserData.configs))
 98    )
 99    result: UserData = (await execute_query(query)).unique().scalar_one_or_none()
100    if result:
101        await CashManager(WgConfig).add(
102            **{
103                f"{config.name}:{user_id}": config.__ustr_dict__
104                for config in result.configs
105            }
106        )
107
108    return result

Получает данные пользователя вместе с его конфигурациями WireGuard.

Arguments:
  • user_id (int): Идентификатор пользователя.
Returns:

UserData: Данные пользователя, включая конфигурации WireGuard.

@async_speed_metric
async def add_wg_config(conf: dict, user_id):
111@async_speed_metric
112async def add_wg_config(conf: dict, user_id):
113    """Добавляет новую конфигурацию WireGuard для указанного пользователя.
114
115    Args:
116        conf (dict): Конфигурация WireGuard для добавления.
117        user_id (int): Идентификатор пользователя.
118
119    Returns:
120        WgConfig: Добавленная конфигурация WireGuard.
121
122    Raises:
123        DatabaseError: Если конфигурацию не удалось добавить из-за ошибки базы данных.
124    """
125    query = insert(WgConfig).values(**conf).returning(WgConfig)
126    for _ in range(10):
127        try:
128            result: WgConfig = (await execute_query(query)).scalar_one_or_none()
129        except UniquenessError:
130            result = None
131            continue
132        else:
133            break
134
135    await delete_cash_configs(user_id)
136
137    if not result:
138        raise DatabaseError
139
140    return result

Добавляет новую конфигурацию WireGuard для указанного пользователя.

Arguments:
  • conf (dict): Конфигурация WireGuard для добавления.
  • user_id (int): Идентификатор пользователя.
Returns:

WgConfig: Добавленная конфигурация WireGuard.

Raises:
  • DatabaseError: Если конфигурацию не удалось добавить из-за ошибки базы данных.
@async_speed_metric
async def delete_wg_config(config: db.models.wg_config.WgConfig):
143@async_speed_metric
144async def delete_wg_config(config: WgConfig):
145    """Удаляет конфигурацию WireGuard.
146
147    Args:
148        config (WgConfig): Конфигурация WireGuard.
149
150    Raises:
151        DatabaseError: Если конфигурацию не удалось удалить из-за ошибки базы данных.
152    """
153    query = delete(WgConfig).where(WgConfig.id == config.id)
154    await execute_query(query)
155
156    await delete_cash_configs(config.user_id)

Удаляет конфигурацию WireGuard.

Arguments:
  • config (WgConfig): Конфигурация WireGuard.
Raises:
  • DatabaseError: Если конфигурацию не удалось удалить из-за ошибки базы данных.
@async_speed_metric
async def freeze_config( configs: list[db.models.wg_config.WgConfig], freeze: db.models.enums.FreezeSteps):
159@async_speed_metric
160async def freeze_config(configs: list[WgConfig], freeze: FreezeSteps):
161    """Замораживает указанные конфигурации WireGuard.
162
163    Args:
164        configs (list[WgConfig]): Список конфигураций WireGuard для заморозки.
165        freeze (FreezeSteps): Шаг заморозки.
166    """
167    query = (
168        update(WgConfig)
169        .where(WgConfig.id.in_([config.id for config in configs]))
170        .values(freeze=freeze)
171        .returning(WgConfig)
172    )
173
174    result: list[WgConfig] = (await execute_query(query)).scalars().all()
175
176    updated_users = {cfg.user_id for cfg in result}
177    for user_id in updated_users:
178        await delete_cash_configs(user_id)

Замораживает указанные конфигурации WireGuard.

Arguments:
  • configs (list[WgConfig]): Список конфигураций WireGuard для заморозки.
  • freeze (FreezeSteps): Шаг заморозки.
@async_speed_metric
async def get_all_wg_configs():
181@async_speed_metric
182async def get_all_wg_configs():
183    """Получает все конфигурации WireGuard.
184
185    Returns:
186        list[WgConfig]: Список всех конфигураций WireGuard.
187    """
188    query = select(WgConfig)
189
190    result: list[WgConfig] = (await execute_query(query, echo=False)).scalars().all()
191    return result

Получает все конфигурации WireGuard.

Returns:

list[WgConfig]: Список всех конфигураций WireGuard.