Edit on GitHub

src.app

  1import asyncio
  2import logging
  3from datetime import datetime, timedelta
  4
  5from aiogram import Bot, Dispatcher
  6from aiogram.client.default import DefaultBotProperties
  7from aiogram.enums import ParseMode
  8from aiogram.fsm.storage.redis import RedisStorage
  9from aiogram.utils.chat_action import ChatActionMiddleware
 10from aiohttp import ClientSession
 11from apscheduler.schedulers.asyncio import AsyncIOScheduler
 12
 13import handlers as hand
 14from core.config import settings
 15from core.err import exception_logging
 16from db import models, utils  # NOTE for subserver
 17from db.utils.tests import test_base, test_redis_base
 18from scheduler.balance import balance_decrement, users_notice
 19from scheduler.config_freezer import check_freeze_configs, validate_configs
 20from scheduler.dump import regular_dump
 21from scheduler.notices import send_notice
 22from wg.utils import SSH
 23
 24logger = logging.getLogger()
 25
 26
 27@exception_logging()
 28def create_bot():
 29    """Создает экземпляр бота и диспетчера.
 30
 31    Эта функция инициализирует бота с заданным токеном и
 32    создает диспетчер с использованием RedisStorage для хранения
 33    состояний.
 34
 35    Returns:
 36        tuple: Кортеж из экземпляра бота и диспетчера.
 37    """
 38    bot = Bot(
 39        token=settings.BOT_TOKEN.get_secret_value(),
 40        default=DefaultBotProperties(parse_mode=ParseMode.HTML),
 41    )
 42
 43    storage = RedisStorage.from_url(
 44        settings.CASHBASE_URL,
 45        state_ttl=timedelta(hours=settings.cash_ttl),
 46        data_ttl=timedelta(hours=settings.cash_ttl),
 47    )
 48
 49    # dp = Dispatcher(storage=MemoryStorage())  # Данные бота стираются при перезапуске
 50    dp = Dispatcher(storage=storage)
 51    dp.message.middleware(ChatActionMiddleware())
 52    dp.include_routers(
 53        hand.admin.router,
 54        hand.ban.router,
 55        hand.group.router,
 56        hand.account.router,
 57        hand.user_service.router,
 58        hand.info.router,
 59        hand.wg_service.router,
 60        hand.payment.router,
 61        hand.bug.router,
 62    )
 63
 64    return bot, dp
 65
 66
 67def create_scheduler(bot):
 68    """Создает планировщик для выполнения задач.
 69
 70    Эта функция инициализирует планировщик для выполнения
 71    различных периодических задач, таких как отправка уведомлений,
 72    уменьшение баланса и проверка конфигураций.
 73
 74    Args:
 75        bot (Bot): Экземпляр бота, используемый для отправки уведомлений.
 76
 77    Returns:
 78        AsyncIOScheduler: Экземпляр планировщика.
 79    """
 80    scheduler = AsyncIOScheduler()
 81    scheduler.add_job(
 82        send_notice,
 83        # trigger="cron",
 84        trigger="interval",
 85        seconds=3,
 86        # hour=00,
 87        # minute=00,
 88        # second=2,
 89        start_date=datetime.now() + timedelta(seconds=10),
 90        kwargs={"bot": bot},
 91    )
 92    scheduler.add_job(
 93        balance_decrement,
 94        trigger="interval",
 95        seconds=3600,
 96        start_date=datetime.now() + timedelta(seconds=20),
 97    )
 98    scheduler.add_job(
 99        users_notice,
100        trigger="interval",
101        seconds=3600,
102        start_date=datetime.now() + timedelta(seconds=30),
103        kwargs={"bot": bot},
104    )
105    scheduler.add_job(
106        check_freeze_configs,
107        trigger="interval",
108        seconds=60,
109        start_date=datetime.now() + timedelta(seconds=15),
110    )
111    scheduler.add_job(
112        validate_configs,
113        trigger="interval",
114        seconds=3600,
115        start_date=datetime.now() + timedelta(seconds=15),
116    )
117    scheduler.add_job(
118        regular_dump,
119        trigger="interval",
120        seconds=3600,
121        start_date=datetime.now() + timedelta(seconds=60),
122    )
123
124    return scheduler
125
126
127async def start_services(
128    bot: Bot, dp: Dispatcher, scheduler: AsyncIOScheduler, timeout: float = None
129):
130    """Запускает службы бота и планировщика.
131
132    Эта функция инициализирует и запускает подсистемы бота, а также
133    запускает планировщик для выполнения задач. Она управляет
134    асинхронным опросом обновлений от Telegram.
135
136    Args:
137        bot (Bot): Экземпляр бота, который будет использоваться для
138            взаимодействия с Telegram API.
139        dp (Dispatcher): Диспетчер, который обрабатывает обновления
140            и маршрутизирует их к соответствующим обработчикам.
141        scheduler (AsyncIOScheduler): Планировщик для управления
142            асинхронными задачами.
143        timeout (float, optional): Максимальное время ожидания в
144            секундах для выполнения задач. Если None, будет ожидать
145            бесконечно.
146
147    Returns:
148        set: Множество задач, которые были созданы для выполнения.
149
150    Raises:
151        RuntimeError: Если происходит ошибка при остановке опроса.
152    """
153    await test_subsystem()
154    scheduler.start()
155
156    tasks = await asyncio.wait(
157        [
158            asyncio.create_task(job)
159            for job in [
160                # bot.delete_webhook(drop_pending_updates=True),
161                dp.start_polling(
162                    bot,
163                    allowed_updates=dp.resolve_used_update_types(),
164                    started_at=datetime.now().strftime("%Y-%m-%d %H:%M"),
165                ),
166            ]
167        ],
168        timeout=timeout,
169    )
170    try:
171        await dp.stop_polling()
172    except RuntimeError:
173        pass
174    return tasks
175
176
177async def test_subsystem():
178    """Тестирует подключение к подсистемам.
179
180    Эта функция проверяет доступность баз данных и других
181    подсистем, необходимых для работы бота.
182
183    Raises:
184        Exception: Если возникает ошибка при подключении к БД.
185    """
186    bases = await test_base()
187    bases.extend(await test_redis_base())
188    bases.append(await test_subserver())
189    for base in bases:
190        try:
191            if isinstance(base, Exception):
192                raise base
193        except Exception:
194            logger.exception("Возникло исключение при подключении к БД")
195            raise
196
197
198async def test_subserver():
199    """Тестирует подключение к подсерверу.
200
201    Эта функция отправляет запрос к подсерверу для проверки его
202    доступности и корректности ответа.
203
204    Raises:
205        AssertionError: Если ответ от подсервера не соответствует ожиданиям.
206    """
207    async with ClientSession() as session:
208        url = f"{settings.subserver_url}test"
209        # url = "http://127.0.0.1:5000/test"
210        params = {"name": "test"}
211
212        async with session.get(url=url, params=params) as response:
213            result: dict = await response.json()
214            assert result.get("message") == "Hello test"
215
216
217async def start_bot():
218    """Запускает бота и его службы.
219
220    Эта функция инициализирует подключение по SSH к wireguard серверу,
221    создает бота, диспетчер и планировщик, а затем запускает все службы.
222
223    Returns:
224        set: Множество задач, которые были созданы для выполнения.
225    """
226    await SSH.connect()
227    bot, dp = create_bot()
228    scheduler = create_scheduler(bot)
229    return await start_services(bot, dp, scheduler)
logger = <RootLogger root (DEBUG)>
@exception_logging()
def create_bot():
28@exception_logging()
29def create_bot():
30    """Создает экземпляр бота и диспетчера.
31
32    Эта функция инициализирует бота с заданным токеном и
33    создает диспетчер с использованием RedisStorage для хранения
34    состояний.
35
36    Returns:
37        tuple: Кортеж из экземпляра бота и диспетчера.
38    """
39    bot = Bot(
40        token=settings.BOT_TOKEN.get_secret_value(),
41        default=DefaultBotProperties(parse_mode=ParseMode.HTML),
42    )
43
44    storage = RedisStorage.from_url(
45        settings.CASHBASE_URL,
46        state_ttl=timedelta(hours=settings.cash_ttl),
47        data_ttl=timedelta(hours=settings.cash_ttl),
48    )
49
50    # dp = Dispatcher(storage=MemoryStorage())  # Данные бота стираются при перезапуске
51    dp = Dispatcher(storage=storage)
52    dp.message.middleware(ChatActionMiddleware())
53    dp.include_routers(
54        hand.admin.router,
55        hand.ban.router,
56        hand.group.router,
57        hand.account.router,
58        hand.user_service.router,
59        hand.info.router,
60        hand.wg_service.router,
61        hand.payment.router,
62        hand.bug.router,
63    )
64
65    return bot, dp

