src.db.utils.redis
Функционал для работы с Redis.
1"""Функционал для работы с Redis.""" 2 3import logging 4from collections.abc import Iterable, Mapping 5from datetime import timedelta 6from types import NoneType 7from typing import Union 8 9from core.config import settings 10from core.exceptions import RedisTypeError 11from db.database import execute_redis_query, iter_redis_keys, redis_engine 12from db.models import Transactions, UserData, WgConfig 13 14logger = logging.getLogger("redis") 15 16 17class CashManager: 18 """Менеджер для работы с кэшем в Redis.""" 19 20 class user_id: 21 """Класс для аннотации типов.""" 22 23 redis_types = (bytes, str, int, float, NoneType) 24 """Допустимые типы данных для хранения в Redis.""" 25 26 def __init__( 27 self, validation_model: Union[UserData, WgConfig, Transactions] 28 ) -> None: 29 """Инициализирует CashManager. 30 31 Args: 32 validation_model (Union[UserData, WgConfig, Transactions]): Модель для валидации данных. 33 """ 34 self.pipe = redis_engine.pipeline() 35 self.model = validation_model 36 37 @property 38 def cmd(self): 39 """Доступ к redis pipeline для выполнения команд. 40 41 Returns: 42 Pipeline: Текущий Redis pipeline. 43 """ 44 return self.pipe 45 46 async def __call__(self): 47 """Выполняет команды Redis и возвращает результаты. 48 49 Returns: 50 list: Список объектов модели, представляющих результаты выполнения команд. 51 """ 52 results = await execute_redis_query(self.pipe) 53 54 validated_results = [] 55 for result in results: 56 if result and isinstance(result, (dict, str, tuple, list)): 57 match result: 58 case Mapping(): 59 validated_results.append(self.model(**result)) 60 case Iterable(): 61 validated_results.append(self.model(*result)) 62 case _: 63 validated_results.append(self.model(result)) 64 65 if len(validated_results): 66 return validated_results 67 68 def converter(self, data): 69 """Преобразует данные в допустимые типы для Redis. 70 71 Args: 72 data (Union[dict, list, tuple, set]): Данные для преобразования. 73 74 Returns: 75 Union[dict, list, set]: Преобразованные данные. 76 """ 77 match data: 78 case dict(): 79 return { 80 key: (value if type(value) in self.redis_types else str(value)) 81 for key, value in data.items() 82 } 83 case list() | tuple(): 84 return [ 85 value if type(value) in self.redis_types else str(value) 86 for value in data 87 ] 88 case set(): 89 return { 90 value if type(value) in self.redis_types else str(value) 91 for value in data 92 } 93 case _: 94 return data 95 96 async def add( 97 self, 98 key_map: dict[user_id, dict | list | tuple | set | str | int | float] = None, 99 **mapping: dict[user_id, dict | list | tuple | set | str | int | float], 100 ): 101 """Добавляет данные в Redis. 102 103 Args: 104 key_map (dict, optional): Словарь ключей и значений для добавления. 105 **mapping (dict): Дополнительные ключи и значения для добавления. 106 107 Examples: 108 add({user_id: {data}}) 109 """ 110 for key, item in key_map.items() if key_map else mapping.items(): 111 if not item: 112 return 113 item = self.converter(item) 114 key = f"data:{self.model.__tablename__}:{key}" 115 116 match item: 117 case dict(): 118 self.pipe.hset(name=key, mapping=item) 119 case list() | tuple(): 120 self.pipe.rpush(key, *item) 121 case set(): 122 self.pipe.sadd(key, *item) 123 case str() | int() | float(): 124 self.pipe.set(name=key, value=item) 125 126 case _: 127 raise RedisTypeError 128 129 self.pipe.expire(key, timedelta(hours=settings.cash_ttl)) 130 await execute_redis_query(self.pipe) 131 132 async def get(self, *id_obj: dict | list | tuple | set | str | int | float): 133 """Получает данные из Redis по заданным ключам. 134 135 Args: 136 *id_obj (Union[dict, list, tuple, set, str, int, float]): Ключи для получения данных. 137 138 Returns: 139 list: Список объектов модели, представляющих полученные данные. 140 """ 141 for key in id_obj: 142 match key: 143 case dict(): 144 self.pipe.hgetall(f"data:{self.model.__tablename__}:{list(key)[0]}") 145 case list() | tuple(): 146 self.pipe.lrange(f"data:{self.model.__tablename__}:{key[0]}", 0, -1) 147 case set(): 148 self.pipe.smembers(name=f"data:{self.model.__tablename__}:{key[0]}") 149 case str() | int() | float(): 150 self.pipe.get(name=f"data:{self.model.__tablename__}:{key}") 151 152 case _: 153 raise RedisTypeError 154 155 return await self.__call__() 156 157 async def delete(self, *keys, fullkey=False): 158 """Удаляет данные из Redis по заданным ключам. 159 160 Args: 161 *keys (str): Ключи для удаления данных. 162 fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False. 163 164 Returns: 165 list: Список объектов модели, представляющих удаленные данные. 166 """ 167 if not fullkey: 168 keys = [f"data:{self.model.__tablename__}:{key}" for key in keys] 169 if keys: 170 self.pipe.delete(*keys) 171 return await self.__call__() 172 173 async def clear(self, user_id, fullkey=False): 174 """Очищает данные из Redis для указанного пользователя. 175 176 Args: 177 user_id (str): Идентификатор пользователя. 178 fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False. 179 180 Returns: 181 list: Список объектов модели, представляющих очищенные данные. 182 """ 183 if not fullkey: 184 rkeys = [] 185 pattern = ":*" 186 for n in range(1, 4): 187 async for key in await iter_redis_keys(f"data{pattern * n}:{user_id}"): 188 rkeys.append(key) 189 if rkeys: 190 self.pipe.delete(*rkeys) 191 return await self.__call__()
logger =
<Logger redis (INFO)>
class
CashManager:
18class CashManager: 19 """Менеджер для работы с кэшем в Redis.""" 20 21 class user_id: 22 """Класс для аннотации типов.""" 23 24 redis_types = (bytes, str, int, float, NoneType) 25 """Допустимые типы данных для хранения в Redis.""" 26 27 def __init__( 28 self, validation_model: Union[UserData, WgConfig, Transactions] 29 ) -> None: 30 """Инициализирует CashManager. 31 32 Args: 33 validation_model (Union[UserData, WgConfig, Transactions]): Модель для валидации данных. 34 """ 35 self.pipe = redis_engine.pipeline() 36 self.model = validation_model 37 38 @property 39 def cmd(self): 40 """Доступ к redis pipeline для выполнения команд. 41 42 Returns: 43 Pipeline: Текущий Redis pipeline. 44 """ 45 return self.pipe 46 47 async def __call__(self): 48 """Выполняет команды Redis и возвращает результаты. 49 50 Returns: 51 list: Список объектов модели, представляющих результаты выполнения команд. 52 """ 53 results = await execute_redis_query(self.pipe) 54 55 validated_results = [] 56 for result in results: 57 if result and isinstance(result, (dict, str, tuple, list)): 58 match result: 59 case Mapping(): 60 validated_results.append(self.model(**result)) 61 case Iterable(): 62 validated_results.append(self.model(*result)) 63 case _: 64 validated_results.append(self.model(result)) 65 66 if len(validated_results): 67 return validated_results 68 69 def converter(self, data): 70 """Преобразует данные в допустимые типы для Redis. 71 72 Args: 73 data (Union[dict, list, tuple, set]): Данные для преобразования. 74 75 Returns: 76 Union[dict, list, set]: Преобразованные данные. 77 """ 78 match data: 79 case dict(): 80 return { 81 key: (value if type(value) in self.redis_types else str(value)) 82 for key, value in data.items() 83 } 84 case list() | tuple(): 85 return [ 86 value if type(value) in self.redis_types else str(value) 87 for value in data 88 ] 89 case set(): 90 return { 91 value if type(value) in self.redis_types else str(value) 92 for value in data 93 } 94 case _: 95 return data 96 97 async def add( 98 self, 99 key_map: dict[user_id, dict | list | tuple | set | str | int | float] = None, 100 **mapping: dict[user_id, dict | list | tuple | set | str | int | float], 101 ): 102 """Добавляет данные в Redis. 103 104 Args: 105 key_map (dict, optional): Словарь ключей и значений для добавления. 106 **mapping (dict): Дополнительные ключи и значения для добавления. 107 108 Examples: 109 add({user_id: {data}}) 110 """ 111 for key, item in key_map.items() if key_map else mapping.items(): 112 if not item: 113 return 114 item = self.converter(item) 115 key = f"data:{self.model.__tablename__}:{key}" 116 117 match item: 118 case dict(): 119 self.pipe.hset(name=key, mapping=item) 120 case list() | tuple(): 121 self.pipe.rpush(key, *item) 122 case set(): 123 self.pipe.sadd(key, *item) 124 case str() | int() | float(): 125 self.pipe.set(name=key, value=item) 126 127 case _: 128 raise RedisTypeError 129 130 self.pipe.expire(key, timedelta(hours=settings.cash_ttl)) 131 await execute_redis_query(self.pipe) 132 133 async def get(self, *id_obj: dict | list | tuple | set | str | int | float): 134 """Получает данные из Redis по заданным ключам. 135 136 Args: 137 *id_obj (Union[dict, list, tuple, set, str, int, float]): Ключи для получения данных. 138 139 Returns: 140 list: Список объектов модели, представляющих полученные данные. 141 """ 142 for key in id_obj: 143 match key: 144 case dict(): 145 self.pipe.hgetall(f"data:{self.model.__tablename__}:{list(key)[0]}") 146 case list() | tuple(): 147 self.pipe.lrange(f"data:{self.model.__tablename__}:{key[0]}", 0, -1) 148 case set(): 149 self.pipe.smembers(name=f"data:{self.model.__tablename__}:{key[0]}") 150 case str() | int() | float(): 151 self.pipe.get(name=f"data:{self.model.__tablename__}:{key}") 152 153 case _: 154 raise RedisTypeError 155 156 return await self.__call__() 157 158 async def delete(self, *keys, fullkey=False): 159 """Удаляет данные из Redis по заданным ключам. 160 161 Args: 162 *keys (str): Ключи для удаления данных. 163 fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False. 164 165 Returns: 166 list: Список объектов модели, представляющих удаленные данные. 167 """ 168 if not fullkey: 169 keys = [f"data:{self.model.__tablename__}:{key}" for key in keys] 170 if keys: 171 self.pipe.delete(*keys) 172 return await self.__call__() 173 174 async def clear(self, user_id, fullkey=False): 175 """Очищает данные из Redis для указанного пользователя. 176 177 Args: 178 user_id (str): Идентификатор пользователя. 179 fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False. 180 181 Returns: 182 list: Список объектов модели, представляющих очищенные данные. 183 """ 184 if not fullkey: 185 rkeys = [] 186 pattern = ":*" 187 for n in range(1, 4): 188 async for key in await iter_redis_keys(f"data{pattern * n}:{user_id}"): 189 rkeys.append(key) 190 if rkeys: 191 self.pipe.delete(*rkeys) 192 return await self.__call__()
Менеджер для работы с кэшем в Redis.
CashManager( validation_model: Union[db.models.userdata.UserData, db.models.wg_config.WgConfig, db.models.transactions.Transactions])
27 def __init__( 28 self, validation_model: Union[UserData, WgConfig, Transactions] 29 ) -> None: 30 """Инициализирует CashManager. 31 32 Args: 33 validation_model (Union[UserData, WgConfig, Transactions]): Модель для валидации данных. 34 """ 35 self.pipe = redis_engine.pipeline() 36 self.model = validation_model
Инициализирует CashManager.
Arguments:
- validation_model (Union[UserData, WgConfig, Transactions]): Модель для валидации данных.
redis_types =
(<class 'bytes'>, <class 'str'>, <class 'int'>, <class 'float'>, <class 'NoneType'>)
Допустимые типы данных для хранения в Redis.
cmd
38 @property 39 def cmd(self): 40 """Доступ к redis pipeline для выполнения команд. 41 42 Returns: 43 Pipeline: Текущий Redis pipeline. 44 """ 45 return self.pipe
Доступ к redis pipeline для выполнения команд.
Returns:
Pipeline: Текущий Redis pipeline.
def
converter(self, data):
69 def converter(self, data): 70 """Преобразует данные в допустимые типы для Redis. 71 72 Args: 73 data (Union[dict, list, tuple, set]): Данные для преобразования. 74 75 Returns: 76 Union[dict, list, set]: Преобразованные данные. 77 """ 78 match data: 79 case dict(): 80 return { 81 key: (value if type(value) in self.redis_types else str(value)) 82 for key, value in data.items() 83 } 84 case list() | tuple(): 85 return [ 86 value if type(value) in self.redis_types else str(value) 87 for value in data 88 ] 89 case set(): 90 return { 91 value if type(value) in self.redis_types else str(value) 92 for value in data 93 } 94 case _: 95 return data
Преобразует данные в допустимые типы для Redis.
Arguments:
- data (Union[dict, list, tuple, set]): Данные для преобразования.
Returns:
Union[dict, list, set]: Преобразованные данные.
async def
add( self, key_map: dict[CashManager.user_id, dict | list | tuple | set | str | int | float] = None, **mapping: dict[CashManager.user_id, dict | list | tuple | set | str | int | float]):
97 async def add( 98 self, 99 key_map: dict[user_id, dict | list | tuple | set | str | int | float] = None, 100 **mapping: dict[user_id, dict | list | tuple | set | str | int | float], 101 ): 102 """Добавляет данные в Redis. 103 104 Args: 105 key_map (dict, optional): Словарь ключей и значений для добавления. 106 **mapping (dict): Дополнительные ключи и значения для добавления. 107 108 Examples: 109 add({user_id: {data}}) 110 """ 111 for key, item in key_map.items() if key_map else mapping.items(): 112 if not item: 113 return 114 item = self.converter(item) 115 key = f"data:{self.model.__tablename__}:{key}" 116 117 match item: 118 case dict(): 119 self.pipe.hset(name=key, mapping=item) 120 case list() | tuple(): 121 self.pipe.rpush(key, *item) 122 case set(): 123 self.pipe.sadd(key, *item) 124 case str() | int() | float(): 125 self.pipe.set(name=key, value=item) 126 127 case _: 128 raise RedisTypeError 129 130 self.pipe.expire(key, timedelta(hours=settings.cash_ttl)) 131 await execute_redis_query(self.pipe)
Добавляет данные в Redis.
Arguments:
- key_map (dict, optional): Словарь ключей и значений для добавления.
- **mapping (dict): Дополнительные ключи и значения для добавления.
Examples:
add({user_id: {data}})
async def
get(self, *id_obj: dict | list | tuple | set | str | int | float):
133 async def get(self, *id_obj: dict | list | tuple | set | str | int | float): 134 """Получает данные из Redis по заданным ключам. 135 136 Args: 137 *id_obj (Union[dict, list, tuple, set, str, int, float]): Ключи для получения данных. 138 139 Returns: 140 list: Список объектов модели, представляющих полученные данные. 141 """ 142 for key in id_obj: 143 match key: 144 case dict(): 145 self.pipe.hgetall(f"data:{self.model.__tablename__}:{list(key)[0]}") 146 case list() | tuple(): 147 self.pipe.lrange(f"data:{self.model.__tablename__}:{key[0]}", 0, -1) 148 case set(): 149 self.pipe.smembers(name=f"data:{self.model.__tablename__}:{key[0]}") 150 case str() | int() | float(): 151 self.pipe.get(name=f"data:{self.model.__tablename__}:{key}") 152 153 case _: 154 raise RedisTypeError 155 156 return await self.__call__()
Получает данные из Redis по заданным ключам.
Arguments:
- *id_obj (Union[dict, list, tuple, set, str, int, float]): Ключи для получения данных.
Returns:
list: Список объектов модели, представляющих полученные данные.
async def
delete(self, *keys, fullkey=False):
158 async def delete(self, *keys, fullkey=False): 159 """Удаляет данные из Redis по заданным ключам. 160 161 Args: 162 *keys (str): Ключи для удаления данных. 163 fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False. 164 165 Returns: 166 list: Список объектов модели, представляющих удаленные данные. 167 """ 168 if not fullkey: 169 keys = [f"data:{self.model.__tablename__}:{key}" for key in keys] 170 if keys: 171 self.pipe.delete(*keys) 172 return await self.__call__()
Удаляет данные из Redis по заданным ключам.
Arguments:
- *keys (str): Ключи для удаления данных.
- fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
Returns:
list: Список объектов модели, представляющих удаленные данные.
async def
clear(self, user_id, fullkey=False):
174 async def clear(self, user_id, fullkey=False): 175 """Очищает данные из Redis для указанного пользователя. 176 177 Args: 178 user_id (str): Идентификатор пользователя. 179 fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False. 180 181 Returns: 182 list: Список объектов модели, представляющих очищенные данные. 183 """ 184 if not fullkey: 185 rkeys = [] 186 pattern = ":*" 187 for n in range(1, 4): 188 async for key in await iter_redis_keys(f"data{pattern * n}:{user_id}"): 189 rkeys.append(key) 190 if rkeys: 191 self.pipe.delete(*rkeys) 192 return await self.__call__()
Очищает данные из Redis для указанного пользователя.
Arguments:
- user_id (str): Идентификатор пользователя.
- fullkey (bool, optional): Указывает, нужно ли использовать полные ключи. По умолчанию False.
Returns:
list: Список объектов модели, представляющих очищенные данные.
class
CashManager.user_id:
Класс для аннотации типов.