Edit on GitHub

src.wg.utils

  1import asyncio
  2import logging
  3import logging.config
  4import os
  5import sys
  6from re import escape
  7from time import time
  8from typing import Literal
  9
 10import asyncssh
 11from asyncssh import SSHClientConnection
 12from pydantic import validate_call
 13
 14sys.path.insert(1, os.path.join("C:\\code\\vpn_dan_bot\\src"))
 15
 16from core.config import settings
 17from core.exceptions import WireguardError
 18from core.metric import async_speed_metric
 19from wg.connect import WgConnection
 20
 21logger = logging.getLogger("asyncssh")
 22
 23SSH = WgConnection()
 24
 25
 26class WgServerTools:
 27    """Класс для управления пирами и состоянием сервера WireGuard.
 28
 29    Этот класс предоставляет методы для добавления, блокировки и разблокировки пиров,
 30    а также для получения информации о состоянии сервера WireGuard.
 31    """
 32
 33    peer_counter = "~/Scripts/LastPeerAddress"
 34
 35    def __init__(self) -> None:
 36        """Инициализирует экземпляр WgServerTools.
 37
 38        Устанавливает значения для приватного и публичного ключей, а также счетчика пиров.
 39        """
 40        self.private_key: str = None
 41        self.server_public_key: str = None
 42        self.countpeers: int = None
 43
 44    async def create_peer(self, conn: SSHClientConnection):
 45        """Создает нового пира на сервере WireGuard.
 46
 47        Эта функция генерирует ключи для нового пира и добавляет его в конфигурацию сервера.
 48
 49        Args:
 50            conn (SSHClientConnection): Установленное SSH-соединение с сервером.
 51
 52        Raises:
 53            WireguardError: Если возникла ошибка при добавлении пира.
 54        """
 55        try:
 56            cmd = (
 57                "tmp_private_key=$(wg genkey)",
 58                "tmp_public_key=$(echo $tmp_private_key | wg pubkey)",
 59                "echo $tmp_private_key",
 60                "echo $tmp_public_key",
 61                # f'flock {self.peer_counter} --command \'printf "%d" "$(cat {self.peer_counter})"+1 > {self.peer_counter} && cat {self.peer_counter}\'',
 62                f"flock {self.peer_counter} --command '~/Scripts/IPgen.py --file={self.peer_counter} && cat {self.peer_counter}' ",
 63                f'tmp_allowed_ips="$(cat {self.peer_counter})/32"',
 64                f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py $tmp_public_key -ips=$tmp_allowed_ips --raises",
 65            )
 66            completed_proc = await conn.run("\n" + "\n".join(cmd), check=True)
 67            keys = completed_proc.stdout.strip("\n").split("\n")
 68            self.private_key, self.server_public_key, self.countpeers, *_ = keys
 69            logger.info(completed_proc.stderr)
 70
 71        except (OSError, asyncssh.Error) as e:
 72            logger.exception(
 73                "Сбой при добавлении пира в конфигурацию сервера wireguard"
 74            )
 75            raise WireguardError from e
 76
 77    async def change_peer(
 78        self, conn: SSHClientConnection, cmd: Literal["ban", "unban", "del"]
 79    ):
 80        """Блокирует или разблокирует пира на сервере WireGuard.
 81
 82        Args:
 83            conn (SSHClientConnection): Установленное SSH-соединение с сервером.
 84            cmd (Literal["ban", "unban", "del"]): Команда для изменения состояния пира.
 85
 86        Raises:
 87            WireguardError: Если возникла ошибка при изменении состояния пира.
 88        """
 89
 90        try:
 91            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py -m {cmd} {self.server_public_key} --raises"
 92            completed_proc = await conn.run(f"\n{cmd}", check=True)
 93            logger.info(completed_proc.stderr)
 94
 95        except (OSError, asyncssh.Error) as e:
 96            logger.exception("Сбой при изменении пира в конфигурации сервера wireguard")
 97            raise WireguardError from e
 98
 99    def create_db_wg_model(self, user_id):
