Как использовать `asyncio` для параллельного выполнения независимых задач с учетом ограничений по времени?

Используйте asyncio.gather для запуска независимых корутин параллельно. Для ограничения времени, примените asyncio.wait_for к каждой корутине или к asyncio.gather. В случае таймаута, asyncio.wait_for вызовет asyncio.TimeoutError, который следует обработать, чтобы корректно завершить задачу.

Чтобы использовать asyncio для параллельного выполнения независимых задач с учетом ограничений по времени, можно применить несколько техник:

  1. Создание и запуск задач: Независимые задачи оборачиваются в async функции (корутины). Для их выполнения в event loop создаются asyncio.Task объекты. asyncio.create_task(корутина) создает задачу, которая автоматически ставится в очередь на выполнение в event loop.
  2. asyncio.gather для параллельного выполнения: asyncio.gather(*задачи) позволяет запустить несколько задач одновременно. Эта функция дождется завершения всех задач и вернет список результатов (в том же порядке, в котором задачи были переданы). Если хотя бы одна задача завершится с ошибкой (исключением), то asyncio.gather перехватит его и повторно поднимет после завершения всех остальных задач. Можно указать return_exceptions=True, чтобы возвращались результаты и исключения.
  3. asyncio.wait для управления временем выполнения и завершения задач: asyncio.wait(задачи, timeout=время_в_секундах, return_when=условие) позволяет контролировать, сколько времени ожидать завершения задач и при каком условии вернуть управление. return_when может принимать значения:
    • asyncio.FIRST_COMPLETED: Вернет управление, как только первая задача завершится.
    • asyncio.FIRST_EXCEPTION: Вернет управление, как только первая задача завершится с исключением.
    • asyncio.ALL_COMPLETED: Вернет управление, когда все задачи завершатся (по умолчанию).
    Функция asyncio.wait возвращает кортеж из двух множеств: (done, pending), где done содержит завершенные задачи, а pending - задачи, которые не были завершены до истечения времени ожидания (или по другому условию).
  4. Таймауты на отдельные задачи (asyncio.timeout в Python 3.11+ или asyncio.wait_for в более ранних версиях): Можно задать таймаут на выполнение *каждой* отдельной задачи, чтобы предотвратить "зависание" конкретной корутины.
    • Python 3.11+: asyncio.timeout(время_в_секундах, корутина) позволяет выполнить корутину с ограничением по времени. Если корутина не завершится в течение указанного времени, будет поднято исключение asyncio.TimeoutError.
    • Python < 3.11: Используется asyncio.wait_for(корутина, timeout=время_в_секундах). Этот метод выполняет аналогичную функцию, но немного менее элегантен с точки зрения синтаксиса.
  5. Отмена задач (task.cancel()): Если задача не завершилась в течение отведенного времени (например, после использования asyncio.wait), ее можно отменить с помощью task.cancel(). Важно отметить, что отмена задачи – это запрос, а не приказ. Корутина должна корректно обрабатывать исключение asyncio.CancelledError, которое будет поднято в отмененной задаче, и освобождать ресурсы (например, закрывать соединения).

Пример кода:


import asyncio

async def my_task(task_id, delay):
    try:
        print(f"Задача {task_id}: начало (задержка {delay} сек)")
        await asyncio.sleep(delay)
        print(f"Задача {task_id}: завершена")
        return f"Результат задачи {task_id}"
    except asyncio.CancelledError:
        print(f"Задача {task_id}: отменена")
        return None  # или другое значение по умолчанию
    except Exception as e:
        print(f"Задача {task_id}: произошла ошибка: {e}")
        return None


async def main():
    tasks = [
        asyncio.create_task(my_task(1, 2)),
        asyncio.create_task(my_task(2, 5)),
        asyncio.create_task(my_task(3, 1))
    ]

    try:
        done, pending = await asyncio.wait(tasks, timeout=3, return_when=asyncio.FIRST_COMPLETED)

        print("Завершенные задачи:")
        for task in done:
            try:
                result = task.result() # Может поднять исключение, если задача завершилась с ошибкой
                print(f"  - {result}")
            except asyncio.CancelledError:
                print(f"  - Задача отменена")
            except Exception as e:
                print(f"  - Задача завершилась с ошибкой: {e}")

        print("Ожидающие задачи:")
        for task in pending:
            print(f"  - Отмена задачи")
            task.cancel() # Запрашиваем отмену
            try:
                await task # Дожидаемся отмены (необязательно, но рекомендуется)
            except asyncio.CancelledError:
                print("  - Задача успешно отменена")


    except Exception as e:
      print(f"Произошла ошибка в main: {e}")


if __name__ == "__main__":
    asyncio.run(main())

В этом примере:

  • Создаются три задачи, каждая из которых имитирует работу с задержкой.
  • asyncio.wait используется для ожидания завершения задач в течение 3 секунд или до завершения первой задачи.
  • После истечения времени ожидания завершенные задачи обрабатываются, а ожидающие задачи отменяются. Обратите внимание на обработку исключений, чтобы код был более надежным.

Важно: Не забывайте обрабатывать asyncio.CancelledError в ваших корутинах, чтобы корректно освобождать ресурсы при отмене задачи.

0