В многозадачных Python приложениях, где управление сессиями необходимо для каждого потока или задачи, декораторы могут быть использованы для автоматизации процесса создания, инициализации и завершения сессий. Вот как это можно сделать:
Основная идея: Декоратор обертывает функцию, представляющую собой задачу или обработчик запроса, и обеспечивает создание сессии перед выполнением функции и закрытие (или коммит) сессии после ее выполнения.
Пример реализации:
import threading
from contextlib import contextmanager
# Гипотетическая SessionFactory (например, SQLAlchemy Session)
class SessionFactory:
def __init__(self):
self.local = threading.local()
@contextmanager
def session(self):
# Создаем сессию для текущего потока
if not hasattr(self.local, 'session'):
self.local.session = self.create_session() # Замените на вашу логику создания сессии
try:
yield self.local.session
self.local.session.commit() # Коммитим изменения
except Exception as e:
self.local.session.rollback() # Откатываем изменения в случае ошибки
raise e
finally:
self.local.session.close() # Закрываем сессию
del self.local.session # Удаляем сессию из threading.local
def create_session(self):
# Здесь должна быть логика создания вашей сессии (например, SQLAlchemy)
print(f"Создаю сессию для {threading.current_thread().name}")
return MockSession() # Заменяем на настоящую Session
class MockSession:
def commit(self):
print(f"Коммитим сессию для {threading.current_thread().name}")
def rollback(self):
print(f"Откатываем сессию для {threading.current_thread().name}")
def close(self):
print(f"Закрываем сессию для {threading.current_thread().name}")
session_factory = SessionFactory()
def with_session(func):
def wrapper(*args, **kwargs):
with session_factory.session() as session:
return func(session, *args, **kwargs) # Передаем сессию в функцию
return wrapper
# Пример использования
@with_session
def my_task(session, data):
print(f"Выполняю задачу с данными: {data} в {threading.current_thread().name} и сессией: {session}")
# Здесь можно взаимодействовать с базой данных через session
import threading
def run_task(data):
my_task(data)
threads = []
for i in range(3):
thread = threading.Thread(target=run_task, args=(f"data_{i}",), name=f"Thread-{i}")
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print("Все задачи завершены")
Пояснения:
SessionFactory
: Класс, отвечающий за создание сессий. Используется threading.local()
, чтобы гарантировать, что каждый поток получит свою собственную сессию. Контекстный менеджер session
обеспечивает автоматическое создание, коммит/откат и закрытие сессии.with_session
: Декоратор, который принимает функцию (func
) и возвращает новую функцию (wrapper
). Внутри wrapper
используется SessionFactory.session()
как контекстный менеджер. Это гарантирует, что сессия будет создана перед вызовом func
, и закрыта (или закоммичена/откачена) после его завершения. Сессия передается как первый аргумент в обернутую функцию.threading.local()
: Ключевой элемент для многозадачности. Он предоставляет хранилище данных, локальное для каждого потока. Каждый поток имеет свой собственный экземпляр данных, хранящихся в threading.local()
, что предотвращает конфликты между потоками.Преимущества:
Важно: Приведенный код - это пример. Реализация SessionFactory
и create_session()
должна соответствовать используемой вами библиотеке для работы с базами данных (например, SQLAlchemy, Django ORM). Особое внимание следует уделить обработке исключений и правильному закрытию сессий в любых ситуациях.