100        """Создает модель базы данных для нового пира.
101
102        Args:
103            user_id (int): Идентификатор пользователя.
104
105        Returns:
106            dict: Словарь с данными о пользователе и пире.
107        """
108        self.user_config = dict(
109            user_id=user_id,
110            user_private_key=self.private_key,
111            address=f"{self.countpeers}/32",
112            server_public_key=self.server_public_key,
113        )
114        return self.user_config
115
116    async def check_connection(self):
117        """Проверяет и устанавливает соединение с сервером.
118
119        Если соединение закрыто, то повторно подключается к серверу.
120
121        Returns:
122            SSHClientConnection: Установленное SSH-соединение.
123
124        Raises:
125            WireguardError: Если не удалось установить соединение.
126        """
127        try:
128            conn = SSH.connection
129        except AttributeError:
130            await SSH.connect()
131            conn = SSH.connection
132
133        if conn.is_closed():
134            await SSH.connect()
135            conn = SSH.connection
136            try:
137                assert conn is SSH.connection
138                assert not conn.is_closed()
139            except AssertionError as e:
140                raise WireguardError from e
141        return conn
142
143    @async_speed_metric
144    @validate_call
145    async def move_user(
146        self,
147        move: Literal["add", "ban", "unban", "del"],
148        user_id: int = None,
149        server_pubkey: str = None,
150    ):
151        """Добавляет, блокирует или разблокирует пира.
152
153        Args:
154            move (Literal["add", "ban", "unban", "del"]): Действие для выполнения.
155            user_id (int, optional): Идентификатор пользователя.
156            server_pubkey (str, optional): Публичный ключ пользователя на сервере.
157
158        Returns:
159            dict: Конфигурация пользователя, если действие - добавление.
160
161        Raises:
162            WireguardError: Если возникла ошибка при выполнении действия.
163        """
164        conn = await self.check_connection()
165
166        match move:
167            case "add":
168                await self.create_peer(conn)
169                usr_cfg = self.create_db_wg_model(user_id)
170                logger.info(f"{usr_cfg['address']=}")
171                return usr_cfg
172            case "ban" | "unban" | "del":
173                self.server_public_key = server_pubkey
174                await self.change_peer(conn, cmd=move)
175
176    async def get_peer_list(self):
177        """Получает список пиров на сервере WireGuard.
178
179        Returns:
180            list: Список пиров с их конфигурациями.
181
182        Raises:
183            WireguardError: Если возникла ошибка при получении списка пиров.
184        """
185        conn = await self.check_connection()
186        try:
187            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py -l --raises"
188            completed_proc = await conn.run(f"\n{cmd}", check=True)
189            peers = completed_proc.stdout.strip("\n").split("[Peer]")
190
191            clean_peers = []
192            for peer in peers:
193                clean_peer = {
194                    line[: line.find("=")].strip(" ").lower(): line[
195                        line.find("=") + 1 :
196                    ].strip(" ")
197                    for line in peer.splitlines()
198                    if "=" in line
199                }
200
201                if clean_peer:
202                    if "ban" in peer.lower():
203                        clean_peer["ban"] = True
204                    else:
205                        clean_peer["ban"] = False
206
207                    clean_peers.append(clean_peer)
208
209        except (OSError, asyncssh.Error) as e:
210            logger.exception("Сбой при получении списка пиров wireguard")
211            raise WireguardError from e
212        else:
213            logger.info(f"Got {len(clean_peers)} peer's")
214
215            return clean_peers
216
217    async def get_server_status(self):
218        """Получает статус сервера WireGuard.
219
220        Returns:
221            str: Статус сервера ("active" или "inactive").
222
223        Raises:
224            WireguardError: Если возникла ошибка при получении статуса сервера.
225        """
226        conn = await self.check_connection()
227        try:
228            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S systemctl status wg-quick@wg1.service | grep Active:"
229            completed_proc = await conn.run(f"\n{cmd}", check=True)
230            _, status, *_ = completed_proc.stdout.strip("\n ").split()
231
232        except (OSError, asyncssh.Error):
233            logger.exception("Сбой при получении статуса сервера wireguard")
234            logger.info("Server status: inactive")
235            return "inactive"
236        else:
237            logger.info(f"Server status: {status}")
238            return status
239
240    async def get_server_cpu_usage(self):
241        """Получает загрузку CPU сервера WireGuard.
242
243        Returns:
244            str: Процент загрузки CPU.
245
246        Raises:
247            WireguardError: Если возникла ошибка при получении загрузки CPU.
248        """
249        conn = await self.check_connection()
250        try:
251            cmd = "top -bn2 | grep '%Cpu' | tail -1 | grep -P '(....|...) id,'|awk '{print 100-$8 \"%\"}'"
252            completed_proc = await conn.run(f"\n{cmd}", check=True)
253            usage = completed_proc.stdout.strip("\n ")
254
255        except (OSError, asyncssh.Error) as e:
256            logger.exception("Сбой при получении загруженности сервера wireguard")
257            logger.info("Server status: inactive")
258            raise WireguardError from e
259        else:
260            logger.info(f"Server cpu usage: {usage}")
261            return usage
262
263
264async def test_100():
265    """Тестирует производительность методов WgServerTools.
266
267    Выполняет несколько асинхронных вызовов для проверки производительности
268    методов получения загрузки CPU сервера.
269
270    Raises:
271        Exception: Если возникает ошибка при выполнении теста.
272    """
273    wg = WgServerTools()
274    start = time()
275
276    await SSH.connect()
277    coros = [wg.get_server_cpu_usage() for _ in range(1)]
278    coros_gen = time() - start
279
280    await asyncio.gather(*coros)
281
282    end = time() - start - coros_gen
283
284    print(f"{coros_gen=}  {end=}")
285
286
287if __name__ == "__main__":
288    logging.config.fileConfig("log.ini", disable_existing_loggers=False)
289
290    loop = asyncio.get_event_loop()
291    loop.run_until_complete(test_100())
logger = <Logger asyncssh (INFO)>
SSH = <wg.connect.WgConnection object>
class WgServerTools:
 27class WgServerTools:
 28    """Класс для управления пирами и состоянием сервера WireGuard.
 29
 30    Этот класс предоставляет методы для добавления, блокировки и разблокировки пиров,
 31    а также для получения информации о состоянии сервера WireGuard.
 32    """
 33
 34    peer_counter = "~/Scripts/LastPeerAddress"
 35
 36    def __init__(self) -> None:
 37        """Инициализирует экземпляр WgServerTools.
 38
 39        Устанавливает значения для приватного и публичного ключей, а также счетчика пиров.
 40        """
 41        self.private_key: str = None
 42        self.server_public_key: str = None
 43        self.countpeers: int = None
 44
 45    async def create_peer(self, conn: SSHClientConnection):
 46        """Создает нового пира на сервере WireGuard.
 47
 48        Эта функция генерирует ключи для нового пира и добавляет его в конфигурацию сервера.
 49
 50        Args:
 51            conn (SSHClientConnection): Установленное SSH-соединение с сервером.
 52
 53        Raises:
 54            WireguardError: Если возникла ошибка при добавлении пира.
 55        """
 56        try:
 57            cmd = (
 58                "tmp_private_key=$(wg genkey)",
 59                "tmp_public_key=$(echo $tmp_private_key | wg pubkey)",
 60                "echo $tmp_private_key",
 61                "echo $tmp_public_key",
 62                # f'flock {self.peer_counter} --command \'printf "%d" "$(cat {self.peer_counter})"+1 > {self.peer_counter} && cat {self.peer_counter}\'',
 63                f"flock {self.peer_counter} --command '~/Scripts/IPgen.py --file={self.peer_counter} && cat {self.peer_counter}' ",
 64                f'tmp_allowed_ips="$(cat {self.peer_counter})/32"',
 65                f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py $tmp_public_key -ips=$tmp_allowed_ips --raises",
 66            )
 67            completed_proc = await conn.run("\n" + "\n".join(cmd), check=True)
 68            keys = completed_proc.stdout.strip("\n").split("\n")
 69            self.private_key, self.server_public_key, self.countpeers, *_ = keys
 70            logger.info(completed_proc.stderr)
 71
 72        except (OSError, asyncssh.Error) as e:
 73            logger.exception(
 74                "Сбой при добавлении пира в конфигурацию сервера wireguard"
 75            )
 76            raise WireguardError from e
 77
 78    async def change_peer(
 79        self, conn: SSHClientConnection, cmd: Literal["ban", "unban", "del"]
 80    ):
 81        """Блокирует или разблокирует пира на сервере WireGuard.
 82
 83        Args:
 84            conn (SSHClientConnection): Установленное SSH-соединение с сервером.
 85            cmd (Literal["ban", "unban", "del"]): Команда для изменения состояния пира.
 86
 87        Raises:
 88            WireguardError: Если возникла ошибка при изменении состояния пира.
 89        """
 90
 91        try:
 92            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py -m {cmd} {self.server_public_key} --raises"
 93            completed_proc = await conn.run(f"\n{cmd}", check=True)
 94            logger.info(completed_proc.stderr)
 95
 96        except (OSError, asyncssh.Error) as e:
 97            logger.exception("Сбой при изменении пира в конфигурации сервера wireguard")
 98            raise WireguardError from e
 99
