Edit on GitHub

src.wg.pywg

  1#!/usr/bin/python3
  2
  3import argparse
  4import functools
  5import logging
  6import pathlib
  7import platform
  8from ipaddress import IPv4Interface
  9from subprocess import Popen
 10from time import time
 11
 12if platform.system() == "Linux":
 13    import fcntl
 14
# Root logger (getLogger() without a name); it is configured by
# logging.basicConfig in the ``__main__`` block of this file.
logger = logging.getLogger()


# WireGuard config file that ConfigChanger reads and rewrites.
WIREGUARD_CONF = "/etc/wireguard/wg1.conf"
# Alternative value kept for reference (looks like a local Windows checkout):
# WIREGUARD_CONF = "C:\\code\\vpn_dan_bot\\src\\wg\\wg1.conf"
# File that receives the reload log line and the stderr of the reload command.
WIREGUARD_LOG = "/home/zadira/Scripts/ErrorsPyWG"
 21
 22
 23class ConfigChanger:
 24    def __init__(self, public_key, allowed_ips=None, raises=False) -> None:
 25        self.public_key = public_key
 26        self.allowed_ips = allowed_ips
 27        self.raises = raises
 28
 29    def __find_key(self):
 30        lines = []
 31        for i, line in enumerate(self.config):
 32            if self.public_key in line:
 33                lines.append(i)
 34
 35        if not len(lines):
 36            raise Exception("PEER NOT FOUND")
 37
 38        validated_lines = []
 39        for i in lines:
 40            assert "peer" in self.config[i - 1].lower()
 41            validated_lines.extend((i - 1, i))
 42
 43            extra = 1
 44            while extra:
 45                try:
 46                    if (
 47                        "peer" in self.config[i + extra].lower()
 48                        or not self.config[i + extra]
 49                    ):
 50                        break
 51                except IndexError:
 52                    break
 53                else:
 54                    validated_lines.append(i + extra)
 55                    extra += 1
 56
 57        for i in validated_lines:
 58            yield i
 59
 60    def process(mode):
 61        def decorator(func):
 62            @functools.wraps(func)
 63            def conf_changer(self, *args, **kwargs):
 64                try:
 65                    start = time()
 66
 67                    with open(WIREGUARD_CONF, "r+") as file:
 68                        fcntl.flock(file, fcntl.LOCK_EX)
 69                        self.config = file.read().split("\n")
 70                        func(self, *args, **kwargs)
 71
 72                        file.seek(0)
 73                        file.truncate(0)
 74                        file.write("\n".join(self.config))
 75                        fcntl.flock(file, fcntl.LOCK_UN)
 76
 77                    proc = time() - start
 78
 79                    reload_cmd = " && ".join(
 80                        (
 81                            f'echo -n "$(date) :: {func.__name__} :: {self.public_key[:4]}... ::" >> {WIREGUARD_LOG}',
 82                            f"sudo systemctl reload wg-quick@wg1.service 2>> {WIREGUARD_LOG}",
 83                            f'echo " " >> {WIREGUARD_LOG}',
 84                        )
 85                    )
 86
 87                    error_code = Popen(
 88                        f"flock {WIREGUARD_LOG} --command '{reload_cmd}'", shell=True
 89                    ).returncode
 90                    if not error_code:
 91                        logger.info(f'Peer "{self.public_key[:4]}..." {mode}ed')
 92                    else:
 93                        raise Exception(
 94                            f"execute wireguard reload error. Exit code {error_code}"
 95                        )
 96
 97                except Exception as e:
 98                    logger.error(e.args[0])
 99                    if self.raises:
