import asyncio
    class AsyncTaskManager:
        def __init__(self, limit):
            self.semaphore = asyncio.Semaphore(limit)
        async def __aenter__(self):
            await self.semaphore.acquire()
            return self
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.semaphore.release()
  Создание эффективного пользовательского контекстного менеджера для работы с многозадачностью в Python требует понимания нескольких ключевых аспектов. Основная цель - обеспечить безопасное и предсказуемое управление ресурсами (например, потоками, блокировками, соединениями) в многопоточной или асинхронной среде.
Основные принципы и подходы:
threading.Lock, threading.RLock или других примитивов синхронизации: Контекстный менеджер должен гарантировать, что общий ресурс (например, файл, база данных, сетевое соединение) защищен от одновременного доступа из разных потоков или корутин.  Классический пример - использование threading.Lock:
      
import threading
class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()  # Создаем блокировку
    def increment(self):
        with self._lock:  # Блокировка при входе в контекст
            self._value += 1
    def get_value(self):
        with self._lock:
            return self._value
self._lock используется в качестве контекстного менеджера с помощью with self._lock:.  Это гарантирует, что только один поток одновременно может выполнять операции инкремента или чтения значения.  При выходе из блока with блокировка автоматически освобождается.async with):  Для асинхронного программирования (asyncio) используйте async with и асинхронные версии примитивов синхронизации (например, asyncio.Lock).
      
import asyncio
class AsyncResource:
    async def acquire(self):
        print("Acquiring resource...")
        await asyncio.sleep(0.1)  # Имитация длительной операции
        print("Resource acquired.")
    async def release(self):
        print("Releasing resource...")
        await asyncio.sleep(0.1)  # Имитация длительной операции
        print("Resource released.")
class AsyncContextManager:
    def __init__(self, resource):
        self.resource = resource
    async def __aenter__(self):
        await self.resource.acquire()
        return self.resource
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.resource.release()
async def main():
    resource = AsyncResource()
    async with AsyncContextManager(resource) as r:
        print("Using resource inside context.")
        # await asyncio.sleep(0.5)
        print("Done using resource.")
if __name__ == "__main__":
    asyncio.run(main())
AsyncContextManager является асинхронным контекстным менеджером, который использует методы __aenter__ и __aexit__ для асинхронного захвата и освобождения ресурса.
    contextlib: Модуль contextlib предоставляет полезные инструменты, такие как contextmanager и asynccontextmanager (Python 3.7+), для упрощения создания контекстных менеджеров.  Они основаны на генераторах.
      
import contextlib
import threading
@contextlib.contextmanager
def thread_safe_print(lock):
    try:
        lock.acquire()
        yield
    finally:
        lock.release()
lock = threading.Lock()
with thread_safe_print(lock):
    print("This is a thread-safe print statement.")
import contextlib
import asyncio
@contextlib.asynccontextmanager
async def async_resource_manager():
    # Acquire the resource
    print("Acquiring resource...")
    await asyncio.sleep(0.1)
    resource = "some resource"
    try:
        yield resource
    finally:
        # Release the resource
        print("Releasing resource...")
        await asyncio.sleep(0.1)
async def main():
    async with async_resource_manager() as res:
        print(f"Using {res} inside the context.")
if __name__ == "__main__":
    asyncio.run(main())
@contextlib.contextmanager, становится контекстным менеджером.  Код до yield выполняется при входе в контекст (__enter__), код после yield выполняется при выходе (__exit__).  Аналогично для асинхронного контекстного менеджера используется @contextlib.asynccontextmanager.__exit__ (или __aexit__).  Если исключение произошло внутри блока with, __exit__ получит информацию об исключении (exc_type, exc_val, exc_tb).  Если __exit__ вернет True, исключение будет подавлено; в противном случае оно будет переброшено.with произойдет исключение.  Именно для этого и используется блок finally.Пример сложного сценария: пул соединений к базе данных
Представьте себе пул соединений к базе данных, который нужно использовать в многопоточной среде. Контекстный менеджер может обеспечить получение соединения из пула при входе в контекст и возвращение соединения в пул при выходе, обеспечивая блокировку для защиты пула соединений.
import threading
import queue
class ConnectionPool:
    def __init__(self, size):
        self._size = size
        self._connections = queue.Queue(maxsize=size)
        self._lock = threading.Lock()  # Защита доступа к пулу
        for _ in range(size):
            self._connections.put(self._create_connection())
    def _create_connection(self):
        # Simulate creating a database connection
        print("Creating a new connection...")
        return "Database Connection"
    def get_connection(self):
        with self._lock:
            return self._connections.get()
    def release_connection(self, connection):
        with self._lock:
            self._connections.put(connection)
class ConnectionContextManager:
    def __init__(self, pool):
        self._pool = pool
        self._connection = None
    def __enter__(self):
        self._connection = self._pool.get_connection()
        return self._connection
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._connection:
            self._pool.release_connection(self._connection)
# Example usage:
pool = ConnectionPool(size=5)
def worker(i):
    with ConnectionContextManager(pool) as conn:
        print(f"Thread {i}: Using connection {conn}")
        # Simulate database operation
        #raise Exception("Oops")  # uncomment this to see that release happens despite exception
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print("Done.")
Ключевые моменты при проектировании:
В заключение, создание эффективного контекстного менеджера для многозадачности требует глубокого понимания примитивов синхронизации, правильной обработки исключений и тщательного проектирования, чтобы обеспечить безопасный и эффективный доступ к общим ресурсам.