asyncio
с использованием asyncio.Queue
, ключевой момент - распараллеливание обработки:
asyncio.Queue
). Обычно это корутины, которые получают данные (например, из API, БД) и кладут их в очередь. Важно избегать блокирующих операций в этих корутинах.asyncio.Queue
служит буфером между производителями и потребителями. Автоматически асинхронно управляет доступом к данным, чтобы не было гонок данных.asyncio.to_thread
или concurrent.futures.ThreadPoolExecutor
для CPU-bound задач.asyncio.create_task
для запуска нескольких потребителей. Также необходимо обеспечить graceful shutdown, например, через asyncio.gather
. Можно реализовать backpressure, если очередь заполнится, чтобы производители не перегружали систему.
Обработка высоконагруженных задач в asyncio
с использованием очередей (asyncio.Queue
) позволяет распределить нагрузку и предотвратить блокировку event loop, что критично для поддержания отзывчивости приложения.
Основные принципы:
asyncio.Queue
- это потокобезопасная очередь, предназначенная для асинхронной работы. Она позволяет передавать задачи от producer'ов (например, веб-сервера, получающего запросы) к consumer'ам (worker'ам, выполняющим обработку).asyncio.to_thread()
или concurrent.futures.ThreadPoolExecutor
/ProcessPoolExecutor
, чтобы перенести эту операцию в отдельный поток или процесс.Пример кода:
import asyncio
import time
async def worker(name: str, queue: asyncio.Queue):
print(f'Worker {name}: starting')
while True:
# Get a "work item" out of the queue.
task = await queue.get()
print(f'Worker {name}: Processing {task}')
# Simulate workload (replace with actual task processing)
try:
await asyncio.sleep(task) # Replace with actual processing logic
except asyncio.CancelledError:
print(f"Worker {name}: Cancelled while processing {task}")
break
# Signal to the queue that the "work item" has been processed.
queue.task_done()
print(f'Worker {name}: Completed {task}')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random workloads and add them to the queue.
total_tasks = 10
for i in range(total_tasks):
await queue.put(i + 1) # Simulate different processing times
# Create three worker tasks to process the queue concurrently.
tasks = []
num_workers = 3
for i in range(num_workers):
task = asyncio.create_task(worker(f'worker-{i+1}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
await queue.join()
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('Finished')
if __name__ == "__main__":
asyncio.run(main())
Разъяснение кода:
worker()
- асинхронная функция, представляющая собой worker'а. Она бесконечно ждет задачи из очереди (queue.get()
), обрабатывает их (в примере - имитация обработки через asyncio.sleep()
), и сообщает очереди о завершении задачи (queue.task_done()
).main()
- асинхронная функция, создающая очередь (asyncio.Queue()
), наполняющая её задачами (queue.put()
), и запускающая несколько worker'ов.queue.join()
- ожидает, пока все элементы очереди не будут обработаны.task.cancel()
и дожидаются их завершения с помощью asyncio.gather(*tasks, return_exceptions=True)
. Это важно для корректного завершения программы.Важные моменты:
try...except
для перехвата исключений.asyncio.to_thread()
, чтобы перенести их выполнение в отдельный поток. Это позволит избежать блокировки event loop.Использование asyncio.Queue
в сочетании с асинхронными функциями и правильной обработкой блокирующих операций позволяет эффективно обрабатывать высоконагруженные задачи в asyncio
, сохраняя при этом отзывчивость приложения.