Edit on GitHub

src.wg.connect

 1import asyncio
 2import logging
 3import logging.config
 4
 5import asyncssh
 6from asyncssh import SSHClientConnection
 7
 8from core.config import settings
 9
10logger = logging.getLogger("asyncssh")
11
12
13class WgConnection:
14    """Класс для управления SSH-соединением с сервером WireGuard.
15
16    Этот класс предоставляет методы для подключения к серверу WireGuard
17    с использованием SSH.
18    """
19
20    connection: SSHClientConnection
21
22    async def connect(self):
23        """Устанавливает соединение с сервером.
24
25        Эта функция создает асинхронное соединение с сервером WireGuard
26        и устанавливает интервал поддержания соединения.
27
28        Raises:
29            asyncssh.Error: Если возникла ошибка при установлении соединения.
30        """
31        await asyncio.wait([asyncio.create_task(self.__create_connection())])
32
33        self.connection.set_keepalive(interval=120)
34
35    async def __create_connection(self):
36        """Создает SSH-соединение с сервером WireGuard.
37
38        Эта функция выполняет фактическое создание SSH-соединения с использованием
39        параметров, заданных в конфигурации.
40
41        Raises:
42            asyncssh.Error: Если возникла ошибка при создании соединения.
43        """
44        self.connection = await asyncssh.connect(
45            settings.WG_HOST,
46            username=settings.WG_USER,
47            client_keys=settings.WG_KEY.get_secret_value(),
48            known_hosts=None,
49        )
logger = <Logger asyncssh (INFO)>
class WgConnection:
14class WgConnection:
15    """Класс для управления SSH-соединением с сервером WireGuard.
16
17    Этот класс предоставляет методы для подключения к серверу WireGuard
18    с использованием SSH.
19    """
20
21    connection: SSHClientConnection
22
23    async def connect(self):
24        """Устанавливает соединение с сервером.
25
26        Эта функция создает асинхронное соединение с сервером WireGuard
27        и устанавливает интервал поддержания соединения.
28
29        Raises:
30            asyncssh.Error: Если возникла ошибка при установлении соединения.
31        """
32        await asyncio.wait([asyncio.create_task(self.__create_connection())])
33
34        self.connection.set_keepalive(interval=120)
35
36    async def __create_connection(self):
37        """Создает SSH-соединение с сервером WireGuard.
38
39        Эта функция выполняет фактическое создание SSH-соединения с использованием
40        параметров, заданных в конфигурации.
41
42        Raises:
43            asyncssh.Error: Если возникла ошибка при создании соединения.
44        """
45        self.connection = await asyncssh.connect(
46            settings.WG_HOST,
47            username=settings.WG_USER,
48            client_keys=settings.WG_KEY.get_secret_value(),
49            known_hosts=None,
50        )

Класс для управления SSH-соединением с сервером WireGuard.

Этот класс предоставляет методы для подключения к серверу WireGuard с использованием SSH.

connection: asyncssh.connection.SSHClientConnection
async def connect(self):
23    async def connect(self):
24        """Устанавливает соединение с сервером.
25
26        Эта функция создает асинхронное соединение с сервером WireGuard
27        и устанавливает интервал поддержания соединения.
28
29        Raises:
30            asyncssh.Error: Если возникла ошибка при установлении соединения.
31        """
32        await asyncio.wait([asyncio.create_task(self.__create_connection())])
33
34        self.connection.set_keepalive(interval=120)

Устанавливает соединение с сервером.

Эта функция создает асинхронное соединение с сервером WireGuard и устанавливает интервал поддержания соединения.

Raises:
  • asyncssh.Error: Если возникла ошибка при установлении соединения.