src.wg.pywg
1#!/usr/bin/python3 2 3import argparse 4import functools 5import logging 6import pathlib 7import platform 8from ipaddress import IPv4Interface 9from subprocess import Popen 10from time import time 11 12if platform.system() == "Linux": 13 import fcntl 14 15logger = logging.getLogger() 16 17 18WIREGUARD_CONF = "/etc/wireguard/wg1.conf" 19# WIREGUARD_CONF = "C:\\code\\vpn_dan_bot\\src\\wg\\wg1.conf" 20WIREGUARD_LOG = "/home/zadira/Scripts/ErrorsPyWG" 21 22 23class ConfigChanger: 24 def __init__(self, public_key, allowed_ips=None, raises=False) -> None: 25 self.public_key = public_key 26 self.allowed_ips = allowed_ips 27 self.raises = raises 28 29 def __find_key(self): 30 lines = [] 31 for i, line in enumerate(self.config): 32 if self.public_key in line: 33 lines.append(i) 34 35 if not len(lines): 36 raise Exception("PEER NOT FOUND") 37 38 validated_lines = [] 39 for i in lines: 40 assert "peer" in self.config[i - 1].lower() 41 validated_lines.extend((i - 1, i)) 42 43 extra = 1 44 while extra: 45 try: 46 if ( 47 "peer" in self.config[i + extra].lower() 48 or not self.config[i + extra] 49 ): 50 break 51 except IndexError: 52 break 53 else: 54 validated_lines.append(i + extra) 55 extra += 1 56 57 for i in validated_lines: 58 yield i 59 60 def process(mode): 61 def decorator(func): 62 @functools.wraps(func) 63 def conf_changer(self, *args, **kwargs): 64 try: 65 start = time() 66 67 with open(WIREGUARD_CONF, "r+") as file: 68 fcntl.flock(file, fcntl.LOCK_EX) 69 self.config = file.read().split("\n") 70 func(self, *args, **kwargs) 71 72 file.seek(0) 73 file.truncate(0) 74 file.write("\n".join(self.config)) 75 fcntl.flock(file, fcntl.LOCK_UN) 76 77 proc = time() - start 78 79 reload_cmd = " && ".join( 80 ( 81 f'echo -n "$(date) :: {func.__name__} :: {self.public_key[:4]}... ::" >> {WIREGUARD_LOG}', 82 f"sudo systemctl reload wg-quick@wg1.service 2>> {WIREGUARD_LOG}", 83 f'echo " " >> {WIREGUARD_LOG}', 84 ) 85 ) 86 87 error_code = Popen( 88 f"flock {WIREGUARD_LOG} --command '{reload_cmd}'", shell=True 89 ).returncode 90 if not error_code: 91 logger.info(f'Peer "{self.public_key[:4]}..." {mode}ed') 92 else: 93 raise Exception( 94 f"execute wireguard reload error. Exit code {error_code}" 95 ) 96 97 except Exception as e: 98 logger.error(e.args[0]) 99 if self.raises: 100 raise e 101 else: 102 logger.debug( 103 f"METRIC::{func.__name__}::proc::[ {int(proc)*1000} ]msec" 104 ) 105 finally: 106 logger.debug( 107 f"METRIC::{func.__name__}::end::[ {int((time()-start)*1000)} ]msec" 108 ) 109 110 return conf_changer 111 112 return decorator 113 114 @process("add") 115 def register(self): 116 new_user = ( 117 "[Peer]", 118 f"PublicKey = {self.public_key}", 119 f"AllowedIPs = {self.allowed_ips}", 120 "PersistentKeepalive = 25", 121 ) 122 config = "\n".join(self.config) 123 if self.public_key in config: 124 logger.warning("Peer already added") 125 raise Exception("NEW USER ERROR") 126 127 self.config.extend(new_user) 128 129 @process("delet") 130 def delete(self): 131 lines = self.__find_key() 132 for n in sorted(lines, reverse=True): 133 self.config.pop(n) 134 135 @process("bann") 136 def ban(self): 137 lines = self.__find_key() 138 for n in lines: 139 if self.config[n].startswith("#"): 140 logger.warning("Peer already banned") 141 raise Exception("BAN USER ERROR") 142 else: 143 self.config[n] = f"# {self.config[n]}" 144 145 @process("unbann") 146 def unban(self): 147 lines = self.__find_key() 148 for n in lines: 149 if not self.config[n].startswith("#"): 150 logger.warning("Peer already unbanned") 151 raise Exception("UNBAN USER ERROR") 152 else: 153 self.config[n] = self.config[n].strip("# ") 154 155 156def parse(): 157 parser = argparse.ArgumentParser( 158 prog="Python Wireguard Utils", 159 description="Python tools for the wireguard server", 160 epilog="Text at the bottom of help", 161 ) 162 163 parser.add_argument( 164 "pubkey", type=str, help="Public key of new(old) user", nargs="?" 165 ) 166 parser.add_argument( 167 "-ips", "--allowed_ips", type=IPv4Interface, help="AllowedIPs for new peer" 168 ) 169 parser.add_argument( 170 "-m", 171 "--mode", 172 choices=["new", "ban", "unban", "del"], 173 default="new", 174 help="Tool for change wireguard config (default 'new')", 175 ) 176 parser.add_argument( 177 "--wgpath", 178 type=pathlib.Path, 179 default=WIREGUARD_CONF, 180 help=f"Path wireguard config file (default '{WIREGUARD_CONF}')", 181 ) 182 parser.add_argument( 183 "--raises", 184 action="store_true", 185 help="Interruption in case of an error", 186 ) 187 parser.add_argument( 188 "-l", 189 "--list", 190 action="store_true", 191 help="Print list of peers (banned too)", 192 ) 193 args = parser.parse_args() 194 195 return args 196 197 198def main(): 199 args = parse() 200 201 if args.list: 202 with open(WIREGUARD_CONF) as file: 203 peers = file.read().split("[Peer]")[1:] 204 for peer in peers: 205 print("\x1b[33m[Peer]\x1b[0m", end=" ") 206 207 if "\n#" in peer.strip("#\n "): 208 print("\x1b[31;1m(BAN)\x1b[0m") 209 else: 210 print("") 211 212 print(peer.strip(" #\n").replace("# ", "")) 213 214 return 215 216 cc = ConfigChanger( 217 public_key=args.pubkey, allowed_ips=args.allowed_ips, raises=args.raises 218 ) 219 if args.mode == "new": 220 if not args.pubkey: 221 logger.error("Argument PUBKEY: empty value") 222 elif not args.allowed_ips: 223 logger.error("Argument ALLOWED_IPS: empty value") 224 else: 225 cc.register() 226 elif args.mode == "ban": 227 cc.ban() 228 elif args.mode == "unban": 229 cc.unban() 230 elif args.mode == "del": 231 cc.delete() 232 233 234if __name__ == "__main__": 235 logging.basicConfig( 236 level=logging.DEBUG, 237 format="[%(levelname)-8s] %(funcName)s(%(lineno)d): %(message)s", 238 ) 239 logger.info("Start") 240 main() 241 logger.info("End")
logger =
<RootLogger root (DEBUG)>
WIREGUARD_CONF =
'/etc/wireguard/wg1.conf'
WIREGUARD_LOG =
'/home/zadira/Scripts/ErrorsPyWG'
class
ConfigChanger:
24class ConfigChanger: 25 def __init__(self, public_key, allowed_ips=None, raises=False) -> None: 26 self.public_key = public_key 27 self.allowed_ips = allowed_ips 28 self.raises = raises 29 30 def __find_key(self): 31 lines = [] 32 for i, line in enumerate(self.config): 33 if self.public_key in line: 34 lines.append(i) 35 36 if not len(lines): 37 raise Exception("PEER NOT FOUND") 38 39 validated_lines = [] 40 for i in lines: 41 assert "peer" in self.config[i - 1].lower() 42 validated_lines.extend((i - 1, i)) 43 44 extra = 1 45 while extra: 46 try: 47 if ( 48 "peer" in self.config[i + extra].lower() 49 or not self.config[i + extra] 50 ): 51 break 52 except IndexError: 53 break 54 else: 55 validated_lines.append(i + extra) 56 extra += 1 57 58 for i in validated_lines: 59 yield i 60 61 def process(mode): 62 def decorator(func): 63 @functools.wraps(func) 64 def conf_changer(self, *args, **kwargs): 65 try: 66 start = time() 67 68 with open(WIREGUARD_CONF, "r+") as file: 69 fcntl.flock(file, fcntl.LOCK_EX) 70 self.config = file.read().split("\n") 71 func(self, *args, **kwargs) 72 73 file.seek(0) 74 file.truncate(0) 75 file.write("\n".join(self.config)) 76 fcntl.flock(file, fcntl.LOCK_UN) 77 78 proc = time() - start 79 80 reload_cmd = " && ".join( 81 ( 82 f'echo -n "$(date) :: {func.__name__} :: {self.public_key[:4]}... ::" >> {WIREGUARD_LOG}', 83 f"sudo systemctl reload wg-quick@wg1.service 2>> {WIREGUARD_LOG}", 84 f'echo " " >> {WIREGUARD_LOG}', 85 ) 86 ) 87 88 error_code = Popen( 89 f"flock {WIREGUARD_LOG} --command '{reload_cmd}'", shell=True 90 ).returncode 91 if not error_code: 92 logger.info(f'Peer "{self.public_key[:4]}..." {mode}ed') 93 else: 94 raise Exception( 95 f"execute wireguard reload error. Exit code {error_code}" 96 ) 97 98 except Exception as e: 99 logger.error(e.args[0]) 100 if self.raises: 101 raise e 102 else: 103 logger.debug( 104 f"METRIC::{func.__name__}::proc::[ {int(proc)*1000} ]msec" 105 ) 106 finally: 107 logger.debug( 108 f"METRIC::{func.__name__}::end::[ {int((time()-start)*1000)} ]msec" 109 ) 110 111 return conf_changer 112 113 return decorator 114 115 @process("add") 116 def register(self): 117 new_user = ( 118 "[Peer]", 119 f"PublicKey = {self.public_key}", 120 f"AllowedIPs = {self.allowed_ips}", 121 "PersistentKeepalive = 25", 122 ) 123 config = "\n".join(self.config) 124 if self.public_key in config: 125 logger.warning("Peer already added") 126 raise Exception("NEW USER ERROR") 127 128 self.config.extend(new_user) 129 130 @process("delet") 131 def delete(self): 132 lines = self.__find_key() 133 for n in sorted(lines, reverse=True): 134 self.config.pop(n) 135 136 @process("bann") 137 def ban(self): 138 lines = self.__find_key() 139 for n in lines: 140 if self.config[n].startswith("#"): 141 logger.warning("Peer already banned") 142 raise Exception("BAN USER ERROR") 143 else: 144 self.config[n] = f"# {self.config[n]}" 145 146 @process("unbann") 147 def unban(self): 148 lines = self.__find_key() 149 for n in lines: 150 if not self.config[n].startswith("#"): 151 logger.warning("Peer already unbanned") 152 raise Exception("UNBAN USER ERROR") 153 else: 154 self.config[n] = self.config[n].strip("# ")
def
process(mode):
61 def process(mode): 62 def decorator(func): 63 @functools.wraps(func) 64 def conf_changer(self, *args, **kwargs): 65 try: 66 start = time() 67 68 with open(WIREGUARD_CONF, "r+") as file: 69 fcntl.flock(file, fcntl.LOCK_EX) 70 self.config = file.read().split("\n") 71 func(self, *args, **kwargs) 72 73 file.seek(0) 74 file.truncate(0) 75 file.write("\n".join(self.config)) 76 fcntl.flock(file, fcntl.LOCK_UN) 77 78 proc = time() - start 79 80 reload_cmd = " && ".join( 81 ( 82 f'echo -n "$(date) :: {func.__name__} :: {self.public_key[:4]}... ::" >> {WIREGUARD_LOG}', 83 f"sudo systemctl reload wg-quick@wg1.service 2>> {WIREGUARD_LOG}", 84 f'echo " " >> {WIREGUARD_LOG}', 85 ) 86 ) 87 88 error_code = Popen( 89 f"flock {WIREGUARD_LOG} --command '{reload_cmd}'", shell=True 90 ).returncode 91 if not error_code: 92 logger.info(f'Peer "{self.public_key[:4]}..." {mode}ed') 93 else: 94 raise Exception( 95 f"execute wireguard reload error. Exit code {error_code}" 96 ) 97 98 except Exception as e: 99 logger.error(e.args[0]) 100 if self.raises: 101 raise e 102 else: 103 logger.debug( 104 f"METRIC::{func.__name__}::proc::[ {int(proc)*1000} ]msec" 105 ) 106 finally: 107 logger.debug( 108 f"METRIC::{func.__name__}::end::[ {int((time()-start)*1000)} ]msec" 109 ) 110 111 return conf_changer 112 113 return decorator
@process('add')
def
register(self):
115 @process("add") 116 def register(self): 117 new_user = ( 118 "[Peer]", 119 f"PublicKey = {self.public_key}", 120 f"AllowedIPs = {self.allowed_ips}", 121 "PersistentKeepalive = 25", 122 ) 123 config = "\n".join(self.config) 124 if self.public_key in config: 125 logger.warning("Peer already added") 126 raise Exception("NEW USER ERROR") 127 128 self.config.extend(new_user)
def
parse():
157def parse(): 158 parser = argparse.ArgumentParser( 159 prog="Python Wireguard Utils", 160 description="Python tools for the wireguard server", 161 epilog="Text at the bottom of help", 162 ) 163 164 parser.add_argument( 165 "pubkey", type=str, help="Public key of new(old) user", nargs="?" 166 ) 167 parser.add_argument( 168 "-ips", "--allowed_ips", type=IPv4Interface, help="AllowedIPs for new peer" 169 ) 170 parser.add_argument( 171 "-m", 172 "--mode", 173 choices=["new", "ban", "unban", "del"], 174 default="new", 175 help="Tool for change wireguard config (default 'new')", 176 ) 177 parser.add_argument( 178 "--wgpath", 179 type=pathlib.Path, 180 default=WIREGUARD_CONF, 181 help=f"Path wireguard config file (default '{WIREGUARD_CONF}')", 182 ) 183 parser.add_argument( 184 "--raises", 185 action="store_true", 186 help="Interruption in case of an error", 187 ) 188 parser.add_argument( 189 "-l", 190 "--list", 191 action="store_true", 192 help="Print list of peers (banned too)", 193 ) 194 args = parser.parse_args() 195 196 return args
def
main():
199def main(): 200 args = parse() 201 202 if args.list: 203 with open(WIREGUARD_CONF) as file: 204 peers = file.read().split("[Peer]")[1:] 205 for peer in peers: 206 print("\x1b[33m[Peer]\x1b[0m", end=" ") 207 208 if "\n#" in peer.strip("#\n "): 209 print("\x1b[31;1m(BAN)\x1b[0m") 210 else: 211 print("") 212 213 print(peer.strip(" #\n").replace("# ", "")) 214 215 return 216 217 cc = ConfigChanger( 218 public_key=args.pubkey, allowed_ips=args.allowed_ips, raises=args.raises 219 ) 220 if args.mode == "new": 221 if not args.pubkey: 222 logger.error("Argument PUBKEY: empty value") 223 elif not args.allowed_ips: 224 logger.error("Argument ALLOWED_IPS: empty value") 225 else: 226 cc.register() 227 elif args.mode == "ban": 228 cc.ban() 229 elif args.mode == "unban": 230 cc.unban() 231 elif args.mode == "del": 232 cc.delete()