Создает экземпляр бота и диспетчера.

Эта функция инициализирует бота с заданным токеном и создает диспетчер с использованием RedisStorage для хранения состояний.

Returns:

tuple: Кортеж из экземпляра бота и диспетчера.

def create_scheduler(bot):
 68def create_scheduler(bot):
 69    """Создает планировщик для выполнения задач.
 70
 71    Эта функция инициализирует планировщик для выполнения
 72    различных периодических задач, таких как отправка уведомлений,
 73    уменьшение баланса и проверка конфигураций.
 74
 75    Args:
 76        bot (Bot): Экземпляр бота, используемый для отправки уведомлений.
 77
 78    Returns:
 79        AsyncIOScheduler: Экземпляр планировщика.
 80    """
 81    scheduler = AsyncIOScheduler()
 82    scheduler.add_job(
 83        send_notice,
 84        # trigger="cron",
 85        trigger="interval",
 86        seconds=3,
 87        # hour=00,
 88        # minute=00,
 89        # second=2,
 90        start_date=datetime.now() + timedelta(seconds=10),
 91        kwargs={"bot": bot},
 92    )
 93    scheduler.add_job(
 94        balance_decrement,
 95        trigger="interval",
 96        seconds=3600,
 97        start_date=datetime.now() + timedelta(seconds=20),
 98    )
 99    scheduler.add_job(
100        users_notice,
101        trigger="interval",
102        seconds=3600,
103        start_date=datetime.now() + timedelta(seconds=30),
104        kwargs={"bot": bot},
105    )
106    scheduler.add_job(
107        check_freeze_configs,
108        trigger="interval",
109        seconds=60,
110        start_date=datetime.now() + timedelta(seconds=15),
111    )
112    scheduler.add_job(
113        validate_configs,
114        trigger="interval",
115        seconds=3600,
116        start_date=datetime.now() + timedelta(seconds=15),
117    )
118    scheduler.add_job(
119        regular_dump,
120        trigger="interval",
121        seconds=3600,
122        start_date=datetime.now() + timedelta(seconds=60),
123    )
124
125    return scheduler

