src.wg.connect
1import asyncio 2import logging 3import logging.config 4 5import asyncssh 6from asyncssh import SSHClientConnection 7 8from core.config import settings 9 10logger = logging.getLogger("asyncssh") 11 12 13class WgConnection: 14 """Класс для управления SSH-соединением с сервером WireGuard. 15 16 Этот класс предоставляет методы для подключения к серверу WireGuard 17 с использованием SSH. 18 """ 19 20 connection: SSHClientConnection 21 22 async def connect(self): 23 """Устанавливает соединение с сервером. 24 25 Эта функция создает асинхронное соединение с сервером WireGuard 26 и устанавливает интервал поддержания соединения. 27 28 Raises: 29 asyncssh.Error: Если возникла ошибка при установлении соединения. 30 """ 31 await asyncio.wait([asyncio.create_task(self.__create_connection())]) 32 33 self.connection.set_keepalive(interval=120) 34 35 async def __create_connection(self): 36 """Создает SSH-соединение с сервером WireGuard. 37 38 Эта функция выполняет фактическое создание SSH-соединения с использованием 39 параметров, заданных в конфигурации. 40 41 Raises: 42 asyncssh.Error: Если возникла ошибка при создании соединения. 43 """ 44 self.connection = await asyncssh.connect( 45 settings.WG_HOST, 46 username=settings.WG_USER, 47 client_keys=settings.WG_KEY.get_secret_value(), 48 known_hosts=None, 49 )
logger =
<Logger asyncssh (INFO)>
class
WgConnection:
14class WgConnection: 15 """Класс для управления SSH-соединением с сервером WireGuard. 16 17 Этот класс предоставляет методы для подключения к серверу WireGuard 18 с использованием SSH. 19 """ 20 21 connection: SSHClientConnection 22 23 async def connect(self): 24 """Устанавливает соединение с сервером. 25 26 Эта функция создает асинхронное соединение с сервером WireGuard 27 и устанавливает интервал поддержания соединения. 28 29 Raises: 30 asyncssh.Error: Если возникла ошибка при установлении соединения. 31 """ 32 await asyncio.wait([asyncio.create_task(self.__create_connection())]) 33 34 self.connection.set_keepalive(interval=120) 35 36 async def __create_connection(self): 37 """Создает SSH-соединение с сервером WireGuard. 38 39 Эта функция выполняет фактическое создание SSH-соединения с использованием 40 параметров, заданных в конфигурации. 41 42 Raises: 43 asyncssh.Error: Если возникла ошибка при создании соединения. 44 """ 45 self.connection = await asyncssh.connect( 46 settings.WG_HOST, 47 username=settings.WG_USER, 48 client_keys=settings.WG_KEY.get_secret_value(), 49 known_hosts=None, 50 )
Класс для управления SSH-соединением с сервером WireGuard.
Этот класс предоставляет методы для подключения к серверу WireGuard с использованием SSH.
async def
connect(self):
23 async def connect(self): 24 """Устанавливает соединение с сервером. 25 26 Эта функция создает асинхронное соединение с сервером WireGuard 27 и устанавливает интервал поддержания соединения. 28 29 Raises: 30 asyncssh.Error: Если возникла ошибка при установлении соединения. 31 """ 32 await asyncio.wait([asyncio.create_task(self.__create_connection())]) 33 34 self.connection.set_keepalive(interval=120)
Устанавливает соединение с сервером.
Эта функция создает асинхронное соединение с сервером WireGuard и устанавливает интервал поддержания соединения.
Raises:
- asyncssh.Error: Если возникла ошибка при установлении соединения.