100    def create_db_wg_model(self, user_id):
101        """Создает модель базы данных для нового пира.
102
103        Args:
104            user_id (int): Идентификатор пользователя.
105
106        Returns:
107            dict: Словарь с данными о пользователе и пире.
108        """
109        self.user_config = dict(
110            user_id=user_id,
111            user_private_key=self.private_key,
112            address=f"{self.countpeers}/32",
113            server_public_key=self.server_public_key,
114        )
115        return self.user_config
116
117    async def check_connection(self):
118        """Проверяет и устанавливает соединение с сервером.
119
120        Если соединение закрыто, то повторно подключается к серверу.
121
122        Returns:
123            SSHClientConnection: Установленное SSH-соединение.
124
125        Raises:
126            WireguardError: Если не удалось установить соединение.
127        """
128        try:
129            conn = SSH.connection
130        except AttributeError:
131            await SSH.connect()
132            conn = SSH.connection
133
134        if conn.is_closed():
135            await SSH.connect()
136            conn = SSH.connection
137            try:
138                assert conn is SSH.connection
139                assert not conn.is_closed()
140            except AssertionError as e:
141                raise WireguardError from e
142        return conn
143
144    @async_speed_metric
145    @validate_call
146    async def move_user(
147        self,
148        move: Literal["add", "ban", "unban", "del"],
149        user_id: int = None,
150        server_pubkey: str = None,
151    ):
152        """Добавляет, блокирует или разблокирует пира.
153
154        Args:
155            move (Literal["add", "ban", "unban", "del"]): Действие для выполнения.
156            user_id (int, optional): Идентификатор пользователя.
157            server_pubkey (str, optional): Публичный ключ пользователя на сервере.
158
159        Returns:
160            dict: Конфигурация пользователя, если действие - добавление.
161
162        Raises:
163            WireguardError: Если возникла ошибка при выполнении действия.
164        """
165        conn = await self.check_connection()
166
167        match move:
168            case "add":
169                await self.create_peer(conn)
170                usr_cfg = self.create_db_wg_model(user_id)
171                logger.info(f"{usr_cfg['address']=}")
172                return usr_cfg
173            case "ban" | "unban" | "del":
174                self.server_public_key = server_pubkey
175                await self.change_peer(conn, cmd=move)
176
177    async def get_peer_list(self):
178        """Получает список пиров на сервере WireGuard.
179
180        Returns:
181            list: Список пиров с их конфигурациями.
182
183        Raises:
184            WireguardError: Если возникла ошибка при получении списка пиров.
185        """
186        conn = await self.check_connection()
187        try:
188            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py -l --raises"
189            completed_proc = await conn.run(f"\n{cmd}", check=True)
190            peers = completed_proc.stdout.strip("\n").split("[Peer]")
191
192            clean_peers = []
193            for peer in peers:
194                clean_peer = {
195                    line[: line.find("=")].strip(" ").lower(): line[
196                        line.find("=") + 1 :
197                    ].strip(" ")
198                    for line in peer.splitlines()
199                    if "=" in line
200                }
201
202                if clean_peer:
203                    if "ban" in peer.lower():
204                        clean_peer["ban"] = True
205                    else:
206                        clean_peer["ban"] = False
207
208                    clean_peers.append(clean_peer)
209
210        except (OSError, asyncssh.Error) as e:
211            logger.exception("Сбой при получении списка пиров wireguard")
212            raise WireguardError from e
213        else:
214            logger.info(f"Got {len(clean_peers)} peer's")
215
216            return clean_peers
217
218    async def get_server_status(self):
219        """Получает статус сервера WireGuard.
220
221        Returns:
222            str: Статус сервера ("active" или "inactive").
223
224        Raises:
225            WireguardError: Если возникла ошибка при получении статуса сервера.
226        """
227        conn = await self.check_connection()
228        try:
229            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S systemctl status wg-quick@wg1.service | grep Active:"
230            completed_proc = await conn.run(f"\n{cmd}", check=True)
231            _, status, *_ = completed_proc.stdout.strip("\n ").split()
232
233        except (OSError, asyncssh.Error):
234            logger.exception("Сбой при получении статуса сервера wireguard")
235            logger.info("Server status: inactive")
236            return "inactive"
237        else:
238            logger.info(f"Server status: {status}")
239            return status
240
241    async def get_server_cpu_usage(self):
242        """Получает загрузку CPU сервера WireGuard.
243
244        Returns:
245            str: Процент загрузки CPU.
246
247        Raises:
248            WireguardError: Если возникла ошибка при получении загрузки CPU.
249        """
250        conn = await self.check_connection()
251        try:
252            cmd = "top -bn2 | grep '%Cpu' | tail -1 | grep -P '(....|...) id,'|awk '{print 100-$8 \"%\"}'"
253            completed_proc = await conn.run(f"\n{cmd}", check=True)
254            usage = completed_proc.stdout.strip("\n ")
255
256        except (OSError, asyncssh.Error) as e:
257            logger.exception("Сбой при получении загруженности сервера wireguard")
258            logger.info("Server status: inactive")
259            raise WireguardError from e
260        else:
261            logger.info(f"Server cpu usage: {usage}")
262            return usage

