Edit on GitHub

src.db.utils.redis

Функционал для работы с Redis.

  1"""Функционал для работы с Redis."""
  2
  3import logging
  4from collections.abc import Iterable, Mapping
  5from datetime import timedelta
  6from types import NoneType
  7from typing import Union
  8
  9from core.config import settings
 10from core.exceptions import RedisTypeError
 11from db.database import execute_redis_query, iter_redis_keys, redis_engine
 12from db.models import Transactions, UserData, WgConfig
 13
 14logger = logging.getLogger("redis")
 15
 16
 17class CashManager:
 18    """Менеджер для работы с кэшем в Redis."""
 19
 20    class user_id:
 21        """Класс для аннотации типов."""
 22
 23    redis_types = (bytes, str, int, float, NoneType)
 24    """Допустимые типы данных для хранения в Redis."""
 25
 26    def __init__(
 27        self, validation_model: Union[UserData, WgConfig, Transactions]
 28    ) -> None:
 29        """Инициализирует CashManager.
 30
 31        Args:
 32            validation_model (Union[UserData, WgConfig, Transactions]): Модель для валидации данных.
 33        """
 34        self.pipe = redis_engine.pipeline()
 35        self.model = validation_model
 36
 37    @property
 38    def cmd(self):
 39        """Доступ к redis pipeline для выполнения команд.
 40
 41        Returns:
 42            Pipeline: Текущий Redis pipeline.
 43        """
 44        return self.pipe
 45
 46    async def __call__(self):
 47        """Выполняет команды Redis и возвращает результаты.
 48
 49        Returns:
 50            list: Список объектов модели, представляющих результаты выполнения команд.
 51        """
 52        results = await execute_redis_query(self.pipe)
 53
 54        validated_results = []
 55        for result in results:
 56            if result and isinstance(result, (dict, str, tuple, list)):
 57                match result:
 58                    case Mapping():
 59                        validated_results.append(self.model(**result))
 60                    case Iterable():
 61                        validated_results.append(self.model(*result))
 62                    case _:
 63                        validated_results.append(self.model(result))
 64
 65        if len(validated_results):
 66            return validated_results
 67
 68    def converter(self, data):
 69        """Преобразует данные в допустимые типы для Redis.
 70
 71        Args:
 72            data (Union[dict, list, tuple, set]): Данные для преобразования.
 73
 74        Returns:
 75            Union[dict, list, set]: Преобразованные данные.
 76        """
 77        match data:
 78            case dict():
 79                return {
 80                    key: (value if type(value) in self.redis_types else str(value))
 81                    for key, value in data.items()
 82                }
 83            case list() | tuple():
 84                return [
 85                    value if type(value) in self.redis_types else str(value)
 86                    for value in data
 87                ]
 88            case set():
 89                return {
 90                    value if type(value) in self.redis_types else str(value)
 91                    for value in data
 92                }
 93            case _:
 94                return data
 95
 96    async def add(
 97        self,
 98        key_map: dict[user_id, dict | list | tuple | set | str | int | float] = None,
 99        **mapping: dict[user_id, dict | list | tuple | set | str | int | float],
100    ):
101        """Добавляет данные в Redis.
102
103        Args:
104            key_map (dict, optional): Словарь ключей и значений для добавления.
105            **mapping (dict): Дополнительные ключи и значения для добавления.
106
107        Examples:
108            add({user_id: {data}})
109        """
110        for key, item in key_map.items() if key_map else mapping.items():
111            if not item:
112                return
113            item = self.converter(item)
114            key = f"data:{self.model.__tablename__}:{key}"
115
116            match item:
117                case dict():
118                    self.pipe.hset(name=key, mapping=item)
119                case list() | tuple():
120                    self.pipe.rpush(key, *item)
121                case set():
122                    self.pipe.sadd(key, *item)
123                case str() | int() | float():
124                    self.pipe.set(name=key, value=item)
125
126                case _:
127                    raise RedisTypeError
128
129            self.pipe.expire(key, timedelta(hours=settings.cash_ttl))
130        await execute_redis_query(self.pipe)
131
132    async def get(self, *id_obj: dict | list | tuple | set | str | int | float):
133        """Получает данные из Redis по заданным ключам.
134
135        Args:
136            *id_obj (Union[dict, list, tuple, set, str, int, float]): Ключи для получения данных.
137
138        Returns:
139            list: Список объектов модели, представляющих полученные данные.
140        """
141        for key in id_obj:
142            match key:
143                case dict():
144                    self.pipe.hgetall(f"data:{self.model.__tablename__}:{list(key)[0]}")
145                case list() | tuple():
146                    self.pipe.lrange(f"data:{self.model.__tablename__}:{key[0]}", 0, -1)
147                case set():
148                    self.pipe.smembers(name=f"data:{self.model.__tablename__}:{key[0]}")
149                case str() | int() | float():
150                    self.pipe.get(name=f"data:{self.model.__tablename__}:{key}")
151
152                case _:
153                    raise RedisTypeError
154
155        return await self.__call__()
156
157    async def delete(self, *keys, fullkey=False):
158        """Удаляет данные из Redis по заданным ключам.
159
160        Args:
161            *keys (str): Ключи для удаления данных.
162            fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
163
164        Returns:
165            list: Список объектов модели, представляющих удаленные данные.
166        """
167        if not fullkey:
168            keys = [f"data:{self.model.__tablename__}:{key}" for key in keys]
169        if keys:
170            self.pipe.delete(*keys)
171            return await self.__call__()
172
173    async def clear(self, user_id, fullkey=False):
174        """Очищает данные из Redis для указанного пользователя.
175
176        Args:
177            user_id (str): Идентификатор пользователя.
178            fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
179
180        Returns:
181            list: Список объектов модели, представляющих очищенные данные.
182        """
183        if not fullkey:
184            rkeys = []
185            pattern = ":*"
186            for n in range(1, 4):
187                async for key in await iter_redis_keys(f"data{pattern * n}:{user_id}"):
188                    rkeys.append(key)
189        if rkeys:
190            self.pipe.delete(*rkeys)
191            return await self.__call__()
logger = <Logger redis (INFO)>
class CashManager:
 18class CashManager:
 19    """Менеджер для работы с кэшем в Redis."""
 20
 21    class user_id:
 22        """Класс для аннотации типов."""
 23
 24    redis_types = (bytes, str, int, float, NoneType)
 25    """Допустимые типы данных для хранения в Redis."""
 26
 27    def __init__(
 28        self, validation_model: Union[UserData, WgConfig, Transactions]
 29    ) -> None:
 30        """Инициализирует CashManager.
 31
 32        Args:
 33            validation_model (Union[UserData, WgConfig, Transactions]): Модель для валидации данных.
 34        """
 35        self.pipe = redis_engine.pipeline()
 36        self.model = validation_model
 37
 38    @property
 39    def cmd(self):
 40        """Доступ к redis pipeline для выполнения команд.
 41
 42        Returns:
 43            Pipeline: Текущий Redis pipeline.
 44        """
 45        return self.pipe
 46
 47    async def __call__(self):
 48        """Выполняет команды Redis и возвращает результаты.
 49
 50        Returns:
 51            list: Список объектов модели, представляющих результаты выполнения команд.
 52        """
 53        results = await execute_redis_query(self.pipe)
 54
 55        validated_results = []
 56        for result in results:
 57            if result and isinstance(result, (dict, str, tuple, list)):
 58                match result:
 59                    case Mapping():
 60                        validated_results.append(self.model(**result))
 61                    case Iterable():
 62                        validated_results.append(self.model(*result))
 63                    case _:
 64                        validated_results.append(self.model(result))
 65
 66        if len(validated_results):
 67            return validated_results
 68
 69    def converter(self, data):
 70        """Преобразует данные в допустимые типы для Redis.
 71
 72        Args:
 73            data (Union[dict, list, tuple, set]): Данные для преобразования.
 74
 75        Returns:
 76            Union[dict, list, set]: Преобразованные данные.
 77        """
 78        match data:
 79            case dict():
 80                return {
 81                    key: (value if type(value) in self.redis_types else str(value))
 82                    for key, value in data.items()
 83                }
 84            case list() | tuple():
 85                return [
 86                    value if type(value) in self.redis_types else str(value)
 87                    for value in data
 88                ]
 89            case set():
 90                return {
 91                    value if type(value) in self.redis_types else str(value)
 92                    for value in data
 93                }
 94            case _:
 95                return data
 96
 97    async def add(
 98        self,
 99        key_map: dict[user_id, dict | list | tuple | set | str | int | float] = None,
100        **mapping: dict[user_id, dict | list | tuple | set | str | int | float],
101    ):
102        """Добавляет данные в Redis.
103
104        Args:
105            key_map (dict, optional): Словарь ключей и значений для добавления.
106            **mapping (dict): Дополнительные ключи и значения для добавления.
107
108        Examples:
109            add({user_id: {data}})
110        """
111        for key, item in key_map.items() if key_map else mapping.items():
112            if not item:
113                return
114            item = self.converter(item)
115            key = f"data:{self.model.__tablename__}:{key}"
116
117            match item:
118                case dict():
119                    self.pipe.hset(name=key, mapping=item)
120                case list() | tuple():
121                    self.pipe.rpush(key, *item)
122                case set():
123                    self.pipe.sadd(key, *item)
124                case str() | int() | float():
125                    self.pipe.set(name=key, value=item)
126
127                case _:
128                    raise RedisTypeError
129
130            self.pipe.expire(key, timedelta(hours=settings.cash_ttl))
131        await execute_redis_query(self.pipe)
132
133    async def get(self, *id_obj: dict | list | tuple | set | str | int | float):
134        """Получает данные из Redis по заданным ключам.
135
136        Args:
137            *id_obj (Union[dict, list, tuple, set, str, int, float]): Ключи для получения данных.
138
139        Returns:
140            list: Список объектов модели, представляющих полученные данные.
141        """
142        for key in id_obj:
143            match key:
144                case dict():
145                    self.pipe.hgetall(f"data:{self.model.__tablename__}:{list(key)[0]}")
146                case list() | tuple():
147                    self.pipe.lrange(f"data:{self.model.__tablename__}:{key[0]}", 0, -1)
148                case set():
149                    self.pipe.smembers(name=f"data:{self.model.__tablename__}:{key[0]}")
150                case str() | int() | float():
151                    self.pipe.get(name=f"data:{self.model.__tablename__}:{key}")
152
153                case _:
154                    raise RedisTypeError
155
156        return await self.__call__()
157
158    async def delete(self, *keys, fullkey=False):
159        """Удаляет данные из Redis по заданным ключам.
160
161        Args:
162            *keys (str): Ключи для удаления данных.
163            fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
164
165        Returns:
166            list: Список объектов модели, представляющих удаленные данные.
167        """
168        if not fullkey:
169            keys = [f"data:{self.model.__tablename__}:{key}" for key in keys]
170        if keys:
171            self.pipe.delete(*keys)
172            return await self.__call__()
173
174    async def clear(self, user_id, fullkey=False):
175        """Очищает данные из Redis для указанного пользователя.
176
177        Args:
178            user_id (str): Идентификатор пользователя.
179            fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
180
181        Returns:
182            list: Список объектов модели, представляющих очищенные данные.
183        """
184        if not fullkey:
185            rkeys = []
186            pattern = ":*"
187            for n in range(1, 4):
188                async for key in await iter_redis_keys(f"data{pattern * n}:{user_id}"):
189                    rkeys.append(key)
190        if rkeys:
191            self.pipe.delete(*rkeys)
192            return await self.__call__()

