src.db.utils.wg
Функционал для работы с БД. Работа с конфигурациями
1"""Функционал для работы с БД. Работа с конфигурациями""" 2 3import logging 4 5from sqlalchemy import and_, delete, insert, select, update 6from sqlalchemy.orm import joinedload 7 8from core.exceptions import DatabaseError, UniquenessError 9from core.metric import async_speed_metric 10from db.database import execute_query, iter_redis_keys 11from db.models import FreezeSteps, UserData, WgConfig 12from db.utils import get_user 13from db.utils.redis import CashManager 14 15logger = logging.getLogger() 16 17 18@async_speed_metric 19async def get_cash_wg_configs(user_id): 20 """Получает кэшированные конфигурации WireGuard для указанного пользователя. 21 22 Args: 23 user_id (int): Идентификатор пользователя. 24 25 Returns: 26 list[WgConfig]: Список конфигураций WireGuard, связанных с пользователем. 27 """ 28 cash = CashManager(WgConfig) 29 30 wg_keys = await iter_redis_keys(f"data:{WgConfig.__tablename__}:*:{user_id}") 31 async for wg_conf_key in wg_keys: 32 cash.cmd.hgetall(wg_conf_key) 33 34 return await cash() 35 36 37@async_speed_metric 38async def delete_cash_configs(user_id): 39 """Удаляет кэшированные конфигурации WireGuard для указанного пользователя. 40 41 Args: 42 user_id (int): Идентификатор пользователя. 43 """ 44 rkeys = await iter_redis_keys(f"data:{WgConfig.__tablename__}:*:{user_id}") 45 await CashManager(WgConfig).delete( 46 *[key async for key in rkeys], 47 fullkey=True, 48 ) 49 50 51@async_speed_metric 52async def get_wg_config(user_id, cfg_id: str): 53 """Получает конфигурацию WireGuard по идентификатору для указанного пользователя. 54 55 Args: 56 user_id (int): Идентификатор пользователя. 57 cfg_id (str): Идентификатор конфигурации. 58 59 Returns: 60 WgConfig: Конфигурация WireGuard, если найдена, иначе None. 61 """ 62 wg_data: list[WgConfig] = await get_cash_wg_configs(user_id) 63 if wg_data: 64 for wg_conf in wg_data: 65 if wg_conf.user_private_key[:4] == cfg_id: 66 return wg_conf 67 68 query = select(WgConfig).where( 69 and_(WgConfig.user_id == user_id, WgConfig.user_private_key.contains(cfg_id)) 70 ) 71 72 return (await execute_query(query)).scalar_one_or_none() 73 74 75@async_speed_metric 76async def get_user_with_configs(user_id): 77 """Получает данные пользователя вместе с его конфигурациями WireGuard. 78 79 Args: 80 user_id (int): Идентификатор пользователя. 81 82 Returns: 83 UserData: Данные пользователя, включая конфигурации WireGuard. 84 """ 85 user_data: UserData = await get_user(user_id) 86 wg_data: list[WgConfig] = await get_cash_wg_configs(user_id) 87 if wg_data: 88 if user_data is not None: 89 user_data.configs = wg_data 90 91 return user_data 92 93 query = ( 94 select(UserData) 95 .where(UserData.telegram_id == user_id) 96 .options(joinedload(UserData.configs)) 97 ) 98 result: UserData = (await execute_query(query)).unique().scalar_one_or_none() 99 if result: 100 await CashManager(WgConfig).add( 101 **{ 102 f"{config.name}:{user_id}": config.__ustr_dict__ 103 for config in result.configs 104 } 105 ) 106 107 return result 108 109 110@async_speed_metric 111async def add_wg_config(conf: dict, user_id): 112 """Добавляет новую конфигурацию WireGuard для указанного пользователя. 113 114 Args: 115 conf (dict): Конфигурация WireGuard для добавления. 116 user_id (int): Идентификатор пользователя. 117 118 Returns: 119 WgConfig: Добавленная конфигурация WireGuard. 120 121 Raises: 122 DatabaseError: Если конфигурацию не удалось добавить из-за ошибки базы данных. 123 """ 124 query = insert(WgConfig).values(**conf).returning(WgConfig) 125 for _ in range(10): 126 try: 127 result: WgConfig = (await execute_query(query)).scalar_one_or_none() 128 except UniquenessError: 129 result = None 130 continue 131 else: 132 break 133 134 await delete_cash_configs(user_id) 135 136 if not result: 137 raise DatabaseError 138 139 return result 140 141 142@async_speed_metric 143async def delete_wg_config(config: WgConfig): 144 """Удаляет конфигурацию WireGuard. 145 146 Args: 147 config (WgConfig): Конфигурация WireGuard. 148 149 Raises: 150 DatabaseError: Если конфигурацию не удалось удалить из-за ошибки базы данных. 151 """ 152 query = delete(WgConfig).where(WgConfig.id == config.id) 153 await execute_query(query) 154 155 await delete_cash_configs(config.user_id) 156 157 158@async_speed_metric 159async def freeze_config(configs: list[WgConfig], freeze: FreezeSteps): 160 """Замораживает указанные конфигурации WireGuard. 161 162 Args: 163 configs (list[WgConfig]): Список конфигураций WireGuard для заморозки. 164 freeze (FreezeSteps): Шаг заморозки. 165 """ 166 query = ( 167 update(WgConfig) 168 .where(WgConfig.id.in_([config.id for config in configs])) 169 .values(freeze=freeze) 170 .returning(WgConfig) 171 ) 172 173 result: list[WgConfig] = (await execute_query(query)).scalars().all() 174 175 updated_users = {cfg.user_id for cfg in result} 176 for user_id in updated_users: 177 await delete_cash_configs(user_id) 178 179 180@async_speed_metric 181async def get_all_wg_configs(): 182 """Получает все конфигурации WireGuard. 183 184 Returns: 185 list[WgConfig]: Список всех конфигураций WireGuard. 186 """ 187 query = select(WgConfig) 188 189 result: list[WgConfig] = (await execute_query(query, echo=False)).scalars().all() 190 return result
19@async_speed_metric 20async def get_cash_wg_configs(user_id): 21 """Получает кэшированные конфигурации WireGuard для указанного пользователя. 22 23 Args: 24 user_id (int): Идентификатор пользователя. 25 26 Returns: 27 list[WgConfig]: Список конфигураций WireGuard, связанных с пользователем. 28 """ 29 cash = CashManager(WgConfig) 30 31 wg_keys = await iter_redis_keys(f"data:{WgConfig.__tablename__}:*:{user_id}") 32 async for wg_conf_key in wg_keys: 33 cash.cmd.hgetall(wg_conf_key) 34 35 return await cash()
Получает кэшированные конфигурации WireGuard для указанного пользователя.
Arguments:
- user_id (int): Идентификатор пользователя.
Returns:
list[WgConfig]: Список конфигураций WireGuard, связанных с пользователем.
38@async_speed_metric 39async def delete_cash_configs(user_id): 40 """Удаляет кэшированные конфигурации WireGuard для указанного пользователя. 41 42 Args: 43 user_id (int): Идентификатор пользователя. 44 """ 45 rkeys = await iter_redis_keys(f"data:{WgConfig.__tablename__}:*:{user_id}") 46 await CashManager(WgConfig).delete( 47 *[key async for key in rkeys], 48 fullkey=True, 49 )
Удаляет кэшированные конфигурации WireGuard для указанного пользователя.
Arguments:
- user_id (int): Идентификатор пользователя.
52@async_speed_metric 53async def get_wg_config(user_id, cfg_id: str): 54 """Получает конфигурацию WireGuard по идентификатору для указанного пользователя. 55 56 Args: 57 user_id (int): Идентификатор пользователя. 58 cfg_id (str): Идентификатор конфигурации. 59 60 Returns: 61 WgConfig: Конфигурация WireGuard, если найдена, иначе None. 62 """ 63 wg_data: list[WgConfig] = await get_cash_wg_configs(user_id) 64 if wg_data: 65 for wg_conf in wg_data: 66 if wg_conf.user_private_key[:4] == cfg_id: 67 return wg_conf 68 69 query = select(WgConfig).where( 70 and_(WgConfig.user_id == user_id, WgConfig.user_private_key.contains(cfg_id)) 71 ) 72 73 return (await execute_query(query)).scalar_one_or_none()
Получает конфигурацию WireGuard по идентификатору для указанного пользователя.
Arguments:
- user_id (int): Идентификатор пользователя.
- cfg_id (str): Идентификатор конфигурации.
Returns:
WgConfig: Конфигурация WireGuard, если найдена, иначе None.
76@async_speed_metric 77async def get_user_with_configs(user_id): 78 """Получает данные пользователя вместе с его конфигурациями WireGuard. 79 80 Args: 81 user_id (int): Идентификатор пользователя. 82 83 Returns: 84 UserData: Данные пользователя, включая конфигурации WireGuard. 85 """ 86 user_data: UserData = await get_user(user_id) 87 wg_data: list[WgConfig] = await get_cash_wg_configs(user_id) 88 if wg_data: 89 if user_data is not None: 90 user_data.configs = wg_data 91 92 return user_data 93 94 query = ( 95 select(UserData) 96 .where(UserData.telegram_id == user_id) 97 .options(joinedload(UserData.configs)) 98 ) 99 result: UserData = (await execute_query(query)).unique().scalar_one_or_none() 100 if result: 101 await CashManager(WgConfig).add( 102 **{ 103 f"{config.name}:{user_id}": config.__ustr_dict__ 104 for config in result.configs 105 } 106 ) 107 108 return result
Получает данные пользователя вместе с его конфигурациями WireGuard.
Arguments:
- user_id (int): Идентификатор пользователя.
Returns:
UserData: Данные пользователя, включая конфигурации WireGuard.
111@async_speed_metric 112async def add_wg_config(conf: dict, user_id): 113 """Добавляет новую конфигурацию WireGuard для указанного пользователя. 114 115 Args: 116 conf (dict): Конфигурация WireGuard для добавления. 117 user_id (int): Идентификатор пользователя. 118 119 Returns: 120 WgConfig: Добавленная конфигурация WireGuard. 121 122 Raises: 123 DatabaseError: Если конфигурацию не удалось добавить из-за ошибки базы данных. 124 """ 125 query = insert(WgConfig).values(**conf).returning(WgConfig) 126 for _ in range(10): 127 try: 128 result: WgConfig = (await execute_query(query)).scalar_one_or_none() 129 except UniquenessError: 130 result = None 131 continue 132 else: 133 break 134 135 await delete_cash_configs(user_id) 136 137 if not result: 138 raise DatabaseError 139 140 return result
Добавляет новую конфигурацию WireGuard для указанного пользователя.
Arguments:
- conf (dict): Конфигурация WireGuard для добавления.
- user_id (int): Идентификатор пользователя.
Returns:
WgConfig: Добавленная конфигурация WireGuard.
Raises:
- DatabaseError: Если конфигурацию не удалось добавить из-за ошибки базы данных.
143@async_speed_metric 144async def delete_wg_config(config: WgConfig): 145 """Удаляет конфигурацию WireGuard. 146 147 Args: 148 config (WgConfig): Конфигурация WireGuard. 149 150 Raises: 151 DatabaseError: Если конфигурацию не удалось удалить из-за ошибки базы данных. 152 """ 153 query = delete(WgConfig).where(WgConfig.id == config.id) 154 await execute_query(query) 155 156 await delete_cash_configs(config.user_id)
Удаляет конфигурацию WireGuard.
Arguments:
- config (WgConfig): Конфигурация WireGuard.
Raises:
- DatabaseError: Если конфигурацию не удалось удалить из-за ошибки базы данных.
159@async_speed_metric 160async def freeze_config(configs: list[WgConfig], freeze: FreezeSteps): 161 """Замораживает указанные конфигурации WireGuard. 162 163 Args: 164 configs (list[WgConfig]): Список конфигураций WireGuard для заморозки. 165 freeze (FreezeSteps): Шаг заморозки. 166 """ 167 query = ( 168 update(WgConfig) 169 .where(WgConfig.id.in_([config.id for config in configs])) 170 .values(freeze=freeze) 171 .returning(WgConfig) 172 ) 173 174 result: list[WgConfig] = (await execute_query(query)).scalars().all() 175 176 updated_users = {cfg.user_id for cfg in result} 177 for user_id in updated_users: 178 await delete_cash_configs(user_id)
Замораживает указанные конфигурации WireGuard.
Arguments:
- configs (list[WgConfig]): Список конфигураций WireGuard для заморозки.
- freeze (FreezeSteps): Шаг заморозки.
181@async_speed_metric 182async def get_all_wg_configs(): 183 """Получает все конфигурации WireGuard. 184 185 Returns: 186 list[WgConfig]: Список всех конфигураций WireGuard. 187 """ 188 query = select(WgConfig) 189 190 result: list[WgConfig] = (await execute_query(query, echo=False)).scalars().all() 191 return result
Получает все конфигурации WireGuard.
Returns:
list[WgConfig]: Список всех конфигураций WireGuard.