Создает планировщик для выполнения задач.

Эта функция инициализирует планировщик для выполнения различных периодических задач, таких как отправка уведомлений, уменьшение баланса и проверка конфигураций.

Arguments:
  • bot (Bot): Экземпляр бота, используемый для отправки уведомлений.
Returns:

AsyncIOScheduler: Экземпляр планировщика.

async def start_services( bot: aiogram.client.bot.Bot, dp: aiogram.dispatcher.dispatcher.Dispatcher, scheduler: apscheduler.schedulers.asyncio.AsyncIOScheduler, timeout: float = None):
128async def start_services(
129    bot: Bot, dp: Dispatcher, scheduler: AsyncIOScheduler, timeout: float = None
130):
131    """Запускает службы бота и планировщика.
132
133    Эта функция инициализирует и запускает подсистемы бота, а также
134    запускает планировщик для выполнения задач. Она управляет
135    асинхронным опросом обновлений от Telegram.
136
137    Args:
138        bot (Bot): Экземпляр бота, который будет использоваться для
139            взаимодействия с Telegram API.
140        dp (Dispatcher): Диспетчер, который обрабатывает обновления
141            и маршрутизирует их к соответствующим обработчикам.
142        scheduler (AsyncIOScheduler): Планировщик для управления
143            асинхронными задачами.
144        timeout (float, optional): Максимальное время ожидания в
145            секундах для выполнения задач. Если None, будет ожидать
146            бесконечно.
147
148    Returns:
149        set: Множество задач, которые были созданы для выполнения.
150
151    Raises:
152        RuntimeError: Если происходит ошибка при остановке опроса.
153    """
154    await test_subsystem()
155    scheduler.start()
156
157    tasks = await asyncio.wait(
158        [
159            asyncio.create_task(job)
160            for job in [
161                # bot.delete_webhook(drop_pending_updates=True),
162                dp.start_polling(
163                    bot,
164                    allowed_updates=dp.resolve_used_update_types(),
165                    started_at=datetime.now().strftime("%Y-%m-%d %H:%M"),
166                ),
167            ]
168        ],
169        timeout=timeout,
170    )
171    try:
172        await dp.stop_polling()
173    except RuntimeError:
174        pass
175    return tasks

