src.db.utils.save
Функционал для работы с БД. Резервирование
1"""Функционал для работы с БД. Резервирование""" 2 3import asyncio 4import logging 5import os 6from datetime import datetime, timedelta 7 8from pandas import DataFrame, ExcelWriter 9from sqlalchemy import select 10 11from core.config import settings 12from core.exceptions import BackupError, DumpError 13from core.path import PATH 14from db.database import execute_query 15from db.models import TABLES_SCHEMA, Base 16 17logger = logging.getLogger() 18 19 20async def async_backup(): 21 """Создает резервную копию базы данных в формате Excel. 22 23 Функция извлекает данные из всех таблиц, определенных в TABLES_SCHEMA, 24 и сохраняет их в файл Excel. Даты в столбцах преобразуются в локальное время. 25 26 Returns: 27 str: Путь к созданному файлу резервной копии. 28 29 Raises: 30 BackupError: Если произошла ошибка при создании резервной копии. 31 """ 32 filename = os.path.join( 33 PATH, 34 "src", 35 "db", 36 "backups", 37 f"backup_{datetime.today().strftime('%d_%m_%Y-%H_%M')}.xlsx", 38 ) 39 with ExcelWriter(filename, engine="openpyxl", mode="w") as writer: 40 try: 41 for model in TABLES_SCHEMA.values(): 42 if model.__bases__[0] is Base: 43 query = select(model).order_by(model.id) 44 res = await execute_query(query) 45 46 df = DataFrame( 47 data=[line.__udict__ for line in res.scalars().all()] 48 ) 49 50 date_columns = df.select_dtypes( 51 include=["datetime64[ns, UTC]"] 52 ).columns 53 for date_column in date_columns: 54 df[date_column] = df[date_column].dt.tz_localize(None) 55 df[date_column] = df[date_column] + timedelta(hours=3) 56 57 df.to_excel(writer, sheet_name=model.__tablename__, index=False) 58 except Exception: 59 logger.exception("Ошибка создания дампа БД") 60 raise BackupError 61 else: 62 return filename 63 64 65async def dump(regular=False): 66 """Создает дамп базы данных в формате SQL. 67 68 Функция выполняет команду pg_dump для создания дампа базы данных. 69 Дамп сохраняется в файл с указанием, является ли он регулярным. 70 (НЕ РАБОТАЕТ НА WINDOWS - я хз как передавать пароль утилите pg_dump) 71 72 Args: 73 regular (bool): Указывает, является ли дамп регулярным. По умолчанию False. 74 75 Returns: 76 None 77 78 Raises: 79 DumpError: Если произошла ошибка при создании дампа. 80 """ 81 filename = os.path.join( 82 PATH, 83 "src", 84 "db", 85 "dumps", 86 f"{'regular_' if regular else ''}dump_{datetime.today().strftime('%d_%m_%Y-%H_%M')}.sql", 87 ) 88 89 cmd = ( 90 f"PGPASSWORD={settings.DB_PASS.get_secret_value()} " 91 f"pg_dump -U {settings.DB_USER} {settings.DB_NAME} " 92 f"-p {settings.DB_PORT} -h {settings.DB_HOST} > {filename}" 93 ) 94 95 logger.info("Создание дампа БД") 96 97 try: 98 proc = await asyncio.create_subprocess_shell( 99 cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE 100 ) 101 stdout, stderr = await proc.communicate() 102 103 logger.info(f"pg_dump exited with {proc.returncode}") 104 105 if proc.returncode != 0: 106 # Попробуйте декодировать с игнорированием ошибок 107 error_message = stderr.decode("utf-8", errors="ignore") 108 logger.error(f"Ошибка создания дампа БД: {error_message}") 109 raise DumpError(error_message) 110 else: 111 logger.info("Дамп БД успешно создан", extra={"dumpname": filename}) 112 113 except Exception as e: 114 logger.exception("Произошла ошибка при создании дампа БД") 115 raise DumpError(str(e))
logger =
<RootLogger root (DEBUG)>
async def
async_backup():
21async def async_backup(): 22 """Создает резервную копию базы данных в формате Excel. 23 24 Функция извлекает данные из всех таблиц, определенных в TABLES_SCHEMA, 25 и сохраняет их в файл Excel. Даты в столбцах преобразуются в локальное время. 26 27 Returns: 28 str: Путь к созданному файлу резервной копии. 29 30 Raises: 31 BackupError: Если произошла ошибка при создании резервной копии. 32 """ 33 filename = os.path.join( 34 PATH, 35 "src", 36 "db", 37 "backups", 38 f"backup_{datetime.today().strftime('%d_%m_%Y-%H_%M')}.xlsx", 39 ) 40 with ExcelWriter(filename, engine="openpyxl", mode="w") as writer: 41 try: 42 for model in TABLES_SCHEMA.values(): 43 if model.__bases__[0] is Base: 44 query = select(model).order_by(model.id) 45 res = await execute_query(query) 46 47 df = DataFrame( 48 data=[line.__udict__ for line in res.scalars().all()] 49 ) 50 51 date_columns = df.select_dtypes( 52 include=["datetime64[ns, UTC]"] 53 ).columns 54 for date_column in date_columns: 55 df[date_column] = df[date_column].dt.tz_localize(None) 56 df[date_column] = df[date_column] + timedelta(hours=3) 57 58 df.to_excel(writer, sheet_name=model.__tablename__, index=False) 59 except Exception: 60 logger.exception("Ошибка создания дампа БД") 61 raise BackupError 62 else: 63 return filename
Создает резервную копию базы данных в формате Excel.
Функция извлекает данные из всех таблиц, определенных в TABLES_SCHEMA, и сохраняет их в файл Excel. Даты в столбцах преобразуются в локальное время.
Returns:
str: Путь к созданному файлу резервной копии.
Raises:
- BackupError: Если произошла ошибка при создании резервной копии.
async def
dump(regular=False):
66async def dump(regular=False): 67 """Создает дамп базы данных в формате SQL. 68 69 Функция выполняет команду pg_dump для создания дампа базы данных. 70 Дамп сохраняется в файл с указанием, является ли он регулярным. 71 (НЕ РАБОТАЕТ НА WINDOWS - я хз как передавать пароль утилите pg_dump) 72 73 Args: 74 regular (bool): Указывает, является ли дамп регулярным. По умолчанию False. 75 76 Returns: 77 None 78 79 Raises: 80 DumpError: Если произошла ошибка при создании дампа. 81 """ 82 filename = os.path.join( 83 PATH, 84 "src", 85 "db", 86 "dumps", 87 f"{'regular_' if regular else ''}dump_{datetime.today().strftime('%d_%m_%Y-%H_%M')}.sql", 88 ) 89 90 cmd = ( 91 f"PGPASSWORD={settings.DB_PASS.get_secret_value()} " 92 f"pg_dump -U {settings.DB_USER} {settings.DB_NAME} " 93 f"-p {settings.DB_PORT} -h {settings.DB_HOST} > {filename}" 94 ) 95 96 logger.info("Создание дампа БД") 97 98 try: 99 proc = await asyncio.create_subprocess_shell( 100 cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE 101 ) 102 stdout, stderr = await proc.communicate() 103 104 logger.info(f"pg_dump exited with {proc.returncode}") 105 106 if proc.returncode != 0: 107 # Попробуйте декодировать с игнорированием ошибок 108 error_message = stderr.decode("utf-8", errors="ignore") 109 logger.error(f"Ошибка создания дампа БД: {error_message}") 110 raise DumpError(error_message) 111 else: 112 logger.info("Дамп БД успешно создан", extra={"dumpname": filename}) 113 114 except Exception as e: 115 logger.exception("Произошла ошибка при создании дампа БД") 116 raise DumpError(str(e))
Создает дамп базы данных в формате SQL.
Функция выполняет команду pg_dump для создания дампа базы данных. Дамп сохраняется в файл с указанием, является ли он регулярным. (НЕ РАБОТАЕТ НА WINDOWS - я хз как передавать пароль утилите pg_dump)
Arguments:
- regular (bool): Указывает, является ли дамп регулярным. По умолчанию False.
Returns:
None
Raises:
- DumpError: Если произошла ошибка при создании дампа.