100                        raise e
101                else:
102                    logger.debug(
103                        f"METRIC::{func.__name__}::proc::[  {int(proc)*1000}  ]msec"
104                    )
105                finally:
106                    logger.debug(
107                        f"METRIC::{func.__name__}::end::[  {int((time()-start)*1000)}  ]msec"
108                    )
109
110            return conf_changer
111
112        return decorator
113
114    @process("add")
115    def register(self):
116        new_user = (
117            "[Peer]",
118            f"PublicKey = {self.public_key}",
119            f"AllowedIPs = {self.allowed_ips}",
120            "PersistentKeepalive = 25",
121        )
122        config = "\n".join(self.config)
123        if self.public_key in config:
124            logger.warning("Peer already added")
125            raise Exception("NEW USER ERROR")
126
127        self.config.extend(new_user)
128
129    @process("delet")
130    def delete(self):
131        lines = self.__find_key()
132        for n in sorted(lines, reverse=True):
133            self.config.pop(n)
134
135    @process("bann")
136    def ban(self):
137        lines = self.__find_key()
138        for n in lines:
139            if self.config[n].startswith("#"):
140                logger.warning("Peer already banned")
141                raise Exception("BAN USER ERROR")
142            else:
143                self.config[n] = f"# {self.config[n]}"
144
145    @process("unbann")
146    def unban(self):
147        lines = self.__find_key()
148        for n in lines:
149            if not self.config[n].startswith("#"):
150                logger.warning("Peer already unbanned")
151                raise Exception("UNBAN USER ERROR")
152            else:
153                self.config[n] = self.config[n].strip("# ")
154
155
def parse():
    """Parse the command-line arguments of the tool.

    Returns:
        The ``argparse.Namespace`` with the attributes ``pubkey``,
        ``allowed_ips``, ``mode``, ``wgpath``, ``raises`` and ``list``.
    """
    cli = argparse.ArgumentParser(
        prog="Python Wireguard Utils",
        description="Python tools for the wireguard server",
        epilog="Text at the bottom of help",
    )

    # Optional positional: the peer's public key.
    cli.add_argument(
        "pubkey", nargs="?", type=str, help="Public key of new(old) user"
    )
    cli.add_argument(
        "-ips", "--allowed_ips", type=IPv4Interface, help="AllowedIPs for new peer"
    )
    cli.add_argument(
        "-m",
        "--mode",
        default="new",
        choices=["new", "ban", "unban", "del"],
        help="Tool for change wireguard config (default 'new')",
    )
    cli.add_argument(
        "--wgpath",
        default=WIREGUARD_CONF,
        type=pathlib.Path,
        help=f"Path wireguard config file (default '{WIREGUARD_CONF}')",
    )
    # Boolean switches, both off unless given.
    cli.add_argument(
        "--raises", action="store_true", help="Interruption in case of an error"
    )
    cli.add_argument(
        "-l",
        "--list",
        action="store_true",
        help="Print list of peers (banned too)",
    )

    return cli.parse_args()
196
197
def main():
    """Command-line entry point: list the peers or apply one change to a peer.

    With ``--list`` the ``[Peer]`` sections of ``WIREGUARD_CONF`` are printed
    (commented-out ones are marked as banned) and nothing else happens.
    Otherwise the action selected by ``--mode`` is run for the given public
    key. A missing public key, or missing ``--allowed_ips`` in "new" mode, is
    logged as an error and no action is run.

    NOTE(review): ``--wgpath`` is parsed but not used here.
    """
    args = parse()

    if args.list:
        with open(WIREGUARD_CONF) as file:
            peers = file.read().split("[Peer]")[1:]

        for peer in peers:
            print("\x1b[33m[Peer]\x1b[0m", end=" ")

            # A comment marker left inside the section means the peer is banned.
            if "\n#" in peer.strip("#\n "):
                print("\x1b[31;1m(BAN)\x1b[0m")
            else:
                print("")

            print(peer.strip(" #\n").replace("# ", ""))

        return

    # Every mode needs a key; without this check ban/unban/del would search
    # the config for ``None`` and fail with an unrelated TypeError.
    if not args.pubkey:
        logger.error("Argument PUBKEY: empty value")
        return
    if args.mode == "new" and not args.allowed_ips:
        logger.error("Argument ALLOWED_IPS: empty value")
        return

    cc = ConfigChanger(
        public_key=args.pubkey, allowed_ips=args.allowed_ips, raises=args.raises
    )
    # ``--mode`` is restricted to these keys by the parser's ``choices``.
    actions = {
        "new": cc.register,
        "ban": cc.ban,
        "unban": cc.unban,
        "del": cc.delete,
    }
    actions[args.mode]()