Класс для управления пирами и состоянием сервера WireGuard.

Этот класс предоставляет методы для добавления, блокировки и разблокировки пиров, а также для получения информации о состоянии сервера WireGuard.

WgServerTools()
36    def __init__(self) -> None:
37        """Инициализирует экземпляр WgServerTools.
38
39        Устанавливает значения для приватного и публичного ключей, а также счетчика пиров.
40        """
41        self.private_key: str = None
42        self.server_public_key: str = None
43        self.countpeers: int = None

Инициализирует экземпляр WgServerTools.

Устанавливает значения для приватного и публичного ключей, а также счетчика пиров.

peer_counter = '~/Scripts/LastPeerAddress'
private_key: str
server_public_key: str
countpeers: int
async def create_peer(self, conn: asyncssh.connection.SSHClientConnection):
45    async def create_peer(self, conn: SSHClientConnection):
46        """Создает нового пира на сервере WireGuard.
47
48        Эта функция генерирует ключи для нового пира и добавляет его в конфигурацию сервера.
49
50        Args:
51            conn (SSHClientConnection): Установленное SSH-соединение с сервером.
52
53        Raises:
54            WireguardError: Если возникла ошибка при добавлении пира.
55        """
56        try:
57            cmd = (
58                "tmp_private_key=$(wg genkey)",
59                "tmp_public_key=$(echo $tmp_private_key | wg pubkey)",
60                "echo $tmp_private_key",
61                "echo $tmp_public_key",
62                # f'flock {self.peer_counter} --command \'printf "%d" "$(cat {self.peer_counter})"+1 > {self.peer_counter} && cat {self.peer_counter}\'',
63                f"flock {self.peer_counter} --command '~/Scripts/IPgen.py --file={self.peer_counter} && cat {self.peer_counter}' ",
64                f'tmp_allowed_ips="$(cat {self.peer_counter})/32"',
65                f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py $tmp_public_key -ips=$tmp_allowed_ips --raises",
66            )
67            completed_proc = await conn.run("\n" + "\n".join(cmd), check=True)
68            keys = completed_proc.stdout.strip("\n").split("\n")
69            self.private_key, self.server_public_key, self.countpeers, *_ = keys
70            logger.info(completed_proc.stderr)
71
72        except (OSError, asyncssh.Error) as e:
73            logger.exception(
74                "Сбой при добавлении пира в конфигурацию сервера wireguard"
75            )
76            raise WireguardError from e