Менеджер для работы с кэшем в Redis.

CashManager( validation_model: Union[db.models.userdata.UserData, db.models.wg_config.WgConfig, db.models.transactions.Transactions])
27    def __init__(
28        self, validation_model: Union[UserData, WgConfig, Transactions]
29    ) -> None:
30        """Инициализирует CashManager.
31
32        Args:
33            validation_model (Union[UserData, WgConfig, Transactions]): Модель для валидации данных.
34        """
35        self.pipe = redis_engine.pipeline()
36        self.model = validation_model

Инициализирует CashManager.

Arguments:
  • validation_model (Union[UserData, WgConfig, Transactions]): Модель для валидации данных.
redis_types = (<class 'bytes'>, <class 'str'>, <class 'int'>, <class 'float'>, <class 'NoneType'>)

Допустимые типы данных для хранения в Redis.

pipe
model
cmd
38    @property
39    def cmd(self):
40        """Доступ к redis pipeline для выполнения команд.
41
42        Returns:
43            Pipeline: Текущий Redis pipeline.
44        """
45        return self.pipe

Доступ к redis pipeline для выполнения команд.

Returns:

Pipeline: Текущий Redis pipeline.

def converter(self, data):
69    def converter(self, data):
70        """Преобразует данные в допустимые типы для Redis.
71
72        Args:
73            data (Union[dict, list, tuple, set]): Данные для преобразования.
74
75        Returns:
76            Union[dict, list, set]: Преобразованные данные.
77        """
78        match data:
79            case dict():
80                return {
81                    key: (value if type(value) in self.redis_types else str(value))
82                    for key, value in data.items()
83                }
84            case list() | tuple():
85                return [
86                    value if type(value) in self.redis_types else str(value)
87                    for value in data
88                ]
89            case set():
90                return {
91                    value if type(value) in self.redis_types else str(value)
92                    for value in data
93                }
94            case _:
95                return data

