asyncio
в многозадачных приложениях требует внимания к блокирующим операциям. Если задача блокирует EventLoop, это нарушает асинхронность и снижает производительность. Вот как правильно:
asyncio.to_thread
или concurrent.futures
: Для запуска блокирующих операций в отдельном потоке. Это позволяет EventLoop оставаться отзывчивым. await asyncio.to_thread(blocking_function, *args)
.aiofiles
для асинхронной работы с файлами, aiohttp
для асинхронных HTTP-запросов.В многозадачных приложениях, использующих asyncio
, ключевая проблема возникает, когда некоторые задачи, выполняемые внутри корутин, блокируют EventLoop. Это происходит, когда задача выполняет синхронные операции ввода-вывода, длительные вычисления или ожидает ресурсы, не использующие асинхронные примитивы.
Последствия блокировки EventLoop:
Стратегии предотвращения блокировки EventLoop:
Предпочитайте асинхронные версии библиотек для операций ввода-вывода (например, aiohttp
вместо requests
, aiopg
вместо psycopg2
). Эти библиотеки разработаны для работы с EventLoop и не блокируют его при ожидании ввода-вывода. Используйте их для работы с сетью, базами данных, файлами и другими внешними ресурсами.
Пример:
import asyncio
import aiohttp
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
result = await fetch_url("https://example.com")
print(result)
if __name__ == "__main__":
asyncio.run(main())
Если необходимо выполнить синхронную, блокирующую операцию, используйте loop.run_in_executor()
для выполнения этой операции в отдельном потоке или процессе из пула потоков/процессов. Это позволяет разгрузить EventLoop и избежать его блокировки.
Пример:
import asyncio
import time
import concurrent.futures
def blocking_io():
# Выполняем длительную синхронную операцию (например, sleep)
print(f"Начало блокирующей операции в потоке {threading.current_thread().name}")
time.sleep(2)
print(f"Конец блокирующей операции в потоке {threading.current_thread().name}")
return "Результат блокирующей операции"
async def main():
loop = asyncio.get_running_loop()
# Использовать стандартный ThreadPoolExecutor:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
print(f"Получен результат: {result}")
if __name__ == "__main__":
import threading
asyncio.run(main())
В этом примере blocking_io
выполняется в отдельном потоке, не блокируя EventLoop.
Разбивайте длительные вычислительные задачи на более мелкие корутины, которые будут периодически уступать управление EventLoop с помощью await asyncio.sleep(0)
или await asyncio.shield(asyncio.sleep(0))
. Это дает другим корутинам шанс выполниться и избежать блокировки. asyncio.sleep(0)
позволяет EventLoop выполнить другие ожидающие задачи. asyncio.shield
предотвращает отмену текущей задачи во время этого "паузы".
Пример:
import asyncio
async def long_running_task():
for i in range(10):
print(f"Шаг {i}")
await asyncio.sleep(0) # Уступаем управление EventLoop
async def main():
await asyncio.gather(long_running_task(), long_running_task())
if __name__ == "__main__":
asyncio.run(main())
Вместо блокирующих операций, используйте неблокирующие альтернативы, где это возможно. Например, если вам нужно прочитать данные из сокета, используйте asyncio.create_task()
или asyncio.wait()
в сочетании с асинхронными сокетами.
Используйте инструменты профилирования и мониторинга, такие как asyncio.get_running_loop().slow_callback_duration
и инструменты сторонних разработчиков (например, инструменты APM), чтобы выявлять узкие места в производительности и задачи, которые могут блокировать EventLoop. Мониторинг времени выполнения корутин может помочь выявить проблемные места.
Используйте asyncio.wait_for()
чтобы ограничить время выполнения каждой корутины. Если корутина занимает слишком много времени, её можно отменить (asyncio.CancelledError
) и обработать эту ситуацию. Это предотвращает "зависание" EventLoop из-за одной проблемной задачи.
import asyncio
async def potentially_long_task():
try:
await asyncio.sleep(10) # Симулируем долгую операцию
return "Задача завершена"
except asyncio.CancelledError:
print("Задача отменена!")
return "Задача отменена"
async def main():
try:
result = await asyncio.wait_for(potentially_long_task(), timeout=2) # Ждем максимум 2 секунды
print(f"Результат: {result}")
except asyncio.TimeoutError:
print("Превышено время ожидания задачи!")
if __name__ == "__main__":
asyncio.run(main())
Ключевые выводы:
loop.run_in_executor()
для блокирующих операций.