Создает нового пира на сервере WireGuard.

Эта функция генерирует ключи для нового пира и добавляет его в конфигурацию сервера.

Arguments:
  • conn (SSHClientConnection): Установленное SSH-соединение с сервером.
Raises:
  • WireguardError: Если возникла ошибка при добавлении пира.
async def change_peer( self, conn: asyncssh.connection.SSHClientConnection, cmd: Literal['ban', 'unban', 'del']):
78    async def change_peer(
79        self, conn: SSHClientConnection, cmd: Literal["ban", "unban", "del"]
80    ):
81        """Блокирует или разблокирует пира на сервере WireGuard.
82
83        Args:
84            conn (SSHClientConnection): Установленное SSH-соединение с сервером.
85            cmd (Literal["ban", "unban", "del"]): Команда для изменения состояния пира.
86
87        Raises:
88            WireguardError: Если возникла ошибка при изменении состояния пира.
89        """
90
91        try:
92            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py -m {cmd} {self.server_public_key} --raises"
93            completed_proc = await conn.run(f"\n{cmd}", check=True)
94            logger.info(completed_proc.stderr)
95
96        except (OSError, asyncssh.Error) as e:
97            logger.exception("Сбой при изменении пира в конфигурации сервера wireguard")
98            raise WireguardError from e

