import asyncio
class AsyncTaskManager:
def __init__(self, limit):
self.semaphore = asyncio.Semaphore(limit)
async def __aenter__(self):
await self.semaphore.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.semaphore.release()
Создание эффективного пользовательского контекстного менеджера для работы с многозадачностью в Python требует понимания нескольких ключевых аспектов. Основная цель - обеспечить безопасное и предсказуемое управление ресурсами (например, потоками, блокировками, соединениями) в многопоточной или асинхронной среде.
Основные принципы и подходы:
threading.Lock
, threading.RLock
или других примитивов синхронизации: Контекстный менеджер должен гарантировать, что общий ресурс (например, файл, база данных, сетевое соединение) защищен от одновременного доступа из разных потоков или корутин. Классический пример - использование threading.Lock
:
import threading
class ThreadSafeCounter:
def __init__(self):
self._value = 0
self._lock = threading.Lock() # Создаем блокировку
def increment(self):
with self._lock: # Блокировка при входе в контекст
self._value += 1
def get_value(self):
with self._lock:
return self._value
В этом примере self._lock
используется в качестве контекстного менеджера с помощью with self._lock:
. Это гарантирует, что только один поток одновременно может выполнять операции инкремента или чтения значения. При выходе из блока with
блокировка автоматически освобождается.async with
): Для асинхронного программирования (asyncio
) используйте async with
и асинхронные версии примитивов синхронизации (например, asyncio.Lock
).
import asyncio
class AsyncResource:
async def acquire(self):
print("Acquiring resource...")
await asyncio.sleep(0.1) # Имитация длительной операции
print("Resource acquired.")
async def release(self):
print("Releasing resource...")
await asyncio.sleep(0.1) # Имитация длительной операции
print("Resource released.")
class AsyncContextManager:
def __init__(self, resource):
self.resource = resource
async def __aenter__(self):
await self.resource.acquire()
return self.resource
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.resource.release()
async def main():
resource = AsyncResource()
async with AsyncContextManager(resource) as r:
print("Using resource inside context.")
# await asyncio.sleep(0.5)
print("Done using resource.")
if __name__ == "__main__":
asyncio.run(main())
В этом примере AsyncContextManager
является асинхронным контекстным менеджером, который использует методы __aenter__
и __aexit__
для асинхронного захвата и освобождения ресурса.
contextlib
: Модуль contextlib
предоставляет полезные инструменты, такие как contextmanager
и asynccontextmanager
(Python 3.7+), для упрощения создания контекстных менеджеров. Они основаны на генераторах.
import contextlib
import threading
@contextlib.contextmanager
def thread_safe_print(lock):
try:
lock.acquire()
yield
finally:
lock.release()
lock = threading.Lock()
with thread_safe_print(lock):
print("This is a thread-safe print statement.")
import contextlib
import asyncio
@contextlib.asynccontextmanager
async def async_resource_manager():
# Acquire the resource
print("Acquiring resource...")
await asyncio.sleep(0.1)
resource = "some resource"
try:
yield resource
finally:
# Release the resource
print("Releasing resource...")
await asyncio.sleep(0.1)
async def main():
async with async_resource_manager() as res:
print(f"Using {res} inside the context.")
if __name__ == "__main__":
asyncio.run(main())
Функция, помеченная декоратором @contextlib.contextmanager
, становится контекстным менеджером. Код до yield
выполняется при входе в контекст (__enter__
), код после yield
выполняется при выходе (__exit__
). Аналогично для асинхронного контекстного менеджера используется @contextlib.asynccontextmanager
.__exit__
(или __aexit__
). Если исключение произошло внутри блока with
, __exit__
получит информацию об исключении (exc_type
, exc_val
, exc_tb
). Если __exit__
вернет True
, исключение будет подавлено; в противном случае оно будет переброшено.with
произойдет исключение. Именно для этого и используется блок finally
.Пример сложного сценария: пул соединений к базе данных
Представьте себе пул соединений к базе данных, который нужно использовать в многопоточной среде. Контекстный менеджер может обеспечить получение соединения из пула при входе в контекст и возвращение соединения в пул при выходе, обеспечивая блокировку для защиты пула соединений.
import threading
import queue
class ConnectionPool:
def __init__(self, size):
self._size = size
self._connections = queue.Queue(maxsize=size)
self._lock = threading.Lock() # Защита доступа к пулу
for _ in range(size):
self._connections.put(self._create_connection())
def _create_connection(self):
# Simulate creating a database connection
print("Creating a new connection...")
return "Database Connection"
def get_connection(self):
with self._lock:
return self._connections.get()
def release_connection(self, connection):
with self._lock:
self._connections.put(connection)
class ConnectionContextManager:
def __init__(self, pool):
self._pool = pool
self._connection = None
def __enter__(self):
self._connection = self._pool.get_connection()
return self._connection
def __exit__(self, exc_type, exc_val, exc_tb):
if self._connection:
self._pool.release_connection(self._connection)
# Example usage:
pool = ConnectionPool(size=5)
def worker(i):
with ConnectionContextManager(pool) as conn:
print(f"Thread {i}: Using connection {conn}")
# Simulate database operation
#raise Exception("Oops") # uncomment this to see that release happens despite exception
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("Done.")
Ключевые моменты при проектировании:
В заключение, создание эффективного контекстного менеджера для многозадачности требует глубокого понимания примитивов синхронизации, правильной обработки исключений и тщательного проектирования, чтобы обеспечить безопасный и эффективный доступ к общим ресурсам.