src.handlers.admin
Функционал администратора
1"""Функционал администратора""" 2 3import logging 4 5from aiogram import Bot, F, Router 6from aiogram.filters.command import Command 7from aiogram.fsm.context import FSMContext 8from aiogram.types import FSInputFile, Message 9from aiogram.utils.formatting import Bold, as_marked_section 10from pytils.numeral import get_plural 11 12import text 13from core import exceptions as exc 14from core.config import settings 15from core.err import bot_except 16from core.metric import async_speed_metric 17from db import utils 18from db.models import UserData 19from states import AdminService 20 21logger = logging.getLogger() 22router = Router() 23router.message.filter(F.chat.type == "private") 24 25 26@router.message(Command("admin")) 27@async_speed_metric 28@bot_except 29async def admin_actions(message: Message): 30 """Обрабатывает команду /admin и предоставляет список команд администратора. 31 32 Args: 33 message (Message): Сообщение от пользователя. 34 """ 35 try: 36 user_data: UserData = await utils.get_user(message.from_user.id) 37 if getattr(user_data, "admin", False): 38 help_t = as_marked_section( 39 Bold("Функционал администратора телеграмм бота"), 40 "/admin - список команд администратора", 41 "/backup - выгрузить бэкап БД в виде excel таблицы", 42 "/send - рассылка сообщения всем зарегистрированным пользователям", 43 "/close - уведомление пользователей о технических работах на сервере", 44 "/open - уведомление пользователей об окончании технических работ на сервере", 45 marker="~ ", 46 ) 47 48 await message.answer(**help_t.as_kwargs()) 49 50 else: 51 await message.answer(text.only_admin) 52 except exc.DatabaseError: 53 await message.answer(text.DB_ERROR) 54 55 56@router.message(F.text == settings.ADMIN_PASS.get_secret_value()) 57@async_speed_metric 58@bot_except 59async def become_an_admin(message: Message, bot: Bot): 60 """Позволяет пользователю стать администратором, если введен правильный пароль. 61 62 Args: 63 message (Message): Сообщение от пользователя. 64 bot (Bot): Экземпляр бота. 65 """ 66 try: 67 user_data: UserData = await utils.set_admin(message.from_user.id) 68 except exc.DatabaseError: 69 await message.answer(text.DB_ERROR) 70 else: 71 if getattr(user_data, "admin", False): 72 await message.answer("Вы успешно зарегистрированы как администратор!") 73 else: 74 await message.answer( 75 "Пароль верный! Однако для начала необходимо зарегистрироваться! Попробуйте ввести команду /reg" 76 ) 77 finally: 78 await bot.delete_message(message.from_user.id, message.message_id) 79 80 81@router.message(Command("backup")) 82@async_speed_metric 83@bot_except 84async def get_backup(message: Message): 85 """Создает резервную копию базы данных и отправляет ее пользователю. 86 87 Args: 88 message (Message): Сообщение от пользователя. 89 """ 90 try: 91 user_data: UserData = await utils.get_user(message.from_user.id) 92 if getattr(user_data, "admin", False): 93 backup = await utils.async_backup() 94 await message.answer_document(FSInputFile(backup)) 95 else: 96 await message.answer(text.only_admin) 97 except exc.BackupError as e: 98 await message.answer(e.args[0]) 99 except exc.DatabaseError: 100 await message.answer(text.DB_ERROR) 101 102 103@router.message(Command("send")) 104@async_speed_metric 105@bot_except 106async def admin_mailing_start(message: Message, state: FSMContext): 107 """Запускает процесс рассылки сообщения всем зарегистрированным пользователям. 108 109 Args: 110 message (Message): Сообщение от пользователя. 111 state (FSMContext): Контекст состояния для управления состоянием бота. 112 """ 113 try: 114 user_data: UserData = await utils.get_user(message.from_user.id) 115 if getattr(user_data, "admin", False): 116 await state.set_state(AdminService.mailing_confirm) 117 await message.answer( 118 "Введите сообщение для рассылки зарегистрированным пользователям" 119 ) 120 else: 121 await message.answer(text.only_admin) 122 except exc.DatabaseError: 123 await message.answer(text.DB_ERROR) 124 125 126@router.message(AdminService.mailing_confirm, F.text) 127@async_speed_metric 128@bot_except 129async def admin_mailing_confirm(message: Message, state: FSMContext): 130 """Подтверждает сообщение для рассылки и запрашивает подтверждение статуса администратора. 131 132 Args: 133 message (Message): Сообщение от пользователя. 134 state (FSMContext): Контекст состояния для управления состоянием бота. 135 """ 136 try: 137 user_data: UserData = await utils.get_user(message.from_user.id) 138 if getattr(user_data, "admin", False): 139 await state.set_state(AdminService.mailing_message) 140 await message.answer(f"Сообщение для рассылки:\n<b>{message.text}</b>") 141 await message.answer( 142 "Вы уверены, что хотите отправить это <b>ВСЕМ</b> пользователям? " 143 "Для отмены нажмите /cancel или введите `нет`" 144 ) 145 await state.update_data({"mailing_message": message.text}) 146 147 else: 148 await message.answer(text.only_admin) 149 except exc.DatabaseError: 150 await message.answer(text.DB_ERROR) 151 152 153@router.message(AdminService.mailing_message, F.text.lower().in_(text.yes)) 154@async_speed_metric 155@bot_except 156async def admin_mailing_finish(message: Message, bot: Bot, state: FSMContext): 157 """Завершает процесс рассылки сообщения всем пользователям. 158 159 Args: 160 message (Message): Сообщение от пользователя. 161 bot (Bot): Экземпляр бота. 162 state (FSMContext): Контекст состояния для управления состоянием бота. 163 """ 164 try: 165 user_data: UserData = await utils.get_user(message.from_user.id) 166 if getattr(user_data, "admin", False): 167 mailing_message = (await state.get_data())["mailing_message"] 168 169 all_users_data: list[UserData] = await utils.get_valid_users( 170 message.from_user.id 171 ) 172 173 for user_data in all_users_data: 174 await bot.send_message(user_data.telegram_id, mailing_message) 175 await message.answer( 176 f"Сообщение отправлено {get_plural(len(all_users_data), 'пользователю, пользователям, пользователям')}" 177 ) 178 179 else: 180 await message.answer(text.only_admin) 181 182 await state.clear() 183 await state.set_state() 184 185 except exc.DatabaseError: 186 await message.answer(text.DB_ERROR) 187 188 189@router.message(AdminService.mailing_message, Command("cancel")) 190@router.message(AdminService.mailing_message, F.text.lower().in_(text.no)) 191@async_speed_metric 192@bot_except 193async def admin_mailing_cancel(message: Message, state: FSMContext): 194 """Отменяет процесс рассылки сообщения. 195 196 Args: 197 message (Message): Сообщение от пользователя. 198 state (FSMContext): Контекст состояния для управления состоянием бота. 199 """ 200 await message.answer("Отменено") 201 202 await state.clear() 203 await state.set_state() 204 205 206@router.message(AdminService.mailing_message, F.text) 207@async_speed_metric 208@bot_except 209async def admin_mailing_repeat(message: Message): 210 """Запрашивает повторное подтверждение от администратора. 211 212 Args: 213 message (Message): Сообщение от пользователя. 214 """ 215 await message.answer("<b>ДА ИЛИ НЕТ?!</b>") 216 217 218@router.message(Command("close")) 219@bot_except 220async def admin_mailing_stop_server(message: Message, bot: Bot): 221 """Уведомляет пользователей о технических работах на сервере. 222 223 Args: 224 message (Message): Сообщение от пользователя. 225 bot (Bot): Экземпляр бота. 226 """ 227 try: 228 user_data: UserData = await utils.get_user(message.from_user.id) 229 if getattr(user_data, "admin", False): 230 all_users_data: list[UserData] = await utils.get_valid_users( 231 message.from_user.id 232 ) 233 234 for user_data in all_users_data: 235 await bot.send_message(user_data.telegram_id, text.SERVER_STOPPED) 236 await message.answer( 237 f"Сообщение отправлено {get_plural(len(all_users_data), 'пользователю, пользователям, пользователям')}" 238 ) 239 240 else: 241 await message.answer(text.only_admin) 242 243 except exc.DatabaseError: 244 await message.answer(text.DB_ERROR) 245 246 247@router.message(Command("open")) 248@bot_except 249async def admin_mailing_start_server(message: Message, bot: Bot): 250 """Уведомляет пользователей об окончании технических работ на сервере. 251 252 Args: 253 message (Message): Сообщение от пользователя. 254 bot (Bot): Экземпляр бота. 255 """ 256 try: 257 user_data: UserData = await utils.get_user(message.from_user.id) 258 if getattr(user_data, "admin", False): 259 all_users_data: list[UserData] = await utils.get_valid_users( 260 message.from_user.id 261 ) 262 263 for user_data in all_users_data: 264 await bot.send_message(user_data.telegram_id, text.SERVER_STARTED) 265 await message.answer( 266 f"Сообщение отправлено {get_plural(len(all_users_data), 'пользователю, пользователям, пользователям')}" 267 ) 268 269 else: 270 await message.answer(text.only_admin) 271 272 except exc.DatabaseError: 273 await message.answer(text.DB_ERROR)
logger =
<RootLogger root (DEBUG)>
router =
<Router '0x7f23dac096a0'>
@async_speed_metric
@bot_except
async def
admin_actions(message: aiogram.types.message.Message):
27@router.message(Command("admin")) 28@async_speed_metric 29@bot_except 30async def admin_actions(message: Message): 31 """Обрабатывает команду /admin и предоставляет список команд администратора. 32 33 Args: 34 message (Message): Сообщение от пользователя. 35 """ 36 try: 37 user_data: UserData = await utils.get_user(message.from_user.id) 38 if getattr(user_data, "admin", False): 39 help_t = as_marked_section( 40 Bold("Функционал администратора телеграмм бота"), 41 "/admin - список команд администратора", 42 "/backup - выгрузить бэкап БД в виде excel таблицы", 43 "/send - рассылка сообщения всем зарегистрированным пользователям", 44 "/close - уведомление пользователей о технических работах на сервере", 45 "/open - уведомление пользователей об окончании технических работ на сервере", 46 marker="~ ", 47 ) 48 49 await message.answer(**help_t.as_kwargs()) 50 51 else: 52 await message.answer(text.only_admin) 53 except exc.DatabaseError: 54 await message.answer(text.DB_ERROR)
Обрабатывает команду /admin и предоставляет список команд администратора.
Arguments:
- message (Message): Сообщение от пользователя.
@async_speed_metric
@bot_except
async def
become_an_admin(message: aiogram.types.message.Message, bot: aiogram.client.bot.Bot):
57@router.message(F.text == settings.ADMIN_PASS.get_secret_value()) 58@async_speed_metric 59@bot_except 60async def become_an_admin(message: Message, bot: Bot): 61 """Позволяет пользователю стать администратором, если введен правильный пароль. 62 63 Args: 64 message (Message): Сообщение от пользователя. 65 bot (Bot): Экземпляр бота. 66 """ 67 try: 68 user_data: UserData = await utils.set_admin(message.from_user.id) 69 except exc.DatabaseError: 70 await message.answer(text.DB_ERROR) 71 else: 72 if getattr(user_data, "admin", False): 73 await message.answer("Вы успешно зарегистрированы как администратор!") 74 else: 75 await message.answer( 76 "Пароль верный! Однако для начала необходимо зарегистрироваться! Попробуйте ввести команду /reg" 77 ) 78 finally: 79 await bot.delete_message(message.from_user.id, message.message_id)
Позволяет пользователю стать администратором, если введен правильный пароль.
Arguments:
- message (Message): Сообщение от пользователя.
- bot (Bot): Экземпляр бота.
@async_speed_metric
@bot_except
async def
get_backup(message: aiogram.types.message.Message):
82@router.message(Command("backup")) 83@async_speed_metric 84@bot_except 85async def get_backup(message: Message): 86 """Создает резервную копию базы данных и отправляет ее пользователю. 87 88 Args: 89 message (Message): Сообщение от пользователя. 90 """ 91 try: 92 user_data: UserData = await utils.get_user(message.from_user.id) 93 if getattr(user_data, "admin", False): 94 backup = await utils.async_backup() 95 await message.answer_document(FSInputFile(backup)) 96 else: 97 await message.answer(text.only_admin) 98 except exc.BackupError as e: 99 await message.answer(e.args[0]) 100 except exc.DatabaseError: 101 await message.answer(text.DB_ERROR)
Создает резервную копию базы данных и отправляет ее пользователю.
Arguments:
- message (Message): Сообщение от пользователя.
@async_speed_metric
@bot_except
async def
admin_mailing_start( message: aiogram.types.message.Message, state: aiogram.fsm.context.FSMContext):
104@router.message(Command("send")) 105@async_speed_metric 106@bot_except 107async def admin_mailing_start(message: Message, state: FSMContext): 108 """Запускает процесс рассылки сообщения всем зарегистрированным пользователям. 109 110 Args: 111 message (Message): Сообщение от пользователя. 112 state (FSMContext): Контекст состояния для управления состоянием бота. 113 """ 114 try: 115 user_data: UserData = await utils.get_user(message.from_user.id) 116 if getattr(user_data, "admin", False): 117 await state.set_state(AdminService.mailing_confirm) 118 await message.answer( 119 "Введите сообщение для рассылки зарегистрированным пользователям" 120 ) 121 else: 122 await message.answer(text.only_admin) 123 except exc.DatabaseError: 124 await message.answer(text.DB_ERROR)
Запускает процесс рассылки сообщения всем зарегистрированным пользователям.
Arguments:
- message (Message): Сообщение от пользователя.
- state (FSMContext): Контекст состояния для управления состоянием бота.
@async_speed_metric
@bot_except
async def
admin_mailing_confirm( message: aiogram.types.message.Message, state: aiogram.fsm.context.FSMContext):
127@router.message(AdminService.mailing_confirm, F.text) 128@async_speed_metric 129@bot_except 130async def admin_mailing_confirm(message: Message, state: FSMContext): 131 """Подтверждает сообщение для рассылки и запрашивает подтверждение статуса администратора. 132 133 Args: 134 message (Message): Сообщение от пользователя. 135 state (FSMContext): Контекст состояния для управления состоянием бота. 136 """ 137 try: 138 user_data: UserData = await utils.get_user(message.from_user.id) 139 if getattr(user_data, "admin", False): 140 await state.set_state(AdminService.mailing_message) 141 await message.answer(f"Сообщение для рассылки:\n<b>{message.text}</b>") 142 await message.answer( 143 "Вы уверены, что хотите отправить это <b>ВСЕМ</b> пользователям? " 144 "Для отмены нажмите /cancel или введите `нет`" 145 ) 146 await state.update_data({"mailing_message": message.text}) 147 148 else: 149 await message.answer(text.only_admin) 150 except exc.DatabaseError: 151 await message.answer(text.DB_ERROR)
Подтверждает сообщение для рассылки и запрашивает подтверждение статуса администратора.
Arguments:
- message (Message): Сообщение от пользователя.
- state (FSMContext): Контекст состояния для управления состоянием бота.
@async_speed_metric
@bot_except
async def
admin_mailing_finish( message: aiogram.types.message.Message, bot: aiogram.client.bot.Bot, state: aiogram.fsm.context.FSMContext):
154@router.message(AdminService.mailing_message, F.text.lower().in_(text.yes)) 155@async_speed_metric 156@bot_except 157async def admin_mailing_finish(message: Message, bot: Bot, state: FSMContext): 158 """Завершает процесс рассылки сообщения всем пользователям. 159 160 Args: 161 message (Message): Сообщение от пользователя. 162 bot (Bot): Экземпляр бота. 163 state (FSMContext): Контекст состояния для управления состоянием бота. 164 """ 165 try: 166 user_data: UserData = await utils.get_user(message.from_user.id) 167 if getattr(user_data, "admin", False): 168 mailing_message = (await state.get_data())["mailing_message"] 169 170 all_users_data: list[UserData] = await utils.get_valid_users( 171 message.from_user.id 172 ) 173 174 for user_data in all_users_data: 175 await bot.send_message(user_data.telegram_id, mailing_message) 176 await message.answer( 177 f"Сообщение отправлено {get_plural(len(all_users_data), 'пользователю, пользователям, пользователям')}" 178 ) 179 180 else: 181 await message.answer(text.only_admin) 182 183 await state.clear() 184 await state.set_state() 185 186 except exc.DatabaseError: 187 await message.answer(text.DB_ERROR)
Завершает процесс рассылки сообщения всем пользователям.
Arguments:
- message (Message): Сообщение от пользователя.
- bot (Bot): Экземпляр бота.
- state (FSMContext): Контекст состояния для управления состоянием бота.
@async_speed_metric
@bot_except
async def
admin_mailing_cancel( message: aiogram.types.message.Message, state: aiogram.fsm.context.FSMContext):
190@router.message(AdminService.mailing_message, Command("cancel")) 191@router.message(AdminService.mailing_message, F.text.lower().in_(text.no)) 192@async_speed_metric 193@bot_except 194async def admin_mailing_cancel(message: Message, state: FSMContext): 195 """Отменяет процесс рассылки сообщения. 196 197 Args: 198 message (Message): Сообщение от пользователя. 199 state (FSMContext): Контекст состояния для управления состоянием бота. 200 """ 201 await message.answer("Отменено") 202 203 await state.clear() 204 await state.set_state()
Отменяет процесс рассылки сообщения.
Arguments:
- message (Message): Сообщение от пользователя.
- state (FSMContext): Контекст состояния для управления состоянием бота.
@async_speed_metric
@bot_except
async def
admin_mailing_repeat(message: aiogram.types.message.Message):
207@router.message(AdminService.mailing_message, F.text) 208@async_speed_metric 209@bot_except 210async def admin_mailing_repeat(message: Message): 211 """Запрашивает повторное подтверждение от администратора. 212 213 Args: 214 message (Message): Сообщение от пользователя. 215 """ 216 await message.answer("<b>ДА ИЛИ НЕТ?!</b>")
Запрашивает повторное подтверждение от администратора.
Arguments:
- message (Message): Сообщение от пользователя.
@bot_except
async def
admin_mailing_stop_server(message: aiogram.types.message.Message, bot: aiogram.client.bot.Bot):
219@router.message(Command("close")) 220@bot_except 221async def admin_mailing_stop_server(message: Message, bot: Bot): 222 """Уведомляет пользователей о технических работах на сервере. 223 224 Args: 225 message (Message): Сообщение от пользователя. 226 bot (Bot): Экземпляр бота. 227 """ 228 try: 229 user_data: UserData = await utils.get_user(message.from_user.id) 230 if getattr(user_data, "admin", False): 231 all_users_data: list[UserData] = await utils.get_valid_users( 232 message.from_user.id 233 ) 234 235 for user_data in all_users_data: 236 await bot.send_message(user_data.telegram_id, text.SERVER_STOPPED) 237 await message.answer( 238 f"Сообщение отправлено {get_plural(len(all_users_data), 'пользователю, пользователям, пользователям')}" 239 ) 240 241 else: 242 await message.answer(text.only_admin) 243 244 except exc.DatabaseError: 245 await message.answer(text.DB_ERROR)
Уведомляет пользователей о технических работах на сервере.
Arguments:
- message (Message): Сообщение от пользователя.
- bot (Bot): Экземпляр бота.
@bot_except
async def
admin_mailing_start_server(message: aiogram.types.message.Message, bot: aiogram.client.bot.Bot):
248@router.message(Command("open")) 249@bot_except 250async def admin_mailing_start_server(message: Message, bot: Bot): 251 """Уведомляет пользователей об окончании технических работ на сервере. 252 253 Args: 254 message (Message): Сообщение от пользователя. 255 bot (Bot): Экземпляр бота. 256 """ 257 try: 258 user_data: UserData = await utils.get_user(message.from_user.id) 259 if getattr(user_data, "admin", False): 260 all_users_data: list[UserData] = await utils.get_valid_users( 261 message.from_user.id 262 ) 263 264 for user_data in all_users_data: 265 await bot.send_message(user_data.telegram_id, text.SERVER_STARTED) 266 await message.answer( 267 f"Сообщение отправлено {get_plural(len(all_users_data), 'пользователю, пользователям, пользователям')}" 268 ) 269 270 else: 271 await message.answer(text.only_admin) 272 273 except exc.DatabaseError: 274 await message.answer(text.DB_ERROR)
Уведомляет пользователей об окончании технических работ на сервере.
Arguments:
- message (Message): Сообщение от пользователя.
- bot (Bot): Экземпляр бота.