Преобразует данные в допустимые типы для Redis.

Arguments:
  • data (Union[dict, list, tuple, set]): Данные для преобразования.
Returns:

Union[dict, list, set]: Преобразованные данные.

async def add( self, key_map: dict[CashManager.user_id, dict | list | tuple | set | str | int | float] = None, **mapping: dict[CashManager.user_id, dict | list | tuple | set | str | int | float]):
 97    async def add(
 98        self,
 99        key_map: dict[user_id, dict | list | tuple | set | str | int | float] = None,
100        **mapping: dict[user_id, dict | list | tuple | set | str | int | float],
101    ):
102        """Добавляет данные в Redis.
103
104        Args:
105            key_map (dict, optional): Словарь ключей и значений для добавления.
106            **mapping (dict): Дополнительные ключи и значения для добавления.
107
108        Examples:
109            add({user_id: {data}})
110        """
111        for key, item in key_map.items() if key_map else mapping.items():
112            if not item:
113                return
114            item = self.converter(item)
115            key = f"data:{self.model.__tablename__}:{key}"
116
117            match item:
118                case dict():
119                    self.pipe.hset(name=key, mapping=item)
120                case list() | tuple():
121                    self.pipe.rpush(key, *item)
122                case set():
123                    self.pipe.sadd(key, *item)
124                case str() | int() | float():
125                    self.pipe.set(name=key, value=item)
126
127                case _:
128                    raise RedisTypeError
129
130            self.pipe.expire(key, timedelta(hours=settings.cash_ttl))
131        await execute_redis_query(self.pipe)