232
233
if __name__ == "__main__":
    # Script entry: set up root logging at DEBUG level, then run the tool
    # between a "Start" and an "End" log line.
    logging.basicConfig(
        format="[%(levelname)-8s] %(funcName)s(%(lineno)d): %(message)s",
        level=logging.DEBUG,
    )
    logger.info("Start")
    main()
    logger.info("End")
logger = <RootLogger root (DEBUG)>
WIREGUARD_CONF = '/etc/wireguard/wg1.conf'
WIREGUARD_LOG = '/home/zadira/Scripts/ErrorsPyWG'
class ConfigChanger:
 24class ConfigChanger:
 25    def __init__(self, public_key, allowed_ips=None, raises=False) -> None:
 26        self.public_key = public_key
 27        self.allowed_ips = allowed_ips
 28        self.raises = raises
 29
 30    def __find_key(self):
 31        lines = []
 32        for i, line in enumerate(self.config):
 33            if self.public_key in line:
 34                lines.append(i)
 35
 36        if not len(lines):
 37            raise Exception("PEER NOT FOUND")
 38
 39        validated_lines = []
 40        for i in lines:
 41            assert "peer" in self.config[i - 1].lower()
 42            validated_lines.extend((i - 1, i))
 43
 44            extra = 1
 45            while extra:
 46                try:
 47                    if (
 48                        "peer" in self.config[i + extra].lower()
 49                        or not self.config[i + extra]
 50                    ):
 51                        break
 52                except IndexError:
 53                    break
 54                else:
 55                    validated_lines.append(i + extra)
 56                    extra += 1
 57
 58        for i in validated_lines:
 59            yield i
 60
 61    def process(mode):
 62        def decorator(func):
 63            @functools.wraps(func)
 64            def conf_changer(self, *args, **kwargs):
 65                try:
 66                    start = time()
 67
 68                    with open(WIREGUARD_CONF, "r+") as file:
 69                        fcntl.flock(file, fcntl.LOCK_EX)
 70                        self.config = file.read().split("\n")
 71                        func(self, *args, **kwargs)
 72
 73                        file.seek(0)
 74                        file.truncate(0)
 75                        file.write("\n".join(self.config))
 76                        fcntl.flock(file, fcntl.LOCK_UN)
 77
 78                    proc = time() - start
 79
 80                    reload_cmd = " && ".join(
 81                        (
 82                            f'echo -n "$(date) :: {func.__name__} :: {self.public_key[:4]}... ::" >> {WIREGUARD_LOG}',
 83                            f"sudo systemctl reload wg-quick@wg1.service 2>> {WIREGUARD_LOG}",
 84                            f'echo " " >> {WIREGUARD_LOG}',
 85                        )
 86                    )
 87
 88                    error_code = Popen(
 89                        f"flock {WIREGUARD_LOG} --command '{reload_cmd}'", shell=True
 90                    ).returncode
 91                    if not error_code:
 92                        logger.info(f'Peer "{self.public_key[:4]}..." {mode}ed')
 93                    else:
 94                        raise Exception(
 95                            f"execute wireguard reload error. Exit code {error_code}"
 96                        )
 97
 98                except Exception as e:
 99                    logger.error(e.args[0])