Блокирует или разблокирует пира на сервере WireGuard.

Arguments:
  • conn (SSHClientConnection): Установленное SSH-соединение с сервером.
  • cmd (Literal["ban", "unban", "del"]): Команда для изменения состояния пира.
Raises:
  • WireguardError: Если возникла ошибка при изменении состояния пира.
def create_db_wg_model(self, user_id):
100    def create_db_wg_model(self, user_id):
101        """Создает модель базы данных для нового пира.
102
103        Args:
104            user_id (int): Идентификатор пользователя.
105
106        Returns:
107            dict: Словарь с данными о пользователе и пире.
108        """
109        self.user_config = dict(
110            user_id=user_id,
111            user_private_key=self.private_key,
112            address=f"{self.countpeers}/32",
113            server_public_key=self.server_public_key,
114        )
115        return self.user_config

Создает модель базы данных для нового пира.

Arguments:
  • user_id (int): Идентификатор пользователя.
Returns:

dict: Словарь с данными о пользователе и пире.

async def check_connection(self):
117    async def check_connection(self):
118        """Проверяет и устанавливает соединение с сервером.
119
120        Если соединение закрыто, то повторно подключается к серверу.
121
122        Returns:
123            SSHClientConnection: Установленное SSH-соединение.
124
125        Raises:
126            WireguardError: Если не удалось установить соединение.
127        """
128        try:
129            conn = SSH.connection
130        except AttributeError:
131            await SSH.connect()
132            conn = SSH.connection
133
134        if conn.is_closed():
135            await SSH.connect()
136            conn = SSH.connection
137            try:
138                assert conn is SSH.connection
139                assert not conn.is_closed()
140            except AssertionError as e:
141                raise WireguardError from e
142        return conn

Проверяет и устанавливает соединение с сервером.

Если соединение закрыто, то повторно подключается к серверу.

Returns:

SSHClientConnection: Установленное SSH-соединение.

Raises:
  • WireguardError: Если не удалось установить соединение.
@async_speed_metric
@validate_call
def move_user( self, move: Literal['add', 'ban', 'unban', 'del'], user_id: int = None, server_pubkey: str = None):
144    @async_speed_metric
145    @validate_call
146    async def move_user(
147        self,
148        move: Literal["add", "ban", "unban", "del"],
149        user_id: int = None,
150        server_pubkey: str = None,
151    ):
152        """Добавляет, блокирует или разблокирует пира.
153
154        Args:
155            move (Literal["add", "ban", "unban", "del"]): Действие для выполнения.
156            user_id (int, optional): Идентификатор пользователя.
157            server_pubkey (str, optional): Публичный ключ пользователя на сервере.
158
159        Returns:
160            dict: Конфигурация пользователя, если действие - добавление.
161
162        Raises:
163            WireguardError: Если возникла ошибка при выполнении действия.
164        """
165        conn = await self.check_connection()
166
167        match move:
168            case "add":
169                await self.create_peer(conn)
170                usr_cfg = self.create_db_wg_model(user_id)
171                logger.info(f"{usr_cfg['address']=}")
172                return usr_cfg
173            case "ban" | "unban" | "del":
174                self.server_public_key = server_pubkey
175                await self.change_peer(conn, cmd=move)

Добавляет, блокирует или разблокирует пира.

Arguments:
  • move (Literal["add", "ban", "unban", "del"]): Действие для выполнения.
  • user_id (int, optional): Идентификатор пользователя.
  • server_pubkey (str, optional): Публичный ключ пользователя на сервере.
Returns:

dict: Конфигурация пользователя, если действие - добавление.

Raises:
  • WireguardError: Если возникла ошибка при выполнении действия.