Добавляет данные в Redis.

Arguments:
  • key_map (dict, optional): Словарь ключей и значений для добавления.
  • **mapping (dict): Дополнительные ключи и значения для добавления.
Examples:

add({user_id: {data}})

async def get(self, *id_obj: dict | list | tuple | set | str | int | float):
133    async def get(self, *id_obj: dict | list | tuple | set | str | int | float):
134        """Получает данные из Redis по заданным ключам.
135
136        Args:
137            *id_obj (Union[dict, list, tuple, set, str, int, float]): Ключи для получения данных.
138
139        Returns:
140            list: Список объектов модели, представляющих полученные данные.
141        """
142        for key in id_obj:
143            match key:
144                case dict():
145                    self.pipe.hgetall(f"data:{self.model.__tablename__}:{list(key)[0]}")
146                case list() | tuple():
147                    self.pipe.lrange(f"data:{self.model.__tablename__}:{key[0]}", 0, -1)
148                case set():
149                    self.pipe.smembers(name=f"data:{self.model.__tablename__}:{key[0]}")
150                case str() | int() | float():
151                    self.pipe.get(name=f"data:{self.model.__tablename__}:{key}")
152
153                case _:
154                    raise RedisTypeError
155
156        return await self.__call__()

Получает данные из Redis по заданным ключам.

Arguments:
  • *id_obj (Union[dict, list, tuple, set, str, int, float]): Ключи для получения данных.
Returns:

list: Список объектов модели, представляющих полученные данные.

async def delete(self, *keys, fullkey=False):
158    async def delete(self, *keys, fullkey=False):
159        """Удаляет данные из Redis по заданным ключам.
160
161        Args:
162            *keys (str): Ключи для удаления данных.
163            fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
164
165        Returns:
166            list: Список объектов модели, представляющих удаленные данные.
167        """
168        if not fullkey:
169            keys = [f"data:{self.model.__tablename__}:{key}" for key in keys]
170        if keys:
171            self.pipe.delete(*keys)
172            return await self.__call__()

Удаляет данные из Redis по заданным ключам.

Arguments:
  • *keys (str): Ключи для удаления данных.
  • fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
Returns:

list: Список объектов модели, представляющих удаленные данные.

async def clear(self, user_id, fullkey=False):
174    async def clear(self, user_id, fullkey=False):
175        """Очищает данные из Redis для указанного пользователя.
176
177        Args:
178            user_id (str): Идентификатор пользователя.
179            fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
180
181        Returns:
182            list: Список объектов модели, представляющих очищенные данные.
183        """
184        if not fullkey:
185            rkeys = []
186            pattern = ":*"
187            for n in range(1, 4):
188                async for key in await iter_redis_keys(f"data{pattern * n}:{user_id}"):
189                    rkeys.append(key)
190        if rkeys:
191            self.pipe.delete(*rkeys)
192            return await self.__call__()

Очищает данные из Redis для указанного пользователя.

Arguments:
  • user_id (str): Идентификатор пользователя.
  • fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
Returns:

list: Список объектов модели, представляющих очищенные данные.

class CashManager.user_id:
21    class user_id:
22        """Класс для аннотации типов."""

Класс для аннотации типов.