100                    if self.raises:
101                        raise e
102                else:
103                    logger.debug(
104                        f"METRIC::{func.__name__}::proc::[  {int(proc)*1000}  ]msec"
105                    )
106                finally:
107                    logger.debug(
108                        f"METRIC::{func.__name__}::end::[  {int((time()-start)*1000)}  ]msec"
109                    )
110
111            return conf_changer
112
113        return decorator
114
115    @process("add")
116    def register(self):
117        new_user = (
118            "[Peer]",
119            f"PublicKey = {self.public_key}",
120            f"AllowedIPs = {self.allowed_ips}",
121            "PersistentKeepalive = 25",
122        )
123        config = "\n".join(self.config)
124        if self.public_key in config:
125            logger.warning("Peer already added")
126            raise Exception("NEW USER ERROR")
127
128        self.config.extend(new_user)
129
130    @process("delet")
131    def delete(self):
132        lines = self.__find_key()
133        for n in sorted(lines, reverse=True):
134            self.config.pop(n)
135
136    @process("bann")
137    def ban(self):
138        lines = self.__find_key()
139        for n in lines:
140            if self.config[n].startswith("#"):
141                logger.warning("Peer already banned")
142                raise Exception("BAN USER ERROR")
143            else:
144                self.config[n] = f"# {self.config[n]}"
145
146    @process("unbann")
147    def unban(self):
148        lines = self.__find_key()
149        for n in lines:
150            if not self.config[n].startswith("#"):
151                logger.warning("Peer already unbanned")
152                raise Exception("UNBAN USER ERROR")
153            else:
154                self.config[n] = self.config[n].strip("# ")
ConfigChanger(public_key, allowed_ips=None, raises=False)
25    def __init__(self, public_key, allowed_ips=None, raises=False) -> None:
26        self.public_key = public_key
27        self.allowed_ips = allowed_ips
28        self.raises = raises
public_key
allowed_ips
raises
def process(mode):
 61    def process(mode):
 62        def decorator(func):
 63            @functools.wraps(func)
 64            def conf_changer(self, *args, **kwargs):
 65                try:
 66                    start = time()
 67
 68                    with open(WIREGUARD_CONF, "r+") as file:
 69                        fcntl.flock(file, fcntl.LOCK_EX)
 70                        self.config = file.read().split("\n")
 71                        func(self, *args, **kwargs)
 72
 73                        file.seek(0)
 74                        file.truncate(0)
 75                        file.write("\n".join(self.config))
 76                        fcntl.flock(file, fcntl.LOCK_UN)
 77
 78                    proc = time() - start
 79
 80                    reload_cmd = " && ".join(
 81                        (
 82                            f'echo -n "$(date) :: {func.__name__} :: {self.public_key[:4]}... ::" >> {WIREGUARD_LOG}',
 83                            f"sudo systemctl reload wg-quick@wg1.service 2>> {WIREGUARD_LOG}",
 84                            f'echo " " >> {WIREGUARD_LOG}',
 85                        )
 86                    )
 87
 88                    error_code = Popen(
 89                        f"flock {WIREGUARD_LOG} --command '{reload_cmd}'", shell=True
 90                    ).returncode
 91                    if not error_code:
 92                        logger.info(f'Peer "{self.public_key[:4]}..." {mode}ed')
 93                    else:
 94                        raise Exception(
 95                            f"execute wireguard reload error. Exit code {error_code}"
 96                        )
 97
 98                except Exception as e:
 99                    logger.error(e.args[0])