async def get_peer_list(self):
177    async def get_peer_list(self):
178        """Получает список пиров на сервере WireGuard.
179
180        Returns:
181            list: Список пиров с их конфигурациями.
182
183        Raises:
184            WireguardError: Если возникла ошибка при получении списка пиров.
185        """
186        conn = await self.check_connection()
187        try:
188            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S ~/Scripts/pywg.py -l --raises"
189            completed_proc = await conn.run(f"\n{cmd}", check=True)
190            peers = completed_proc.stdout.strip("\n").split("[Peer]")
191
192            clean_peers = []
193            for peer in peers:
194                clean_peer = {
195                    line[: line.find("=")].strip(" ").lower(): line[
196                        line.find("=") + 1 :
197                    ].strip(" ")
198                    for line in peer.splitlines()
199                    if "=" in line
200                }
201
202                if clean_peer:
203                    if "ban" in peer.lower():
204                        clean_peer["ban"] = True
205                    else:
206                        clean_peer["ban"] = False
207
208                    clean_peers.append(clean_peer)
209
210        except (OSError, asyncssh.Error) as e:
211            logger.exception("Сбой при получении списка пиров wireguard")
212            raise WireguardError from e
213        else:
214            logger.info(f"Got {len(clean_peers)} peer's")
215
216            return clean_peers

Получает список пиров на сервере WireGuard.

Returns:

list: Список пиров с их конфигурациями.

Raises:
  • WireguardError: Если возникла ошибка при получении списка пиров.
async def get_server_status(self):
218    async def get_server_status(self):
219        """Получает статус сервера WireGuard.
220
221        Returns:
222            str: Статус сервера ("active" или "inactive").
223
224        Raises:
225            WireguardError: Если возникла ошибка при получении статуса сервера.
226        """
227        conn = await self.check_connection()
228        try:
229            cmd = f"echo {escape(settings.WG_PASS.get_secret_value())} | sudo -S systemctl status wg-quick@wg1.service | grep Active:"
230            completed_proc = await conn.run(f"\n{cmd}", check=True)
231            _, status, *_ = completed_proc.stdout.strip("\n ").split()
232
233        except (OSError, asyncssh.Error):
234            logger.exception("Сбой при получении статуса сервера wireguard")
235            logger.info("Server status: inactive")
236            return "inactive"
237        else:
238            logger.info(f"Server status: {status}")
239            return status

Получает статус сервера WireGuard.

Returns:

str: Статус сервера ("active" или "inactive").

Raises:
  • WireguardError: Если возникла ошибка при получении статуса сервера.
async def get_server_cpu_usage(self):
241    async def get_server_cpu_usage(self):
242        """Получает загрузку CPU сервера WireGuard.
243
244        Returns:
245            str: Процент загрузки CPU.
246
247        Raises:
248            WireguardError: Если возникла ошибка при получении загрузки CPU.
249        """
250        conn = await self.check_connection()
251        try:
252            cmd = "top -bn2 | grep '%Cpu' | tail -1 | grep -P '(....|...) id,'|awk '{print 100-$8 \"%\"}'"
253            completed_proc = await conn.run(f"\n{cmd}", check=True)
254            usage = completed_proc.stdout.strip("\n ")
255
256        except (OSError, asyncssh.Error) as e:
257            logger.exception("Сбой при получении загруженности сервера wireguard")
258            logger.info("Server status: inactive")
259            raise WireguardError from e
260        else:
261            logger.info(f"Server cpu usage: {usage}")
262            return usage

Получает загрузку CPU сервера WireGuard.

Returns:

str: Процент загрузки CPU.

Raises:
  • WireguardError: Если возникла ошибка при получении загрузки CPU.
async def test_100():
265async def test_100():
266    """Тестирует производительность методов WgServerTools.
267
268    Выполняет несколько асинхронных вызовов для проверки производительности
269    методов получения загрузки CPU сервера.
270
271    Raises:
272        Exception: Если возникает ошибка при выполнении теста.
273    """
274    wg = WgServerTools()
275    start = time()
276
277    await SSH.connect()
278    coros = [wg.get_server_cpu_usage() for _ in range(1)]
279    coros_gen = time() - start
280
281    await asyncio.gather(*coros)
282
283    end = time() - start - coros_gen
284
285    print(f"{coros_gen=}  {end=}")

Тестирует производительность методов WgServerTools.

Выполняет несколько асинхронных вызовов для проверки производительности методов получения загрузки CPU сервера.

Raises:
  • Exception: Если возникает ошибка при выполнении теста.