Запускает службы бота и планировщика.

Эта функция инициализирует и запускает подсистемы бота, а также запускает планировщик для выполнения задач. Она управляет асинхронным опросом обновлений от Telegram.

Arguments:
  • bot (Bot): Экземпляр бота, который будет использоваться для взаимодействия с Telegram API.
  • dp (Dispatcher): Диспетчер, который обрабатывает обновления и маршрутизирует их к соответствующим обработчикам.
  • scheduler (AsyncIOScheduler): Планировщик для управления асинхронными задачами.
  • timeout (float, optional): Максимальное время ожидания в секундах для выполнения задач. Если None, будет ожидать бесконечно.
Returns:

set: Множество задач, которые были созданы для выполнения.

Raises:
  • RuntimeError: Если происходит ошибка при остановке опроса.
async def test_subsystem():
178async def test_subsystem():
179    """Тестирует подключение к подсистемам.
180
181    Эта функция проверяет доступность баз данных и других
182    подсистем, необходимых для работы бота.
183
184    Raises:
185        Exception: Если возникает ошибка при подключении к БД.
186    """
187    bases = await test_base()
188    bases.extend(await test_redis_base())
189    bases.append(await test_subserver())
190    for base in bases:
191        try:
192            if isinstance(base, Exception):
193                raise base
194        except Exception:
195            logger.exception("Возникло исключение при подключении к БД")
196            raise

Тестирует подключение к подсистемам.

Эта функция проверяет доступность баз данных и других подсистем, необходимых для работы бота.

Raises:
  • Exception: Если возникает ошибка при подключении к БД.
async def test_subserver():
199async def test_subserver():
200    """Тестирует подключение к подсерверу.
201
202    Эта функция отправляет запрос к подсерверу для проверки его
203    доступности и корректности ответа.
204
205    Raises:
206        AssertionError: Если ответ от подсервера не соответствует ожиданиям.
207    """
208    async with ClientSession() as session:
209        url = f"{settings.subserver_url}test"
210        # url = "http://127.0.0.1:5000/test"
211        params = {"name": "test"}
212
213        async with session.get(url=url, params=params) as response:
214            result: dict = await response.json()
215            assert result.get("message") == "Hello test"

Тестирует подключение к подсерверу.

Эта функция отправляет запрос к подсерверу для проверки его доступности и корректности ответа.

Raises:
  • AssertionError: Если ответ от подсервера не соответствует ожиданиям.
async def start_bot():
218async def start_bot():
219    """Запускает бота и его службы.
220
221    Эта функция инициализирует подключение по SSH к wireguard серверу,
222    создает бота, диспетчер и планировщик, а затем запускает все службы.
223
224    Returns:
225        set: Множество задач, которые были созданы для выполнения.
226    """
227    await SSH.connect()
228    bot, dp = create_bot()
229    scheduler = create_scheduler(bot)
230    return await start_services(bot, dp, scheduler)

Запускает бота и его службы.

Эта функция инициализирует подключение по SSH к wireguard серверу, создает бота, диспетчер и планировщик, а затем запускает все службы.

Returns:

set: Множество задач, которые были созданы для выполнения.