100                    if self.raises:
101                        raise e
102                else:
103                    logger.debug(
104                        f"METRIC::{func.__name__}::proc::[  {int(proc)*1000}  ]msec"
105                    )
106                finally:
107                    logger.debug(
108                        f"METRIC::{func.__name__}::end::[  {int((time()-start)*1000)}  ]msec"
109                    )
110
111            return conf_changer
112
113        return decorator
@process('add')
def register(self):
115    @process("add")
116    def register(self):
117        new_user = (
118            "[Peer]",
119            f"PublicKey = {self.public_key}",
120            f"AllowedIPs = {self.allowed_ips}",
121            "PersistentKeepalive = 25",
122        )
123        config = "\n".join(self.config)
124        if self.public_key in config:
125            logger.warning("Peer already added")
126            raise Exception("NEW USER ERROR")
127
128        self.config.extend(new_user)
@process('delet')
def delete(self):
130    @process("delet")
131    def delete(self):
132        lines = self.__find_key()
133        for n in sorted(lines, reverse=True):
134            self.config.pop(n)
@process('bann')
def ban(self):
136    @process("bann")
137    def ban(self):
138        lines = self.__find_key()
139        for n in lines:
140            if self.config[n].startswith("#"):
141                logger.warning("Peer already banned")
142                raise Exception("BAN USER ERROR")
143            else:
144                self.config[n] = f"# {self.config[n]}"
@process('unbann')
def unban(self):
146    @process("unbann")
147    def unban(self):
148        lines = self.__find_key()
149        for n in lines:
150            if not self.config[n].startswith("#"):
151                logger.warning("Peer already unbanned")
152                raise Exception("UNBAN USER ERROR")
153            else:
154                self.config[n] = self.config[n].strip("# ")
def parse():
def parse():
    """Parse the command-line arguments of the tool.

    Returns:
        The ``argparse.Namespace`` with the attributes ``pubkey``,
        ``allowed_ips``, ``mode``, ``wgpath``, ``raises`` and ``list``.
    """
    cli = argparse.ArgumentParser(
        prog="Python Wireguard Utils",
        description="Python tools for the wireguard server",
        epilog="Text at the bottom of help",
    )

    # Optional positional: the peer's public key.
    cli.add_argument(
        "pubkey", nargs="?", type=str, help="Public key of new(old) user"
    )
    cli.add_argument(
        "-ips", "--allowed_ips", type=IPv4Interface, help="AllowedIPs for new peer"
    )
    cli.add_argument(
        "-m",
        "--mode",
        default="new",
        choices=["new", "ban", "unban", "del"],
        help="Tool for change wireguard config (default 'new')",
    )
    cli.add_argument(
        "--wgpath",
        default=WIREGUARD_CONF,
        type=pathlib.Path,
        help=f"Path wireguard config file (default '{WIREGUARD_CONF}')",
    )
    # Boolean switches, both off unless given.
    cli.add_argument(
        "--raises", action="store_true", help="Interruption in case of an error"
    )
    cli.add_argument(
        "-l",
        "--list",
        action="store_true",
        help="Print list of peers (banned too)",
    )

    return cli.parse_args()
def main():
def main():
    """Command-line entry point: list the peers or apply one change to a peer.

    With ``--list`` the ``[Peer]`` sections of ``WIREGUARD_CONF`` are printed
    (commented-out ones are marked as banned) and nothing else happens.
    Otherwise the action selected by ``--mode`` is run for the given public
    key. A missing public key, or missing ``--allowed_ips`` in "new" mode, is
    logged as an error and no action is run.

    NOTE(review): ``--wgpath`` is parsed but not used here.
    """
    args = parse()

    if args.list:
        with open(WIREGUARD_CONF) as file:
            peers = file.read().split("[Peer]")[1:]

        for peer in peers:
            print("\x1b[33m[Peer]\x1b[0m", end=" ")

            # A comment marker left inside the section means the peer is banned.
            if "\n#" in peer.strip("#\n "):
                print("\x1b[31;1m(BAN)\x1b[0m")
            else:
                print("")

            print(peer.strip(" #\n").replace("# ", ""))

        return

    # Every mode needs a key; without this check ban/unban/del would search
    # the config for ``None`` and fail with an unrelated TypeError.
    if not args.pubkey:
        logger.error("Argument PUBKEY: empty value")
        return
    if args.mode == "new" and not args.allowed_ips:
        logger.error("Argument ALLOWED_IPS: empty value")
        return

    cc = ConfigChanger(
        public_key=args.pubkey, allowed_ips=args.allowed_ips, raises=args.raises
    )
    # ``--mode`` is restricted to these keys by the parser's ``choices``.
    actions = {
        "new": cc.register,
        "ban": cc.ban,
        "unban": cc.unban,
        "del": cc.delete,
    }
    actions[args.mode]()